Code Intermediate medium · 6 min

Replay: re-running from a checkpoint

What you will learn
Resume graph execution from a saved checkpoint instead of re-running the entire workflow from the beginning.

Why this matters

In production, graphs may run for hours or fail mid-execution. Replaying from a checkpoint lets you resume without losing computation cost and time: critical for expensive LLM calls, long-running tasks, or debugging state at a specific step.

Skip if: Don't use replay if your graph has side effects that shouldn't repeat (external API calls, database writes, file deletions). Only use replay when re-executing nodes is idempotent or you explicitly want to re-run those operations.

Explanation

Replay means resuming graph execution from a stored checkpoint: a frozen snapshot of the graph's state at a specific step: rather than starting fresh from the input. MemorySaver() captures state after each node runs. When you call graph.invoke(input, config={'configurable': {'thread_id': 'id'}}) with an existing thread, langgraph resumes from the last saved state, skipping already-completed nodes. You retrieve a checkpoint using graph.get_state(config), which returns the exact state dict that can be fed back into invoke() with the same thread ID. This is essential for long pipelines: if node 3 crashes, you replay from node 3's input without re-running nodes 1–2.

Analogy

Like pausing a video game, saving your exact position and inventory, then loading from that save 3 days later. You don't replay the entire game: you resume from the checkpoint. If you died at a boss, you load the save and retry only the boss fight.

Code

Illustrative only - not runnable without a valid API key
python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
import json

class State(TypedDict):
    messages: list[str]
    step: int

def node_a(state: State) -> State:
    print(f"Running node_a, step={state['step']}")
    return {"messages": state["messages"] + ["A completed"], "step": state["step"] + 1}

def node_b(state: State) -> State:
    print(f"Running node_b, step={state['step']}")
    if state["step"] == 2:
        raise ValueError("Intentional error at node_b")
    return {"messages": state["messages"] + ["B completed"], "step": state["step"] + 1}

def node_c(state: State) -> State:
    print(f"Running node_c, step={state['step']}")
    return {"messages": state["messages"] + ["C completed"], "step": state["step"] + 1}

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)

graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)

checkpoint_saver = MemorySaver()
compiled_graph = graph.compile(checkpointer=checkpoint_saver)

initial_state = {"messages": [], "step": 1}

print("=== First run: will fail at node_b ===")
try:
    result = compiled_graph.invoke(
        initial_state,
        config={"configurable": {"thread_id": "thread_1"}}
    )
except ValueError as e:
    print(f"Error caught: {e}")

print("\n=== Inspect checkpoint after failure ===")
state_snapshot = compiled_graph.get_state({"configurable": {"thread_id": "thread_1"}})
print(f"Checkpoint state: {state_snapshot.values}")
print(f"Next nodes to execute: {state_snapshot.next}")

print("\n=== Replay from checkpoint (simulating fix) ===")
print("(In production, you'd fix the bug, then replay)")
print(f"Resuming from thread_id='thread_1'...")

modified_state = {**state_snapshot.values, "step": 3}
result = compiled_graph.invoke(
    modified_state,
    config={"configurable": {"thread_id": "thread_1"}}
)

print(f"Final result: {result}")
Output
=== First run: will fail at node_b ===
Running node_a, step=1
Running node_b, step=2
Error caught: Intentional error at node_b

=== Inspect checkpoint after failure ===
Checkpoint state: {'messages': ['A completed'], 'step': 2}
Next nodes to execute: ('b',)

=== Replay from checkpoint (simulating fix) ==="
(In production, you'd fix the bug, then replay)
Resuming from thread_id='thread_1'...
Running node_b, step=3
Running node_c, step=4
Final result: {'messages': ['A completed', 'B completed', 'C completed'], 'step': 4}

What just happened?

First run completed node_a but failed in node_b, with state saved at that point. <code>get_state()</code> retrieved the frozen checkpoint showing 'A completed' and step=2, plus which node was about to run (node_b). We then called <code>invoke()</code> again with the same thread_id but modified state (step=3), which resumed from the saved point and executed only node_b and node_c, skipping the re-execution of node_a.

Common gotcha

Developers assume invoke() with the same thread_id automatically replays from failure. It doesn't: it replays from the last successfully saved checkpoint, which is after each node completes. If a node crashes mid-execution before state is updated, the checkpoint is from before that node ran. You must manually inspect get_state().next to see which node will run next, and you must manually fix the state or bug before calling invoke() again. Blindly re-invoking the same input will hit the same error.

Error recovery

KeyError on thread_id
You passed a thread_id in config that doesn't exist. Use a consistent thread_id string that matches the one from the original run, e.g., config={'configurable': {'thread_id': 'thread_1'}}.
AttributeError: 'NoneType' object has no attribute 'values'
get_state() returned None because the thread_id doesn't exist or no checkpoint was saved. Ensure checkpointer=MemorySaver() is passed to compile() and the thread_id matches the invoke() call.
ValueError during node re-execution
Your fix didn't fully resolve the issue. Either the same error triggers again, or the state you passed to invoke() is malformed. Validate that the state dict keys and types match your State TypedDict schema.

Experienced dev note

Replay is not 'resume' in the async sense: it's deterministic re-execution from a saved state snapshot. If your nodes have side effects (API calls, database writes), they will re-run. Use replay only for idempotent operations or wrap side effects in a separate, checkpointed 'write' node that runs after your main logic. In production, store thread_ids tied to user sessions or request IDs so you can replay specific user workflows. Also: never assume a checkpoint exists: always guard get_state() with a null check or catch KeyError when the thread doesn't exist yet.

Check your understanding

If a graph fails at node_b after successfully running node_a, and you call invoke() again with the same thread_id but unmodified input state, which nodes will execute and why?

Show answer hint

The answer must address (1) that only node_b (and its dependents) will re-run, not node_a, because the checkpoint saved after node_a completed, and (2) whether the same error recurs depends on whether the input state or underlying logic changed: it likely will error again at the same point unless you modified the state or fixed the bug.

VERSION In langgraph < 0.2.0, checkpoint syntax and get_state() return type differed. Ensure you're on 0.2.x or later. The StateGraph + START/END imports are required in 0.2.x; MessageGraph is deprecated.
NEXT

Learn how to <strong>branch and merge execution paths</strong>: running multiple nodes in parallel and collecting their results before proceeding, which pairs naturally with replay for debugging complex conditional flows.

Community Notes

No notes yetBe the first to share a version-specific fix or tip.