Code Intermediate medium · 7 min

astream_events(): granular event types

What you will learn
Stream individual execution events (node runs, tool calls, LLM outputs) from a langgraph graph to build real-time, fine-grained UI updates or logging.

Why this matters

Most applications need to show users what's happening in real-time: which node is running, what tool was called, what the LLM returned. <code>astream_events()</code> is the canonical way to tap into those granular signals without blocking, enabling responsive streaming UI, detailed audit trails, and multi-step debugging.

Skip if: Do not use <code>astream_events()</code> if you only care about the final result and have no users waiting. Use <code>invoke()</code> instead: it's simpler and faster. Also skip <code>astream_events()</code> if you need high-frequency telemetry (>1000 events/sec) where the overhead of event emission becomes a bottleneck; use internal graph callbacks or custom instrumentation instead.

Explanation

What it is: astream_events() is an async method on a compiled langgraph graph that emits a stream of fine-grained execution events as the graph runs. Instead of waiting for the full result, you get real-time notifications about node execution, tool calls, LLM outputs, and errors: each as a typed event object.

How it works mechanically: When you call astream_events(input, config), the graph runs in background while yielding StreamEvent objects to your async iterator. Each event has a type field (e.g., 'on_chain_start', 'on_tool_call', 'on_chain_end') and structured data (event, data, metadata). You filter and handle events in real-time: useful for streaming text to a UI, logging intermediate states, or canceling the graph early if a condition is met.

When to use it: Use astream_events() whenever the user or downstream system needs visibility into the execution pipeline in real-time. Common patterns: streaming chatbot responses token-by-token, displaying which tool the agent is calling, building a visual trace of node execution for debugging, or aggregating metrics across a multi-step workflow.

Analogy

Think of <code>astream_events()</code> like watching a factory assembly line. Instead of waiting for the finished product, you observe each station (node) as work passes through, notice when a specific machine (tool) runs, and see what it produces. You can react immediately: stop the line if something looks wrong, log what happened at each step, or show a live status board to observers.

Code

Illustrative only - not runnable without a valid API key
python
import asyncio
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from typing import TypedDict


class State(TypedDict):
    messages: list


def llm_node(state: State) -> State:
    """Call Claude and return response."""
    client = ChatAnthropic(model="claude-3-5-sonnet-20241022")
    response = client.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}


def tool_node(state: State) -> State:
    """Simulate a tool call."""
    return {"messages": state["messages"] + [{"role": "tool", "content": "Tool result"}]}


def should_continue(state: State) -> str:
    """Route based on message count."""
    if len(state["messages"]) >= 3:
        return END
    return "tool"


# Build graph
graph = StateGraph(State)
graph.add_node("llm", llm_node)
graph.add_node("tool", tool_node)
graph.add_edge(START, "llm")
graph.add_conditional_edges("llm", should_continue, {"tool": "tool", END: END})
graph.add_edge("tool", "llm")
compiled = graph.compile()


async def stream_with_event_types():
    """Stream events and filter by type."""
    input_state = {"messages": [HumanMessage(content="What is 2+2?")]}
    
    event_counts = {"on_chain_start": 0, "on_chain_end": 0, "on_chain_error": 0}
    
    async for event in compiled.astream_events(input_state, config={"configurable": {"thread_id": "test"}}):
        event_type = event.get("event")
        
        # Count event types
        if event_type in event_counts:
            event_counts[event_type] += 1
        
        # Log node transitions
        if event_type == "on_chain_start":
            node_name = event.get("metadata", {}).get("langgraph_node", "unknown")
            print(f"[START] Node: {node_name}")
        elif event_type == "on_chain_end":
            node_name = event.get("metadata", {}).get("langgraph_node", "unknown")
            print(f"[END] Node: {node_name}")
    
    print(f"\nEvent summary: {event_counts}")
    return event_counts


# Run it
result = asyncio.run(stream_with_event_types())
Output
[START] Node: llm
[END] Node: llm
[START] Node: tool
[END] Node: tool
[START] Node: llm
[END] Node: llm

Event summary: {'on_chain_start': 3, 'on_chain_end': 3, 'on_chain_error': 0}

What just happened?

The code defined a two-node graph (LLM → Tool → LLM) and compiled it. When <code>astream_events()</code> was called on the input state, the graph executed while emitting <code>on_chain_start</code> and <code>on_chain_end</code> events for each node invocation. The event stream was consumed in real-time, extracting the node name from metadata and printing it. The event counts were tallied as events arrived, not after execution finished.

Common gotcha

Developers often assume all events have a 'langgraph_node' key in metadata, but library nodes (like chain or tool calls) may emit events with different metadata structures. Always use .get() with a default and check the actual event shape in your first run, or you'll get a KeyError. Also, the config parameter (especially configurable) is required if your graph uses checkpointing; omitting it silently ignores checkpoint settings.

Error recovery

KeyError on metadata access
Event metadata may not have the key you expect. Always use <code>event.get('metadata', {}).get('langgraph_node', 'unknown')</code> instead of direct indexing.
astream_events() not found
Ensure <code>compiled = graph.compile()</code> was called. Only compiled graphs have <code>astream_events()</code>; uncompiled StateGraph objects do not.
No events yielded
Check that your graph has at least one node and the input state matches your State TypedDict. Also verify the graph is not stuck waiting for input: conditional edges returning invalid node names silently end execution.

Experienced dev note

Event streaming can create a false sense of real-time visibility. In production, remember that event emission itself has latency and overhead: under high load, batching events or sampling becomes necessary. Also, do not build critical logic on the presence of specific event types; different langgraph versions and configurations emit different subsets of events. Instead, hook onto on_chain_end with reliable data in the data field. Finally, if you're streaming to a user, always add a heartbeat or timeout mechanism: a hung LLM call won't emit events, and your UI will appear frozen.

Check your understanding

Your graph has an LLM node that calls an external API, and a tool node that processes the result. Why might you see 10 on_chain_start events but only 5 on_chain_end events in the event stream, and what should you check first?

Show answer hint

The key insight is that <code>on_chain_start</code> events include nested/internal chain calls (e.g., inside the ChatAnthropic client), while <code>on_chain_end</code> captures only the top-level node scope. Check the <code>metadata</code> and <code>parent_ids</code> fields to distinguish internal vs. graph-level events. Also verify your async loop didn't break early or get cancelled.

VERSION In langgraph < 0.2.0, astream_events() required version='v2' parameter; this is now the default and the parameter is ignored. Upgrading to 0.2.x may expose event structure differences if you were relying on the older v1 format.
NEXT

Now that you can stream individual events, learn how to use <code>stream(mode='values')</code> to get state snapshots at each step: a coarser but often simpler alternative for tracking node outputs without event-level granularity.

Community Notes

No notes yetBe the first to share a version-specific fix or tip.