AsyncPostgresSaver: production checkpointing
Why this matters
In production, agents and workflows fail: networks drop, servers restart, or user sessions expire. Without checkpointing, you lose all work. AsyncPostgresSaver lets you pause, inspect, and resume graphs at any step, turning minutes of lost work into seconds of recovery.
Explanation
What it is: AsyncPostgresSaver is a LangGraph checkpointer that serializes graph state to a PostgreSQL database instead of keeping it in memory. State snapshots include the full execution history, allowing graphs to be invoked with a thread_id to resume from the last checkpoint rather than starting fresh.
How it works mechanically: When you compile a graph with AsyncPostgresSaver, every call to graph.ainvoke() or graph.astream() writes a checkpoint to PostgreSQL after each step (a step is one node, or a group of nodes that run in parallel). Each checkpoint stores the state values and step metadata, keyed by the thread_id you pass in the config. On resume, LangGraph fetches the latest checkpoint for that thread_id and hydrates the graph's state before continuing execution. Reads and writes are awaited rather than blocking, so other coroutines on the event loop keep running during database I/O.
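The save-after-each-step and resume-by-thread_id cycle described above can be sketched with a toy model. This is only an illustration under stated assumptions: ToyCheckpointer, run, and the single-table layout are hypothetical names invented here, SQLite stands in for PostgreSQL so the sketch runs without a server, and none of it reflects LangGraph's actual schema or internals.

```python
import asyncio
import json
import sqlite3
from typing import Callable, Optional


class ToyCheckpointer:
    """Hypothetical, simplified checkpointer; not LangGraph's schema or API."""

    def __init__(self) -> None:
        # In-memory SQLite stands in for PostgreSQL in this sketch.
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE checkpoints ("
            "thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    async def put(self, thread_id: str, step: int, state: dict) -> None:
        # A real async driver would await a network round-trip here.
        self.conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()

    async def latest(self, thread_id: str) -> Optional[tuple]:
        row = self.conn.execute(
            "SELECT step, state FROM checkpoints "
            "WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return (row[0], json.loads(row[1])) if row else None


async def run(
    saver: ToyCheckpointer,
    thread_id: str,
    nodes: list,
    initial: Optional[dict] = None,
    fail_at: Optional[int] = None,
) -> dict:
    """Run nodes in order, checkpointing after each; skip steps already saved."""
    found = await saver.latest(thread_id)
    if found is not None:
        # Resume: this toy ignores `initial` when a checkpoint exists.
        step, state = found
    elif initial is not None:
        step, state = 0, initial
    else:
        raise ValueError("no checkpoint and no initial state")
    for index in range(step, len(nodes)):
        if fail_at == index:
            raise RuntimeError(f"simulated crash before step {index}")
        state = nodes[index](state)
        await saver.put(thread_id, index + 1, state)
    return state


def add_one(state: dict) -> dict:
    return {"count": state["count"] + 1}


async def demo() -> dict:
    saver = ToyCheckpointer()
    nodes: list[Callable[[dict], dict]] = [add_one, add_one, add_one]
    try:
        await run(saver, "user-123", nodes, {"count": 0}, fail_at=2)
    except RuntimeError:
        pass  # steps 0 and 1 were checkpointed before the crash
    # Same thread_id, no input: only the remaining step executes.
    return await run(saver, "user-123", nodes)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # {'count': 3}
```

The second call to run does not repeat the first two steps, because the latest row for "user-123" already records their result. Keeping every step as its own row, rather than overwriting one row, is also what makes it possible to inspect earlier states later.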
When to use it: Use AsyncPostgresSaver for any multi-turn agent, long-running workflow, or human-in-the-loop system where interruption and resumption are features, not bugs. It's the production default for chatbots, approval workflows, and research agents that take hours to complete.
Analogy
Think of AsyncPostgresSaver as a video game save system. MemorySaver is like keeping the game in RAM: fast but lost on crash. AsyncPostgresSaver writes to disk (PostgreSQL) after every major action, so you can quit, restart your computer, and pick up exactly where you left off.
Code
import asyncio

from typing_extensions import TypedDict

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph, START, END

# Placeholder connection string; replace with your own credentials.
DB_URI = "postgresql://user:password@localhost/langgraph_db"


class State(TypedDict):
    messages: list[dict]
    count: int


async def node_a(state: State) -> State:
    print(f"Node A: count={state['count']}")
    return {
        "count": state["count"] + 1,
        "messages": state["messages"] + [{"role": "assistant", "content": "Processed A"}],
    }


async def node_b(state: State) -> State:
    print(f"Node B: count={state['count']}")
    return {
        "count": state["count"] + 1,
        "messages": state["messages"] + [{"role": "assistant", "content": "Processed B"}],
    }


async def main():
    builder = StateGraph(State)
    builder.add_node("a", node_a)
    builder.add_node("b", node_b)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("b", END)

    # from_conn_string is an async context manager: enter it with "async with"
    # (no await). It opens the connection and closes it on exit.
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        # Create the checkpoint tables; required the first time the saver is used.
        await checkpointer.setup()

        graph = builder.compile(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": "user-123"}}

        initial_state = {
            "messages": [{"role": "user", "content": "Hello"}],
            "count": 0,
        }

        print("=== First invocation ===")
        result1 = await graph.ainvoke(initial_state, config=config)
        print(f"Result count: {result1['count']}")
        print(f"Messages: {len(result1['messages'])} total")

        # Same thread_id, new input: the stored state is loaded first, then the
        # keys in this input overwrite it (State declares no reducers).
        print("\n=== Resume from checkpoint ===")
        result2 = await graph.ainvoke({"messages": [], "count": 0}, config=config)
        print(f"Resumed count: {result2['count']}")
        print(f"Resumed messages: {len(result2['messages'])} total")

        # aget_state returns the latest StateSnapshot for the thread.
        print("\n=== Get checkpoint ===")
        snapshot = await graph.aget_state(config)
        print(f"Last checkpoint metadata: {snapshot.metadata}")
        print(f"Stored state count: {snapshot.values['count']}")


if __name__ == "__main__":
    asyncio.run(main())

Output
=== First invocation ===
Node A: count=0
Node B: count=1
Result count: 2
Messages: 3 total

=== Resume from checkpoint ===
Node A: count=0
Node B: count=1
Resumed count: 2
Resumed messages: 2 total

=== Get checkpoint ===
Last checkpoint metadata: {...}
Stored state count: 2

What just happened?
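The code compiled a two-node graph with AsyncPostgresSaver as the checkpointer. On the first ainvoke with thread_id 'user-123', both nodes executed sequentially and state was persisted to PostgreSQL after each step. The second ainvoke used the same thread_id, so LangGraph loaded the stored checkpoint first and then applied the new input on top of it. Because State declares no reducers, the input's count of 0 replaced the stored count of 2, and because the first run had already reached END, execution started again at node A. That is why the count ends at 2 again rather than 4. To continue a run that stopped partway (an interrupt or a crash), pass None as the input instead of a state dict. The final call retrieved the latest checkpoint's metadata and state values from the database.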
Common gotcha
The most common mistake: developers expect ainvoke(new_input, config) with an existing thread_id to resume and ignore the new input. It doesn't: the new input is applied on top of the loaded checkpoint state. Keys you pass replace the stored values (or are combined with them if that state key declares a reducer); keys you omit keep their checkpointed values. If you want pure resumption of an interrupted run, pass None as the input and let the checkpointer hydrate the state. If you pass a full new state dict, you override the checkpoint. Also, forgetting to await checkpoint operations or not closing the connection leads to hung tasks.
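The layering of a new input over checkpointed state can be modeled in a few lines. This is a simplified sketch, not LangGraph's code: apply_input and its reducers argument are hypothetical names used only for illustration, and the real behavior depends on the reducers your State declares, so check it against the LangGraph documentation for your version.

```python
import operator
from typing import Any, Callable, Optional

Reducer = Callable[[Any, Any], Any]


def apply_input(
    checkpoint: dict,
    new_input: Optional[dict],
    reducers: Optional[dict] = None,
) -> dict:
    """Simplified model of how an input is layered onto checkpointed state."""
    if new_input is None:
        # No input: continue from the stored state unchanged.
        return dict(checkpoint)
    reducers = reducers or {}
    state = dict(checkpoint)
    for key, value in new_input.items():
        if key in reducers and key in state:
            # A reducer combines the stored value with the incoming one.
            state[key] = reducers[key](state[key], value)
        else:
            # Without a reducer, the incoming value replaces the stored one.
            state[key] = value
    return state


stored = {"messages": ["Hello", "Processed A", "Processed B"], "count": 2}

# Full new state dict: every stored key is overwritten.
print(apply_input(stored, {"messages": [], "count": 0}))

# Partial input: "count" is replaced, "messages" keeps its stored value.
print(apply_input(stored, {"count": 5}))

# With an append-style reducer on "messages", the new item is added.
print(apply_input(stored, {"messages": ["Hi again"]}, {"messages": operator.add}))

# None: the stored state is used as-is.
print(apply_input(stored, None))
```

In this model, whether a second call behaves like a continuation or a reset is decided entirely by which keys the input contains and whether those keys have a reducer.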
Error recovery
psycopg.OperationalError
psycopg.errors.UndefinedTable
RuntimeError: asyncio.run() cannot be called from a running event loop
AttributeError: 'dict' object has no attribute 'messages'
Experienced dev note
AsyncPostgresSaver can feel like it only adds latency, because every step now includes a database round-trip and the first request after a deploy also pays for opening the connection. The mistake experienced devs make: they benchmark single invocations without resumption. The real win appears in production when a graph crashes halfway through a 10-step workflow: instead of replaying from scratch (say, 10 minutes), you resume from step 5 (a few seconds). Also, thread_id is your isolation boundary. Use a deterministic ID (user UUID + session hash), not a random string; you'll query old checkpoints to debug issues. Finally, always await connection cleanup: forgetting to close the AsyncConnection causes subtle connection pool exhaustion that appears only under load.
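One way to get a deterministic thread_id is a name-based UUID, sketched below. The namespace string, make_thread_id, and make_config are hypothetical names chosen for this example; only the shape of the config dict ({"configurable": {"thread_id": ...}}) comes from the code above.

```python
import uuid

# Hypothetical namespace for this application; any fixed UUID would work.
APP_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "checkpoints.example.com")


def make_thread_id(user_id: str, session_key: str) -> str:
    """Derive the same thread_id every time for the same user and session."""
    return str(uuid.uuid5(APP_NAMESPACE, f"{user_id}:{session_key}"))


def make_config(user_id: str, session_key: str) -> dict:
    """Build the config dict that carries the thread_id into an invocation."""
    return {"configurable": {"thread_id": make_thread_id(user_id, session_key)}}


if __name__ == "__main__":
    first = make_thread_id("user-123", "session-a")
    again = make_thread_id("user-123", "session-a")
    other = make_thread_id("user-123", "session-b")
    print(first == again)  # True: same inputs, same ID
    print(first == other)  # False: a different session gets its own thread
```

Because the ID is derived rather than generated, you can recompute it from a user and session while debugging and look up that thread's checkpoints without having stored the ID anywhere else.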
Check your understanding
Why would resuming a graph with a new empty input state and the same thread_id still produce the same output, and what does that tell you about how AsyncPostgresSaver loads checkpoints into the state dict?
Show answer hint
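A correct answer explains that the checkpointer loads the previous state snapshot into memory before node execution, and the new input is then applied on top of it: keys present in the input replace the stored values (or merge with them when a reducer is declared), while keys left out keep their checkpointed values. The insight is that every invocation on an existing thread_id starts from the checkpoint rather than from a blank state, and the input only changes the keys it names: that's how resumption works.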