astream(): async execution
Why this matters
When building web apps, chatbots, or any I/O-bound workflow, blocking on a full graph execution wastes resources. astream() lets you start streaming results while the graph is still running, which improves perceived performance and lets you handle concurrent requests without dedicating a thread to each one.
Explanation
astream() is the asynchronous streaming method of a compiled LangGraph graph. Where invoke() blocks until the entire graph finishes, astream() returns an async generator that you consume with async for inside an async function. With the default stream mode ("updates"), it yields one chunk after each node executes: a dictionary that maps the node's name to the update that node returned. Pass stream_mode="values" if you want the full state snapshot after each step instead. Either way, you can react to intermediate results as they arrive and handle multiple requests concurrently without blocking threads. Note that per-node chunks are coarser than LLM tokens; token-level streaming needs a different stream mode, such as stream_mode="messages".
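To make the mechanics concrete, here is a minimal sketch that uses only the standard library. toy_astream, add_one, and double are hypothetical stand-ins, not LangGraph APIs; the sketch only imitates the shape of the default per-node chunks and says nothing about how LangGraph schedules nodes internally.

```python
import asyncio
from typing import AsyncIterator, Callable


# Hypothetical stand-in for a compiled graph's astream(); not LangGraph code.
async def toy_astream(
    nodes: list[tuple[str, Callable[[dict], dict]]], state: dict
) -> AsyncIterator[dict]:
    for name, node in nodes:
        update = node(state)         # run one node
        state = {**state, **update}  # merge its update into the running state
        await asyncio.sleep(0)       # let other tasks on the event loop run
        yield {name: update}         # emit a chunk as soon as the node is done


def add_one(state: dict) -> dict:
    return {"count": state["count"] + 1}


def double(state: dict) -> dict:
    return {"count": state["count"] * 2}


async def main() -> list[dict]:
    chunks = []
    nodes = [("add_one", add_one), ("double", double)]
    # async for pulls one chunk at a time; the loop body runs between nodes.
    async for chunk in toy_astream(nodes, {"count": 0}):
        print(chunk)
        chunks.append(chunk)
    return chunks


chunks = asyncio.run(main())
```

The loop body runs once per node, before the next node starts, which is the property that makes partial results available early.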
Analogy
Think of invoke() as ordering food and waiting at the counter until it's completely ready. astream() is like a sushi conveyor belt: plates arrive one at a time, and you start eating while new dishes keep coming. You don't wait for the entire meal to finish preparation.
Code
import asyncio
from typing import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    count: int
    message: str


# Each node reads the current state and returns the keys it wants to update.
def node_a(state: State) -> State:
    return {"count": state["count"] + 1, "message": f"Node A: count={state['count'] + 1}"}


def node_b(state: State) -> State:
    return {"count": state["count"] + 10, "message": f"Node B: count={state['count'] + 10}"}


def node_c(state: State) -> State:
    return {"count": state["count"] * 2, "message": f"Node C: count={state['count'] * 2}"}


# Wire the nodes into a linear graph: START -> node_a -> node_b -> node_c -> END
graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.add_node("node_c", node_c)
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", "node_c")
graph.add_edge("node_c", END)
compiled_graph = graph.compile()


async def run_streaming():
    initial_state = {"count": 0, "message": "start"}
    # Each event is a dict keyed by the name of the node that just finished.
    async for event in compiled_graph.astream(initial_state):
        for node_name, node_state in event.items():
            print(f"→ {node_name}: {node_state['message']}")


asyncio.run(run_streaming())
# Output:
# → node_a: Node A: count=1
# → node_b: Node B: count=11
# → node_c: Node C: count=22
What just happened?
The compiled graph executed asynchronously using astream(). After each node completed, the async generator yielded a dictionary with that node's name as the key and the update the node returned as the value. The async for loop consumed these events one at a time, printing each node's result as soon as it was available rather than waiting for the whole graph to finish.
Common gotcha
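With the default stream mode, each chunk that astream() yields is a dictionary keyed by the name of the node that just completed, not by a fixed label like 'output' or 'result'. New developers often expect a flat state dictionary and index it incorrectly. Also, astream() returns an async generator, so you must consume it with async for inside an async function, which you start from synchronous code with asyncio.run(). Writing async for outside an async function is a SyntaxError, and looping over the generator with a plain for raises TypeError.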
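The sketch below illustrates both halves of this gotcha with a hypothetical stand-in generator (toy_astream is not a LangGraph function). It assumes the chunk shape described above: one dictionary per finished node, keyed by node name.

```python
import asyncio


# Hypothetical stand-in for compiled_graph.astream(...); not LangGraph code.
async def toy_astream():
    yield {"node_a": {"count": 1, "message": "Node A: count=1"}}
    yield {"node_b": {"count": 11, "message": "Node B: count=11"}}


async def consume() -> list[str]:
    seen = []
    stream = toy_astream()

    # Wrong: the stream is an async generator, not a dict of results.
    try:
        stream["node_a"]
    except TypeError as exc:
        seen.append(f"TypeError: {exc}")

    # Right: iterate with async for, then look inside each chunk by node name.
    async for chunk in stream:
        for node_name, update in chunk.items():
            seen.append(f"{node_name}: {update['message']}")
    return seen


results = asyncio.run(consume())
for line in results:
    print(line)
```

Iterating chunk.items() instead of hard-coding a key also keeps the loop working if a chunk ever carries more than one node name.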
Error recovery
RuntimeError: asyncio.run() cannot be called from a running event loop
TypeError: 'async_generator' object is not subscriptable
KeyError when accessing node state
Experienced dev note
astream() is a natural fit for web frameworks (FastAPI, Django). Return an event stream and send each chunk to the client as it arrives using Server-Sent Events (SSE) or WebSockets, which keeps the UX responsive without blocking the server. astream() also respects the same checkpointing config as invoke(), so a MemorySaver you have set up works transparently. Be aware, though, that checkpoint writes may still run synchronously, so don't assume lock-free concurrency when the state store is not thread-safe.
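As a rough illustration of the SSE idea, the sketch below turns per-node chunks into SSE text frames using only the standard library. toy_astream and sse_events are hypothetical names, and the event names node_update and done are arbitrary choices made for this example, not part of any framework.

```python
import asyncio
import json
from typing import AsyncIterator


# Hypothetical stand-in for compiled_graph.astream(...); not LangGraph code.
async def toy_astream() -> AsyncIterator[dict]:
    yield {"node_a": {"count": 1}}
    await asyncio.sleep(0)
    yield {"node_b": {"count": 11}}


async def sse_events(chunks: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Format each per-node chunk as one Server-Sent Events frame."""
    async for chunk in chunks:
        for node_name, update in chunk.items():
            payload = json.dumps({"node": node_name, "update": update})
            # An SSE frame is a set of "field: value" lines ended by a blank line.
            yield f"event: node_update\ndata: {payload}\n\n"
    # Tell the client the stream is complete.
    yield "event: done\ndata: {}\n\n"


async def collect() -> list[str]:
    return [frame async for frame in sse_events(toy_astream())]


frames = asyncio.run(collect())
for frame in frames:
    print(frame, end="")
```

In a FastAPI app, a generator like sse_events would typically be passed to StreamingResponse with media_type="text/event-stream". That wiring is omitted here so the sketch stays free of third-party dependencies.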
Check your understanding
Why would using astream() instead of invoke() improve a web application's ability to handle multiple concurrent requests, even if the graph takes the same wall-clock time to finish?
Answer hint
A correct answer explains that astream() doesn't block the async event loop while the graph runs. Each request yields control back to the event loop between node completions, so other requests can make progress in the meantime. invoke() is a synchronous call: when it runs on the event loop's thread, it blocks that thread until the graph finishes, so the loop cannot handle any other task.
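The difference can be seen in a small standard-library simulation. Everything here is a hypothetical stand-in: toy_astream imitates a streamed graph, and blocking_handler imitates calling a synchronous invoke() from async code by doing all of its work without awaiting.

```python
import asyncio


# Hypothetical stand-in for a streamed graph run; not LangGraph code.
async def toy_astream(steps: int):
    for step in range(steps):
        await asyncio.sleep(0)  # stand-in for awaiting I/O inside a node
        yield {f"node_{step}": {}}


async def streaming_handler(request_id: str, log: list) -> None:
    # Control returns to the event loop between chunks.
    async for chunk in toy_astream(2):
        log.append((request_id, next(iter(chunk))))


async def blocking_handler(request_id: str, log: list) -> None:
    # No await until all the work is done, so nothing else can run meanwhile.
    for step in range(2):
        log.append((request_id, f"node_{step}"))


async def run(handler) -> list:
    log = []
    # Two "requests" handled concurrently on one event loop.
    await asyncio.gather(handler("A", log), handler("B", log))
    return log


streamed = asyncio.run(run(streaming_handler))
blocked = asyncio.run(run(blocking_handler))
print("streamed:", streamed)
print("blocked: ", blocked)
```

In the streamed run the two requests interleave node by node, while in the blocking run request B cannot start until request A has finished.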