Code Beginner easy · 5 min

astream(): async execution

What you will learn
Use astream() to run a compiled graph asynchronously and receive results one chunk at a time.

Why this matters

When building web apps, chatbots, or any I/O-bound workflow, blocking on a full graph execution wastes resources. astream() lets you start streaming results immediately while the graph is still running, which improves perceived performance and lets you handle concurrent requests without creating thread pools.

Skip if: You can skip astream() if your graph finishes almost instantly (under about 100 ms) or if you only need the complete final state before proceeding; in an async application, ainvoke() returns just the final state. For CPU-bound workloads with no I/O, the overhead of async mechanics may not be worth it, and invoke() is simpler.

Explanation

astream() is the asynchronous streaming method of a compiled LangGraph graph. invoke() blocks until the entire graph finishes; astream() instead returns an async generator that yields output after each node executes, and you consume it with async for inside an async function. This is useful when you want to display partial results or handle multiple requests concurrently without blocking threads. With the default stream mode of a compiled StateGraph ("updates"), each iteration gives you a dictionary that maps the node that just ran to the update it returned; pass stream_mode="values" to receive the full state snapshot after each step instead. Note that streaming individual LLM tokens needs a different stream mode (in recent versions, stream_mode="messages"), not the per-node updates shown here.
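The consumption pattern can be sketched without LangGraph at all. In the minimal sketch below, fake_astream is a hypothetical stand-in (not a LangGraph API) that yields one {node_name: update} dictionary per step, mimicking the chunk shape used in this lesson; only the async for mechanics are the point.

```python
import asyncio

# Hypothetical stand-in for compiled_graph.astream(); NOT a LangGraph API.
async def fake_astream(initial_count: int):
    count = initial_count
    for name, step in (("node_a", lambda c: c + 1), ("node_b", lambda c: c + 10)):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        count = step(count)
        yield {name: {"count": count}}

async def collect(initial_count: int) -> list:
    chunks = []
    # An async generator is consumed with `async for` inside a coroutine.
    async for chunk in fake_astream(initial_count):
        chunks.append(chunk)
    return chunks

chunks = asyncio.run(collect(0))
print(chunks)
```

Each chunk arrives as soon as its step finishes, so the caller can act on node_a's result before node_b has run.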

Analogy

Think of invoke() as ordering food and waiting at the counter until it's completely ready. astream() is like a sushi conveyor belt: plates arrive one at a time, and you start eating while new dishes keep coming. You don't wait for the entire meal to finish preparation.

Code

python
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class State(TypedDict):
    count: int
    message: str

def node_a(state: State) -> State:
    return {"count": state["count"] + 1, "message": f"Node A: count={state['count'] + 1}"}

def node_b(state: State) -> State:
    return {"count": state["count"] + 10, "message": f"Node B: count={state['count'] + 10}"}

def node_c(state: State) -> State:
    return {"count": state["count"] * 2, "message": f"Node C: count={state['count'] * 2}"}

graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.add_node("node_c", node_c)

graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", "node_c")
graph.add_edge("node_c", END)

compiled_graph = graph.compile()

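async def run_streaming():
    initial_state = {"count": 0, "message": "start"}
    # Each event maps the name of the node that just ran to the update it returned.
    async for event in compiled_graph.astream(initial_state):
        for node_name, node_state in event.items():
            print(f"→ {node_name}: {node_state['message']}")

# asyncio.run() starts an event loop; inside an already-running loop, use `await run_streaming()`.
asyncio.run(run_streaming())

Output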
→ node_a: Node A: count=1
→ node_b: Node B: count=11
→ node_c: Node C: count=22

What just happened?

The compiled graph executed asynchronously using astream(). After each node completed, the async generator yielded a dictionary with that node's name as the key and the update that node returned as the value. The async for loop consumed these events one at a time, printing each node's result immediately rather than waiting for the whole graph to finish.

Common gotcha

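Each chunk that astream() yields is a dictionary whose key is the name of the node that just completed, not a label like 'output' or 'result'. New developers often expect a flat structure and index it incorrectly. Also: you must consume astream() with async for inside an async function (started, for example, with asyncio.run()). Calling astream() from synchronous code does not fail by itself, but a plain for loop over the result raises TypeError, and writing async for outside an async function is a SyntaxError.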
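A minimal sketch of both points, using the standard library only. fake_astream is a hypothetical stand-in for astream() (not a LangGraph API); in plain Python, a regular for loop over an async generator raises TypeError, while async for inside a coroutine works.

```python
import asyncio

# Hypothetical stand-in for astream(); yields {node_name: update} chunks.
async def fake_astream():
    yield {"node_a": {"message": "hello"}}

def sync_misuse() -> str:
    stream = fake_astream()  # calling it only creates the generator; no error yet
    try:
        for chunk in stream:  # a plain `for` cannot drive an async generator
            pass
    except TypeError as exc:
        return type(exc).__name__
    return "no error"

async def correct_use() -> list:
    seen = []
    async for chunk in fake_astream():
        # Iterate over items() instead of assuming a fixed key such as "output".
        for node_name, update in chunk.items():
            seen.append((node_name, update["message"]))
    return seen

misuse_result = sync_misuse()
seen = asyncio.run(correct_use())
print(misuse_result, seen)
```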

Error recovery

RuntimeError: asyncio.run() cannot be called from a running event loop
You're already inside a running event loop (for example, inside an async function, or in a Jupyter notebook, which runs its own loop). Remove the asyncio.run() wrapper and await the coroutine directly: await run_streaming().
TypeError: 'async_generator' object is not subscriptable
You're treating the astream() result as a list. Use 'async for' to iterate, not indexing. astream() returns an async generator, not a list.
KeyError when accessing node state
You're assuming a fixed key name. The key in each event is the name of the node that just completed. Iterate with for node_name, update in event.items() to get the actual node name dynamically instead of hard-coding a key.
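The first error above can be reproduced with the standard library alone. In this sketch, run_streaming is a trivial placeholder coroutine (an assumption for illustration, not the graph-driven version from the lesson) and handler plays the role of code that already runs inside an event loop.

```python
import asyncio

async def run_streaming() -> str:
    # Placeholder body; the real version would iterate over astream().
    return "done"

async def handler():
    coro = run_streaming()
    try:
        asyncio.run(coro)  # wrong: a loop is already running in this thread
    except RuntimeError as exc:
        error = str(exc)
        coro.close()  # discard the never-started coroutine cleanly
    result = await run_streaming()  # right: await the coroutine directly
    return error, result

error, result = asyncio.run(handler())
print(error)
print(result)
```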

Experienced dev note

astream() pairs well with async web frameworks (FastAPI, Django). Return an event stream and send chunks to the client as they arrive using Server-Sent Events (SSE) or WebSockets. This keeps the UI responsive without blocking the server. Also: astream() accepts the same config and uses the same checkpointer as invoke(), so a graph compiled with MemorySaver checkpoints transparently. Be aware, though, that checkpoint writes may still run synchronously under the hood, so don't assume true lock-free concurrency with thread-unsafe state stores.
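As a rough illustration of the SSE idea, the sketch below wraps each chunk in a Server-Sent Events frame (a data: line followed by a blank line). fake_astream and sse_frames are hypothetical names introduced here, and the web framework wiring is deliberately left out so the example runs with the standard library only.

```python
import asyncio
import json

# Hypothetical stand-in for compiled_graph.astream(...).
async def fake_astream():
    yield {"node_a": {"count": 1}}
    yield {"node_b": {"count": 11}}

async def sse_frames(chunks):
    """Wrap each chunk in an SSE frame: 'data: <json>' plus a blank line."""
    async for chunk in chunks:
        yield f"data: {json.dumps(chunk)}\n\n"

async def collect_frames() -> list:
    return [frame async for frame in sse_frames(fake_astream())]

frames = asyncio.run(collect_frames())
print(frames)
```

In FastAPI, a generator like sse_frames(...) could be passed to StreamingResponse with media_type="text/event-stream"; treat that as one possible wiring rather than the only one.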

Check your understanding

Why would using astream() instead of invoke() improve a web application's ability to handle multiple concurrent requests, even if the graph takes the same wall-clock time to finish?

Answer hint

A correct answer explains that astream() does not block the event loop while the graph runs. Each request yields control back to the event loop whenever it awaits I/O and between node completions, so other requests can make progress on the same thread. Calling invoke() from an async handler blocks the thread that runs the event loop, so no other task is served until the graph finishes.
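The interleaving can be observed in a small standard-library sketch. fake_astream is again a hypothetical stand-in whose steps await simulated I/O, and handle_request is an illustrative name for a request handler; neither is a LangGraph API.

```python
import asyncio

log = []

# Hypothetical stand-in for astream(); each step awaits simulated I/O.
async def fake_astream(steps: int):
    for i in range(steps):
        await asyncio.sleep(0.01)  # control returns to the event loop here
        yield i

async def handle_request(name: str) -> None:
    async for step in fake_astream(3):
        log.append(f"{name}:{step}")

async def main() -> None:
    # Two "requests" share one thread and one event loop.
    await asyncio.gather(handle_request("A"), handle_request("B"))

asyncio.run(main())
print(log)
```

Entries from A and B alternate in the log instead of all of A finishing first, which is the behavior the question is probing.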

VERSION In langgraph < 0.2.0, event iteration returned tuples like ('node_name', state). In 0.2.x+, it returns dictionaries like {'node_name': state}. Update your event unpacking code when upgrading.
NEXT

Next, learn how to handle streaming output from nodes that themselves produce multiple outputs (like LLMs generating tokens), using the StreamWriter API in langgraph.
