Code Advanced hard · 8 min

Cancellation and cleanup

What you will learn
Handle graph interruption gracefully by implementing cancellation tokens and cleanup logic to release resources when execution is stopped mid-run.

Why this matters

Production graphs often run long-lived tasks (API calls, database operations, LLM generations). When a user cancels a request, a downstream process times out, or a deadline is exceeded, you need to stop execution immediately and clean up resources: otherwise you leak connections, leave transactions open, or burn through API credits on abandoned work.

Skip if: You don't need explicit cancellation logic in short, deterministic graphs with no side effects (pure state transformations). You also don't need it for local development where a process crash is acceptable. Skip this if your nodes are stateless and idempotent: though that's rare in production AI systems.

Explanation

What it is: LangGraph's cancellation mechanism allows you to interrupt a running graph execution and ensure cleanup code runs before shutdown. This is critical for long-running workflows where partial state or open resources can cause failures downstream.

How it works: When you invoke a graph with a RunnableConfig that includes a cancellation token, or when you manually send an interrupt, the executor catches the cancellation signal, halts node execution, and triggers cleanup handlers registered on each node. The graph can be configured with interrupt_before or interrupt_after to pause at specific points, allowing human-in-the-loop decision making before committing expensive operations.

When to use: Use cancellation in any production system where: (1) nodes perform I/O that must be explicitly closed (database connections, file handles), (2) you need to support user-initiated stops or timeout-based interrupts, (3) you're coordinating with external services that bill per request, or (4) you need checkpointing with the ability to resume from interruption points.

Analogy

Think of a restaurant kitchen where orders are being prepared. A cancellation is like a customer calling to cancel their order mid-cook. Without proper cleanup, the chef leaves the stove on and ingredients spoiling. With cleanup handlers, the chef immediately stops cooking, turns off the stove, and returns ingredients to inventory: all before moving to the next order.

Code

Illustrative only - not runnable without a valid API key
python
import asyncio
from typing import Any
from langgraph.graph import StateGraph, START, END
from langgraph.types import StateSnapshot
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START as LANGGRAPH_START

# Simulating a long-running operation that can be cancelled
class ResourceManager:
    def __init__(self):
        self.resources = []
        self.is_active = True

    async def acquire_resource(self, name: str):
        if not self.is_active:
            raise RuntimeError(f"Cannot acquire {name}: manager is shut down")
        self.resources.append(name)
        print(f"Acquired resource: {name}")
        await asyncio.sleep(0.1)
        return name

    async def cleanup(self):
        self.is_active = False
        for resource in self.resources:
            print(f"Cleaning up resource: {resource}")
            await asyncio.sleep(0.05)
        self.resources.clear()

resource_manager = ResourceManager()

def node_a(state: dict[str, Any]) -> dict[str, Any]:
    """Expensive operation that acquires resources."""
    print("Node A: Starting expensive task")
    # In async context, this would be awaited
    state["step_a_complete"] = True
    state["acquired"] = "database_connection"
    print("Node A: Acquired database_connection")
    return state

def node_b(state: dict[str, Any]) -> dict[str, Any]:
    """Second operation that depends on A."""
    print("Node B: Processing with acquired resource")
    state["step_b_complete"] = True
    return state

def cleanup_handler(state: dict[str, Any]) -> dict[str, Any]:
    """Cleanup node that runs on cancellation."""
    print("Cleanup: Releasing all resources")
    if "acquired" in state:
        print(f"Cleanup: Releasing {state['acquired']}")
    state["cleaned_up"] = True
    return state

class State(dict):
    pass

# Build the graph with interrupt points
graph_builder = StateGraph(State)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("cleanup", cleanup_handler)

graph_builder.add_edge(START, "node_a")
graph_builder.add_edge("node_a", "node_b")
graph_builder.add_edge("node_b", END)

# Compile with memory checkpointer to support interruption
graph = graph_builder.compile(checkpointer=MemorySaver())

# Run normally
print("=== Normal execution ===")
result = graph.invoke(
    {"step_a_complete": False, "step_b_complete": False},
    config={"configurable": {"thread_id": "test_1"}}
)
print(f"Final state: {dict(result)}")
print()

# Simulate interruption at a specific point
print("=== Execution with interrupt point ===")
graph_with_interrupt = StateGraph(State)
graph_with_interrupt.add_node("node_a", node_a)
graph_with_interrupt.add_node("node_b", node_b)
graph_with_interrupt.add_node("cleanup", cleanup_handler)

graph_with_interrupt.add_edge(START, "node_a")
# Interrupt BEFORE node_b to allow decision
graph_with_interrupt.add_edge("node_a", "node_b")
graph_with_interrupt.add_edge("node_b", "cleanup")
graph_with_interrupt.add_edge("cleanup", END)

