Streaming to a FastAPI StreamingResponse
Why this matters
Real applications need real-time feedback: chat apps, agents, and other multi-step workflows all need to stream partial results back to users while computation is still in progress. Pairing FastAPI's StreamingResponse with LangGraph's streaming iterator is a common production pattern for this.
Explanation
What it is: LangGraph's graph.stream() method yields output incrementally as the graph runs, by default one state update per completed node (finer-grained events such as tool calls and model tokens are available through other stream modes and astream_events()). FastAPI's StreamingResponse wraps a generator, sync or async, and sends each yielded value to the client as it is produced, so the browser or other client can render intermediate results as they arrive.
How it works mechanically: graph.stream(input, config) returns an iterator of chunks whose shape depends on stream_mode. With stream_mode="updates", each chunk is a dict mapping a node name to the state update that node returned; when you pass a list of modes, each item is a (mode, chunk) tuple. You wrap this iterator in a generator function that formats each chunk (usually as JSON), then pass the generator it returns to StreamingResponse. FastAPI sends each yielded value to the client without buffering the full response: the connection stays open, events flow one by one, and the response completes when the graph finishes.
When to use it: Use this for any workflow where intermediate steps are valuable to the user: agents thinking through a problem step-by-step, document processing that yields chunks, or long-running SQL queries where users want to see progress. The key is that the graph will produce multiple meaningful events before completion.
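The formatting step described above can be sketched without any web framework. This is a minimal illustration, assuming chunks shaped like {node_name: update} (the shape produced by stream_mode="updates"); the helper name to_ndjson and the sample chunks are hypothetical, not part of LangGraph or FastAPI.

```python
import json
from typing import Any, Iterable, Iterator


def to_ndjson(chunks: Iterable[dict[str, Any]]) -> Iterator[str]:
    """Turn {node_name: update} chunks into newline-delimited JSON lines."""
    for chunk in chunks:
        for node_name, update in chunk.items():
            # default=str keeps values json cannot serialize natively
            # (for example message objects) from raising TypeError
            yield json.dumps({"node": node_name, "data": update}, default=str) + "\n"


# Hypothetical chunks standing in for what a graph would yield
fake_chunks = [
    {"a": {"messages": ["Node A processed"]}},
    {"b": {"messages": ["Node B processed"]}},
]
lines = list(to_ndjson(fake_chunks))
```

Keeping the formatter separate from the graph makes it easy to unit-test the wire format before any endpoint exists.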
Analogy
Think of it like a sports commentator narrating a live game play-by-play. Instead of waiting for the entire game to finish and then telling you the story, they describe each action as it happens (touchdown, replay, penalty), and you see the narrative unfold in real time. The viewer (the client) can react immediately to each event rather than sitting through silence until the computation ends.
Code
import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict, Annotated

app = FastAPI()


class State(TypedDict):
    # add_messages appends each node's messages instead of overwriting the list
    messages: Annotated[list, add_messages]


def node_a(state: State) -> State:
    return {"messages": ["Node A processed"]}


def node_b(state: State) -> State:
    return {"messages": ["Node B processed"]}


def node_c(state: State) -> State:
    return {"messages": ["Node C processed"]}


# Three nodes wired in sequence: START -> a -> b -> c -> END
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
compiled_graph = graph.compile()


async def event_generator(input_data: dict) -> AsyncGenerator[str, None]:
    # stream_mode="updates" yields one {node_name: update} dict per finished node
    for chunk in compiled_graph.stream(input_data, stream_mode="updates"):
        for node_name, update in chunk.items():
            formatted = json.dumps({
                "type": "update",
                "node": node_name,
                "data": str(update)[:100],  # truncated preview of the update
            })
            yield formatted + "\n"
        # Hand control back to the event loop between events; the node work
        # itself still blocks the loop while it runs (see "Common gotcha")
        await asyncio.sleep(0.01)


@app.get("/stream")
async def stream_endpoint():
    input_data = {"messages": ["start"]}
    return StreamingResponse(
        event_generator(input_data),
        media_type="application/x-ndjson",
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

Server starts at http://127.0.0.1:8000. When you GET /stream, the response streams newline-delimited JSON along these lines:
{"type": "update", "node": "a", "data": "{'messages': ['Node A processed']}"}
{"type": "update", "node": "b", "data": "{'messages': ['Node B processed']}"}
{"type": "update", "node": "c", "data": "{'messages': ['Node C processed']}"}
(Each event arrives within ~10-50ms of the previous, not all at once)
What just happened?
The code builds a graph of three sequential nodes and compiles it. When the /stream endpoint is called, it iterates <code>compiled_graph.stream()</code>, which yields an update each time a node finishes. The <code>event_generator()</code> function serializes each update to a JSON line and yields it. FastAPI's <code>StreamingResponse</code> sends each yielded line to the client immediately with media type <code>application/x-ndjson</code> (newline-delimited JSON), so the client receives events as they are produced rather than after the full computation finishes.
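On the receiving side, a client has to cope with network chunks that do not line up with event boundaries. The following is a small sketch of incremental NDJSON parsing, assuming the body arrives as an iterable of byte chunks; the function name iter_ndjson and the simulated chunks are illustrative only.

```python
import json
from typing import Iterable, Iterator


def iter_ndjson(byte_chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one parsed object per complete line, buffering partial lines."""
    buffer = b""
    for chunk in byte_chunks:
        buffer += chunk
        # A single network chunk may hold zero, one, or several complete lines
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        # Trailing data that was not terminated by a newline
        yield json.loads(buffer)


# Simulated reads in which the second event is split across two chunks
chunks = [b'{"node": "a"}\n{"no', b'de": "b"}\n']
events = list(iter_ndjson(chunks))
```

With an HTTP client that exposes the response body as an iterator of byte chunks (for example httpx's Response.iter_bytes() inside a streaming request), the same function can be fed directly from the network.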
Common gotcha
The most common mistake is iterating a synchronous, blocking iterator such as graph.stream() directly inside an async generator. While a node is running, the event loop is blocked and every other request on that worker hangs. Awaiting asyncio.sleep(0.01) after each event only hands control back between events; it does not make the node work itself non-blocking. Prefer graph.astream(), or make the generator a plain synchronous one so that StreamingResponse iterates it in a thread pool. Second gotcha: StreamingResponse sends the status line and headers before the first event, so an unhandled exception mid-stream cannot turn into an error status; the client just sees the connection drop without any error signal.
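One way to soften the second gotcha is to wrap the line generator so that a failure becomes a final, explicit event rather than a silent cut-off. This is a sketch under assumptions: the wrapper name with_error_event and the "error"/"done" event types are made up for illustration, and the client must be written to look for them.

```python
import asyncio
import json
from typing import AsyncIterator


async def with_error_event(lines: AsyncIterator[str]) -> AsyncIterator[str]:
    """Forward NDJSON lines, then finish with an explicit done or error line."""
    try:
        async for line in lines:
            yield line
    except Exception as exc:
        # CancelledError and GeneratorExit are BaseExceptions, so client
        # disconnects still propagate instead of being reported as errors
        yield json.dumps({"type": "error", "message": str(exc)}) + "\n"
    else:
        yield json.dumps({"type": "done"}) + "\n"


async def flaky_source() -> AsyncIterator[str]:
    # Hypothetical stream that fails after its first event
    yield json.dumps({"node": "a"}) + "\n"
    raise RuntimeError("node b failed")


async def collect() -> list[dict]:
    return [json.loads(line) async for line in with_error_event(flaky_source())]


events = asyncio.run(collect())
```

A client can then treat a stream that ends without a "done" line as incomplete. In a real service you would likely log the exception and send a sanitized message rather than str(exc).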
Error recovery
ConnectionClosedError or partial response
All events arrive at once instead of streaming
Empty events or missing metadata
Client receives garbled JSON
Experienced dev note
In production, you'll want to handle backpressure: a client that consumes events more slowly than your graph produces them can cause memory to balloon. Consider adding a bounded queue between the graph iterator and the response generator, and drop events gracefully if the queue fills. Also, StreamingResponse does not reconnect automatically after a network failure; clients need to implement their own retry logic (for example, the fetch API with manual reconnection). If you need bidirectional communication or client-initiated reconnection, consider WebSocket instead.
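The bounded-queue idea can be sketched with asyncio alone. This is one possible design, not a prescribed one: the name buffered, the max_events parameter, and the drop-oldest policy are all assumptions made for the example, and dropping events is only acceptable when clients can tolerate gaps.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel: the producer has finished


async def buffered(source: AsyncIterator[str], max_events: int = 100) -> AsyncIterator[str]:
    """Relay events through a buffer of at most max_events, dropping the oldest when full."""
    queue: asyncio.Queue = asyncio.Queue()  # the bound is enforced in offer()

    def offer(item: str) -> None:
        if queue.qsize() >= max_events:
            queue.get_nowait()  # slow consumer: discard the oldest buffered event
        queue.put_nowait(item)

    async def produce() -> None:
        try:
            async for item in source:
                offer(item)
        finally:
            queue.put_nowait(_DONE)  # bypasses the bound, so it never evicts an event

    task = asyncio.create_task(produce())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        await task  # surface any exception raised by the producer
    finally:
        task.cancel()  # no-op if the producer already finished


async def fast_source() -> AsyncIterator[str]:
    # Hypothetical producer that emits faster than the consumer reads
    for i in range(5):
        yield f"e{i}"


async def demo() -> list[str]:
    return [event async for event in buffered(fast_source(), max_events=2)]


result = asyncio.run(demo())
```

In the endpoint, the wrapper would sit between the event generator and the response, for example StreamingResponse(buffered(event_generator(input_data)), media_type="application/x-ndjson"). Dropping the oldest events keeps the client closest to the current state; a progress display may prefer that, while an audit log would not.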
Check your understanding
If the client closes the connection (refreshes the page) halfway through graph execution, what happens to the graph itself? Will it continue running, stop immediately, or get killed?
Answer hint
The graph does not simply keep running to completion in the background. <code>graph.stream()</code> is lazy: it advances only when the next event is requested. Once the client disconnects, the server stops iterating the generator (typically by cancelling the response task), so no further nodes are scheduled, although a node that is already executing may finish its current step. Work that a node has handed off to its own background threads or tasks is not cancelled automatically, which is the resource leak to watch for. Whether the partial run can be resumed later depends on your checkpointing.
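The cleanup side of this can be demonstrated with a plain async generator. The sketch below simulates a disconnect by abandoning the generator after one event; the names event_stream and cleanup_log are illustrative, and the explicit aclose() call only stands in for the server giving up on the stream.

```python
import asyncio
from typing import AsyncIterator

cleanup_log: list[str] = []


async def event_stream() -> AsyncIterator[str]:
    try:
        for step in ("a", "b", "c"):
            await asyncio.sleep(0)  # stands in for one graph step
            yield step
    finally:
        # Runs on normal completion, on cancellation, and on early close
        cleanup_log.append("released resources")


async def simulate_disconnect() -> list[str]:
    received = []
    stream = event_stream()
    async for step in stream:
        received.append(step)
        if step == "a":
            break  # the client goes away after the first event
    await stream.aclose()  # stands in for the server abandoning the stream
    return received


received = asyncio.run(simulate_disconnect())
```

Putting resource release (closing connections, cancelling background tasks you started) in a finally block inside the generator is what keeps an abandoned stream from leaking.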
stream() API and event structure. In 0.1.x, event types and metadata formats differed significantly. Always pin langgraph>=0.2.0 for this pattern.