Streaming to a FastAPI StreamingResponse

What you will learn
Stream LangGraph output directly to a FastAPI client using StreamingResponse for real-time, multi-event responses.

Why this matters

Real applications need real-time feedback: chat apps, agents, and multi-step workflows all need to stream partial results back to users while computation is still running. Pairing FastAPI's StreamingResponse with LangGraph's streaming iterator is a common production pattern for this.

Skip if: your graph consistently finishes in under 100 ms, or you need to validate or modify the complete output before sending it; in those cases a regular endpoint that calls .invoke() is simpler. Also avoid streaming if the client cannot handle chunked, incremental responses (e.g., some legacy REST clients).

Explanation

What it is: LangGraph's graph.stream() method yields output while the graph is still running, for example each node's state update as soon as that node finishes. FastAPI's StreamingResponse takes a generator (sync or async) and sends each value it yields to the client as soon as it is produced, so the browser or client can render intermediate results as they arrive.

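How it works mechanically: graph.stream(input, config, stream_mode=...) returns an iterator whose items depend on stream_mode. In "updates" mode each item is a dict mapping the node that just ran to the update it returned; in "values" mode each item is the full state after a step; and if you pass a list of modes, such as stream_mode=["updates"], each item is a (mode, chunk) tuple. You wrap this iterator in a generator that formats each item (usually as one line of JSON), then pass that generator to StreamingResponse. FastAPI sends each yielded value as soon as it is produced instead of buffering the full response: the connection stays open, events flow one by one, and the response completes when the graph finishes.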
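The formatting step can be sketched without FastAPI or LangGraph. In this minimal sketch, fake_stream is a hypothetical stand-in assumed to mirror graph.stream(input, stream_mode=["updates"]), which yields (mode, chunk) tuples where each chunk maps a node name to the update that node returned; to_ndjson turns every node update into one newline-terminated JSON line.

```python
import json
from typing import Iterator, Tuple


def fake_stream() -> Iterator[Tuple[str, dict]]:
    """Hypothetical stand-in for graph.stream(input, stream_mode=["updates"]).

    Assumption: each item is a (mode, chunk) tuple, and an "updates" chunk
    maps the node name to the partial state that node returned.
    """
    for node in ("a", "b", "c"):
        yield "updates", {node: {"messages": [f"Node {node.upper()} processed"]}}


def to_ndjson(events: Iterator[Tuple[str, dict]]) -> Iterator[str]:
    """Turn each (mode, chunk) pair into one JSON line per node update."""
    for mode, chunk in events:
        for node_name, update in chunk.items():
            yield json.dumps({"type": mode, "node": node_name, "data": update}) + "\n"


if __name__ == "__main__":
    for line in to_ndjson(fake_stream()):
        print(line, end="")
```

Because to_ndjson is an ordinary (non-async) generator, it could be handed straight to StreamingResponse; Starlette iterates such iterables in a threadpool, so blocking steps do not stall the event loop.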

When to use it: Use this for any workflow where intermediate steps are valuable to the user: agents thinking through a problem step-by-step, document processing that yields chunks, or long-running SQL queries where users want to see progress. The key is that the graph will produce multiple meaningful events before completion.

Analogy

Think of it like a sports commentator narrating a live game play-by-play. Instead of waiting for the game to end and then telling you the story, they describe each action as it happens (touchdown, replay, penalty), so you follow the narrative in real time. The viewer (client) can react to each event immediately rather than waiting for silence (the end of the computation).

Code

python
import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict, Annotated

app = FastAPI()

class State(TypedDict):
    # add_messages appends each node's messages instead of overwriting the list
    messages: Annotated[list, add_messages]

# Nodes return partial state updates, not a full State
def node_a(state: State) -> dict:
    return {"messages": ["Node A processed"]}

def node_b(state: State) -> dict:
    return {"messages": ["Node B processed"]}

def node_c(state: State) -> dict:
    return {"messages": ["Node C processed"]}

# Linear graph: START -> a -> b -> c -> END
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
compiled_graph = graph.compile()

async def event_generator(input_data: dict) -> AsyncGenerator[str, None]:
    # Passing stream_mode as a list makes stream() yield (mode, chunk) tuples;
    # in "updates" mode each chunk is {node_name: update_returned_by_that_node}.
    for mode, chunk in compiled_graph.stream(input_data, stream_mode=["updates"]):
        for node_name, update in chunk.items():
            # One JSON object per line (NDJSON); default=str covers message objects
            yield json.dumps({"type": mode, "node": node_name, "data": update}, default=str) + "\n"
        # Hands control back to the event loop between updates only: each node
        # still runs on the loop thread while it executes (graph.astream() avoids this)
        await asyncio.sleep(0)

@app.get("/stream")
async def stream_endpoint():
    input_data = {"messages": ["start"]}
    return StreamingResponse(
        event_generator(input_data),
        media_type="application/x-ndjson",
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
Output
Server starts at http://127.0.0.1:8000. When you GET /stream (for example with curl -N http://127.0.0.1:8000/stream), the response streams newline-delimited JSON, one line per node update, along these lines:
{"type": "updates", "node": "a", "data": {"messages": ["Node A processed"]}}
{"type": "updates", "node": "b", "data": {"messages": ["Node B processed"]}}
{"type": "updates", "node": "c", "data": {"messages": ["Node C processed"]}}
(Each line is sent as soon as its node finishes. These toy nodes finish almost instantly, so add a short time.sleep() inside a node if you want to watch the lines arrive one at a time.)

What just happened?

The code builds a graph of three sequential nodes and compiles it. When the /stream endpoint is called, it calls <code>compiled_graph.stream()</code>, which yields an update as each node finishes. The <code>event_generator()</code> function turns each update into one line of JSON and yields it. FastAPI's <code>StreamingResponse</code> sends each yielded line to the client as soon as it is produced, with media type <code>application/x-ndjson</code> (newline-delimited JSON). The client receives events in real time rather than waiting for the full computation to finish.
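On the receiving side, network chunks do not line up with JSON lines: a single chunk can carry half an event or several events. A minimal client-side sketch, assuming the raw body arrives as an iterable of bytes chunks (obtaining them from an HTTP client is not shown), buffers until each newline and decodes only complete lines:

```python
import json
from typing import Iterable, Iterator


def parse_ndjson(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one decoded object per complete line, buffering partial lines."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently in the buffer
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():  # skip blank lines
                yield json.loads(line)
    # Leftover bytes without a trailing newline mean the stream was cut off mid-event
    if buffer.strip():
        raise ValueError(f"incomplete NDJSON line at end of stream: {buffer!r}")


if __name__ == "__main__":
    # Simulated network chunks that split the second event in two
    chunks = [b'{"node": "a"}\n{"no', b'de": "b"}\n', b'{"node": "c"}\n']
    for event in parse_ndjson(chunks):
        print(event)
```

Splitting on the raw newline byte is safe for UTF-8 text, because that byte never occurs inside a multi-byte character.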

Common gotcha

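The most common mistake is calling the synchronous graph.stream() inside an async def generator, as this lesson's example does. Each node then runs directly on the event loop, so while a node is working, every other request handled by that worker waits. An await asyncio.sleep() between yields only hands control back between events; it does nothing while a node is running. For real workloads, use graph.astream() inside the async generator, or make the generator a plain def: Starlette's StreamingResponse iterates non-async iterables in a threadpool, so they do not block the loop. Second gotcha: StreamingResponse sends the status code and headers before the first chunk, so an unhandled exception mid-stream cannot become an error response; the client just sees the connection drop. Catch errors inside the generator and yield an explicit error event before stopping.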
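The threadpool idea can be sketched with asyncio alone. This is not Starlette's code, just the same technique: iterate_in_thread (a hypothetical helper) runs each next() call on a synchronous iterator in a worker thread via asyncio.to_thread (Python 3.9+), and slow_steps is a hypothetical stand-in for a blocking graph.stream(). The heartbeat task keeps ticking while the slow steps run, which shows the event loop stays free.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator

_DONE = object()  # sentinel marking exhaustion of the sync iterator


async def iterate_in_thread(sync_iter: Iterator) -> AsyncIterator:
    """Pull each item from a blocking iterator in a worker thread."""
    while True:
        item = await asyncio.to_thread(next, sync_iter, _DONE)
        if item is _DONE:
            break
        yield item


def slow_steps() -> Iterator[str]:
    """Hypothetical stand-in for a synchronous graph.stream(): each step blocks."""
    for node in ("a", "b", "c"):
        time.sleep(0.1)  # simulates a node doing blocking work
        yield node


async def main() -> tuple:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only advances while the event loop is free

    beat = asyncio.create_task(heartbeat())
    nodes = [node async for node in iterate_in_thread(slow_steps())]
    beat.cancel()
    return nodes, ticks


if __name__ == "__main__":
    nodes, ticks = asyncio.run(main())
    print(nodes, "heartbeat ticks:", ticks)
```

If slow_steps were iterated directly inside an async generator instead, the heartbeat would barely tick until all the steps had finished.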

Error recovery

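Client disconnects mid-stream (partial response)
Your client disconnected before the graph finished. This is normal in production (the user closed the tab). Starlette stops pulling from your generator, and an async generator may be cancelled at its current await. Put cleanup and logging of the incomplete run in try/finally, and don't swallow asyncio.CancelledError, so the server keeps running normally.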
All events arrive at once instead of streaming
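Something between the graph and the reader is buffering. Check that a reverse proxy is not buffering the response and that the client reads the body incrementally instead of waiting for it to complete (with curl, pass <code>-N</code>). Also confirm the generator yields each update as it arrives rather than collecting results first. Separately, avoid the synchronous <code>graph.stream()</code> inside an <code>async def</code> generator when nodes are slow, because it blocks the event loop; use <code>graph.astream()</code> or a plain <code>def</code> generator.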
Empty events or missing metadata
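The shape of each item depends on <code>stream_mode</code>: "updates" yields <code>{node_name: update}</code>, "values" yields the full state, and a list of modes yields <code>(mode, chunk)</code> tuples. Node metadata is not included in "updates" or "values" items; in "updates" mode the node name is the dict key. Access keys defensively with <code>.get()</code> and keep a fallback for shapes you do not recognize.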
Client receives garbled JSON
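Make sure every yielded chunk is exactly one complete JSON object followed by a newline. Serialize with <code>json.dumps()</code> (no <code>indent</code>, so each object stays on one line) and pass <code>default=str</code> so values it cannot encode natively, such as message objects or datetimes, fall back to their string form instead of raising <code>TypeError</code>. On the client, buffer incoming bytes and parse only complete lines, since a network chunk can end mid-object.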
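A minimal sketch of a safe serializer using only the standard library: to_ndjson_line passes default=str so json.dumps falls back to str() for values it cannot encode (Message is a hypothetical stand-in for a message object, shown next to a datetime), and because json.dumps without indent escapes embedded newlines, each event stays on exactly one line.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Message:
    """Hypothetical stand-in for a non-JSON-serializable message object."""
    content: str


def to_ndjson_line(event: dict) -> str:
    # default=str converts unknown objects via str(); no indent keeps one line
    return json.dumps(event, default=str) + "\n"


if __name__ == "__main__":
    event = {
        "node": "a",
        "text": "line one\nline two",  # the raw newline is escaped as \n in the output
        "data": Message("hi"),
        "at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    }
    print(to_ndjson_line(event), end="")
```

str() is only the safest fallback; a custom default function that returns a dict would keep such objects structured for the client.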

Experienced dev note

In production, think about backpressure. With a plain generator, production is pull-driven: the graph advances roughly as fast as the response is sent. If you decouple the two (for example, running the graph as a background task that pushes events into a queue), a client that reads slower than the graph produces lets events pile up in memory. Put a bounded queue between the graph and the response generator, and decide explicitly whether to drop events or pause the producer when it fills. Also, a StreamingResponse does not reconnect after a network failure; clients must implement their own retry logic (for example, with the fetch API and manual reconnection). If you need bidirectional messaging or finer-grained error signaling, consider WebSockets instead, bearing in mind that reconnection is still up to the client.
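A minimal asyncio sketch of the bounded-queue pattern with a drop-when-full policy: producer is a hypothetical stand-in for a graph running as a background task, put_nowait raises asyncio.QueueFull once maxsize events are waiting, and those events are counted and dropped rather than buffered. The end-of-stream sentinel uses a blocking await queue.put(), since losing it would leave the consumer waiting forever.

```python
import asyncio
from typing import AsyncIterator

END = object()  # end-of-stream sentinel


async def producer(queue: asyncio.Queue, n_events: int, stats: dict) -> None:
    """Hypothetical fast producer (e.g. a graph run) that never waits for the client."""
    for i in range(n_events):
        try:
            queue.put_nowait(f"event {i}")
        except asyncio.QueueFull:
            stats["dropped"] += 1  # queue full: drop instead of growing memory
        await asyncio.sleep(0)  # let other tasks run between events
    await queue.put(END)  # the sentinel must not be dropped, so wait for space


async def consume(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Response-side generator: a slow client drains the queue at its own pace."""
    while True:
        item = await queue.get()
        if item is END:
            break
        await asyncio.sleep(0.01)  # simulates a slow network write
        yield item


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    stats = {"dropped": 0}
    task = asyncio.create_task(producer(queue, 50, stats))
    received = [item async for item in consume(queue)]
    await task
    return received, stats


if __name__ == "__main__":
    received, stats = asyncio.run(main())
    print(len(received), "delivered,", stats["dropped"], "dropped")
```

In a FastAPI handler, consume(queue) is the generator you would pass to StreamingResponse, with the producer started via asyncio.create_task. Replacing put_nowait with await queue.put() switches the policy from dropping events to pausing the producer at the client's pace.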

Check your understanding

If the client closes the connection (refreshes the page) halfway through graph execution, what happens to the graph itself? Will it continue running, stop immediately, or get killed?

Answer hint

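It usually stops rather than running to completion, though not necessarily at once. <code>graph.stream()</code> is lazy: the graph advances only when the consumer asks for the next item. When the client disconnects, Starlette stops pulling from the generator (an async generator may also be cancelled at its current await), so no further steps are scheduled. Work already in flight, such as a node running in a worker thread, may still finish, and side effects it performed are not undone; with a checkpointer, you can inspect the last saved state to see how far the run got. Resources opened inside the generator are released only when it is closed or cancelled, which is why the try/finally cleanup matters.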
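The pull-driven behavior can be seen with a plain Python generator, no LangGraph required. In this sketch run_graph is a hypothetical stand-in for a streaming graph run: each step executes only when the consumer pulls the next item, so closing the generator early means later steps never start, while its finally block still runs, which is where cleanup and logging of an incomplete run belong. A real graph may also have work already in flight (for example a node running in a worker thread) when the consumer stops.

```python
from typing import Iterator, List


def run_graph(executed: List[str]) -> Iterator[str]:
    """Hypothetical streaming run: each node executes only when the next item is pulled."""
    try:
        for node in ("a", "b", "c"):
            executed.append(node)  # the "work" of the node happens here
            yield f"{node} done"
    finally:
        executed.append("cleanup")  # runs on exhaustion or close()


if __name__ == "__main__":
    executed: List[str] = []
    stream = run_graph(executed)
    print(next(stream))  # prints: a done (consumer pulls one event, then "disconnects")
    stream.close()  # raises GeneratorExit inside the generator at the paused yield
    print(executed)  # prints: ['a', 'cleanup'] (nodes b and c never ran)
```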

VERSION LangGraph 0.2.x stabilized the stream() API and event structure. In 0.1.x, event types and metadata formats differed significantly. Require langgraph>=0.2.0 for this pattern, and pin the exact version you have tested.
NEXT

Learn how to stream graph execution with <code>astream()</code> to avoid blocking the event loop and handle truly async node operations.