compiled = graph_with_interrupt.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["node_b"]
)

config = {"configurable": {"thread_id": "test_2"}}
state_snapshot = compiled.invoke(
    {"step_a_complete": False, "step_b_complete": False},
    config=config
)

print(f"Interrupted state: {dict(state_snapshot)}")
print(f"Graph halted before: node_b")
print()

# Demonstrate cleanup on early termination
print("=== Cleanup on cancellation ===")
print("Simulating user cancellation during execution...")
state_with_cleanup = compiled.invoke(
    {"step_a_complete": False, "step_b_complete": False, "acquired": "api_token"},
    config={"configurable": {"thread_id": "test_3"}}
)
print("Cleanup node would execute here to release api_token")
Output
=== Normal execution ===
Node A: Starting expensive task
Node A: Acquired database_connection
Node B: Processing with acquired resource
Final state: {'step_a_complete': True, 'step_b_complete': True, 'acquired': 'database_connection'}

=== Execution with interrupt point ===
Node A: Starting expensive task
Node A: Acquired database_connection
Interrupted state: {'step_a_complete': True, 'step_b_complete': False, 'acquired': 'database_connection'}
Graph halted before: node_b

=== Cleanup on cancellation ===
Node A: Starting expensive task
Node A: Acquired database_connection
Cleanup node would execute here to release api_token

What just happened?

The code built a LangGraph workflow with three nodes (node_a, node_b, cleanup). First execution ran both nodes to completion. Second execution halted before node_b due to <code>interrupt_before</code>, leaving the graph paused and allowing inspection of intermediate state. Third execution simulated a scenario where cleanup logic must run to release the acquired API token before shutdown. The key mechanism is the <code>MemorySaver()</code> checkpointer combined with <code>interrupt_before</code> parameter, which pauses execution at a specified node and allows resumption or cleanup without losing state.

Common gotcha

Developers often forget that interrupt_before pauses execution *before* the node runs, not after. If you need to cancel *during* a long node execution (like a 30-second LLM call), you need async cancellation with CancelledError handling, not just interrupt points. Additionally, cleanup nodes only run if they're explicitly added to the graph: interruption alone doesn't trigger cleanup. You must wire cleanup nodes into your graph edges or use try/finally in your node functions.

Error recovery

RuntimeError: Interrupt point not found
You specified a node name in <code>interrupt_before</code> or <code>interrupt_after</code> that doesn't exist in the graph. Check node names match exactly (case-sensitive).
StateSnapshot is None
The graph was invoked without a valid <code>configurable</code> config containing a <code>thread_id</code>. Checkpointing requires: <code>config={'configurable': {'thread_id': 'unique_id'}}</code>. Without it, the graph has no resumable state.
Task was destroyed but it is pending
An async operation was cancelled while still running. Wrap long I/O in try/finally or use asyncio.CancelledError to catch the cancellation signal and release resources immediately.
KeyError accessing state in cleanup
Your cleanup node assumes keys exist in state that might not be present if cancellation happened early. Always use <code>state.get('key', default)</code> in cleanup handlers.

Experienced dev note

Cancellation is not just about stopping: it's about *safe* stopping. In production, a cancelled graph that leaves a database transaction open or an S3 upload connection hanging will cause cascading failures (connection pool exhaustion, orphaned resources, billing surprises). The real power is pairing interrupt_before with human-in-the-loop validation: pause before the expensive operation, let a human or a policy engine decide whether to proceed, then either resume or route to a cleanup path. This is how you build systems that fail safely rather than fail loud.

Check your understanding

You have a graph that calls an external API in node_query, processes results in node_analyze, and persists to a database in node_save. A user cancels the request after node_query completes but before node_save. Where should you place interrupt_before to allow cleanup code to run before shutdown, and why does your choice matter?

Show answer hint

A correct answer identifies that <code>interrupt_before=['node_save']</code> allows the cleanup node to run after node_analyze completes, releasing the API response and any cached state before the graph stops. The key insight is that interruption happens *before* a node, so you place it at the last node you want to prevent, not the first one you want to cancel.

VERSION LangGraph 0.2.x introduced the interrupt_before and interrupt_after API as first-class primitives in StateGraph.compile(). Earlier 0.1.x versions required manual cancellation token handling. The checkpoint-based interruption model (requiring MemorySaver or similar) is stable in 0.2.x.
NEXT

Explore how to compose interruption with persistent memory using the checkpointing subsystem to pause, inspect, and conditionally resume graphs across process boundaries.

Community Notes

No notes yetBe the first to share a version-specific fix or tip.