Client-side event consumption patterns
Why this matters
In production, you often need to send updates to a frontend or external system as the graph executes: not just the final result. Event consumption lets you react to intermediate states, node executions, and errors as they happen.
Explanation
Event consumption is the pattern of iterating over events emitted by a compiled LangGraph graph as it runs. When you call graph.stream() or graph.astream(), instead of blocking until completion, you get back an iterator that yields events for each node execution, tool call, and state update.
Mechanically, after compiling your graph with .compile(), you invoke it with .stream(input) which returns an iterator. Each iteration yields a dictionary containing the node that executed, the output it produced, and the current state. This allows you to process intermediate results, stream them to a client, or halt execution based on what you observe. The key difference from .invoke() is that .invoke() blocks and returns only the final state, while .stream() yields partial states as the graph progresses.
This pattern is essential for user-facing applications where waiting 30 seconds for a response feels broken, but streaming updates every 200ms feels responsive.
Analogy
Think of it like watching a build log. You don't wait for the entire Docker build to finish and then see the output: you stream the log lines as they're generated so you know what's happening in real-time. If a layer fails at step 3, you know immediately instead of after waiting for steps 4–10 to complete.
Code
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
class State(TypedDict):
count: int
messages: list
def increment_node(state: State) -> State:
return {"count": state["count"] + 1, "messages": state["messages"] + ["incremented"]}
def double_node(state: State) -> State:
return {"count": state["count"] * 2, "messages": state["messages"] + ["doubled"]}
def check_done(state: State) -> str:
if state["count"] >= 10:
return "done"
return "continue"
graph = StateGraph(State)
graph.add_node("increment", increment_node)
graph.add_node("double", double_node)
graph.add_edge(START, "increment")
graph.add_edge("increment", "double")
graph.add_conditional_edges("double", check_done, {"continue": "increment", "done": END})
compiled = graph.compile(checkpointer=MemorySaver())
initial_state = {"count": 0, "messages": []}
print("=== Streaming events ===")
for event in compiled.stream(initial_state, config={"configurable": {"thread_id": "test-1"}}):
for node_name, output in event.items():
print(f"Node '{node_name}' output: count={output['count']}, messages={output['messages']}")
print("\n=== Final state via invoke ===")
final = compiled.invoke(initial_state, config={"configurable": {"thread_id": "test-2"}})
print(f"Final count: {final['count']}, Final messages: {final['messages']}") === Streaming events === Node 'increment' output: count=1, messages=['incremented'] Node 'double' output: count=2, messages=['incremented', 'doubled'] Node 'increment' output: count=3, messages=['incremented', 'doubled', 'incremented'] Node 'double' output: count=6, messages=['incremented', 'doubled', 'incremented', 'doubled'] Node 'increment' output: count=7, messages=['incremented', 'doubled', 'incremented', 'doubled', 'incremented'] Node 'double' output: count=14, messages=['incremented', 'doubled', 'incremented', 'doubled', 'incremented', 'doubled'] === Final state via invoke === Final count: 14, Final messages=['incremented', 'doubled', 'incremented', 'doubled', 'incremented', 'doubled']
What just happened?
The graph executed a loop: increment → double → check (continue or done). Each time a node executed, <code>.stream()</code> yielded an event containing that node's name and its output state. We printed each intermediate result, so you saw the count grow from 0→1→2→3→6→7→14 in real-time as nodes ran. The second call using <code>.invoke()</code> ran the same graph but blocked until completion and returned only the final state.
Common gotcha
Developers often forget that .stream() returns an iterator of dictionaries where the key is the node name, not a flat list of events. If you try to unpack it as `for output in compiled.stream(state)`, you'll get a dictionary, not the values. You must iterate and destructure like `for event in compiled.stream(state): for node_name, node_output in event.items()`. Also, streaming still requires you to consume the entire iterator for the graph to finish: it doesn't run in the background.
Error recovery
RuntimeError: Graph not compiledKeyError when unpacking eventGenerator didn't finishExperienced dev note
In production, wrap your stream consumption in a try/except and send errors back to the client in real-time, not as a final response. Many teams only check for errors after the full stream completes, missing the chance to fail fast. Also, if you're streaming to a WebSocket or HTTP response, always add a timeout or heartbeat: if a node hangs, the client won't know. Finally, stream events are NOT persisted by default; if you need to replay them, you need to explicitly log them before yielding to the client.
Check your understanding
Why would you use .stream() in a production REST API instead of just calling .invoke()? What would the client experience be different?
Show answer hint
A correct answer explains that .stream() allows you to send intermediate updates to the client as the graph runs (reducing perceived latency and allowing cancellation), while .invoke() blocks the entire time and only sends a response when done. The difference is responsiveness and the ability to act on intermediate state or errors.