REST API surface: invoke, stream, thread management
Why this matters
In production, you rarely call your graph directly from Python. Instead, you expose it as an HTTP service so front-ends, mobile apps, and external systems can use it, and so each conversation's state persists on the server across requests without the client rebuilding or resending it each time.
Explanation
LangGraph compiles graphs into runnable objects, but production systems need HTTP endpoints that can serve many concurrent clients, each with its own conversation thread. The REST API surface exposes your graph through LangServe, LangGraph Server (part of LangGraph Platform), or a custom HTTP server, with three key operations: invoke (run the graph to completion and return the final state in a single response), stream (emit tokens or events as they flow through the graph), and thread_id management (route follow-up messages to the same conversation context).
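To make the three operations concrete without depending on LangServe, here is a minimal sketch that uses only the standard library's WSGI interface. Everything in it is a hypothetical stand-in: the routes (POST /threads/<thread_id>/invoke and POST /threads/<thread_id>/stream), the JSON body shape {"input": ...}, the echo_graph function (playing the compiled graph), and the THREADS dict (playing the checkpointer). It is not the actual API of LangServe or LangGraph Server.

```python
import io
import json
from typing import Iterator
from wsgiref.util import setup_testing_defaults

# Hypothetical stand-in for a checkpointer: thread_id -> saved messages.
THREADS: dict[str, list[dict]] = {}


def echo_graph(history: list[dict]) -> Iterator[dict]:
    """Hypothetical stand-in for a compiled graph: yields one event per step."""
    yield {"role": "ai", "content": f"Echo: {history[-1]['content']}"}


def run_turn(thread_id: str, text: str) -> Iterator[dict]:
    """Load the thread's history, add the new input, run the graph, save as we go."""
    history = THREADS.setdefault(thread_id, [])
    history.append({"role": "human", "content": text})
    for event in echo_graph(history):
        history.append(event)
        yield event


def not_found(start_response):
    """Plain-text 404 for any route this sketch does not handle."""
    start_response("404 Not Found", [("Content-Type", "text/plain")])
    return [b"not found"]


def app(environ, start_response):
    """WSGI app: invoke returns one JSON document, stream returns NDJSON lines."""
    parts = environ["PATH_INFO"].strip("/").split("/")
    if environ["REQUEST_METHOD"] != "POST" or len(parts) != 3 or parts[0] != "threads":
        return not_found(start_response)
    _, thread_id, action = parts
    length = int(environ.get("CONTENT_LENGTH") or 0)
    text = json.loads(environ["wsgi.input"].read(length))["input"]

    if action == "invoke":
        # Run the turn to completion, then send the whole thread state at once.
        for _ in run_turn(thread_id, text):
            pass
        body = json.dumps({"thread_id": thread_id, "messages": THREADS[thread_id]})
        start_response("200 OK", [("Content-Type", "application/json")])
        return [body.encode()]
    if action == "stream":
        # A generator body lets the WSGI server flush each event as it is produced.
        start_response("200 OK", [("Content-Type", "application/x-ndjson")])
        return (json.dumps(event).encode() + b"\n" for event in run_turn(thread_id, text))
    return not_found(start_response)


def call(path: str, payload: dict) -> tuple[str, bytes]:
    """Drive the app in-process, the way a WSGI server would."""
    body = json.dumps(payload).encode()
    environ: dict = {}
    setup_testing_defaults(environ)
    environ.update({"REQUEST_METHOD": "POST", "PATH_INFO": path,
                    "CONTENT_LENGTH": str(len(body)), "wsgi.input": io.BytesIO(body)})
    statuses: list[str] = []
    chunks = app(environ, lambda status, headers: statuses.append(status))
    return statuses[0], b"".join(chunks)


status, body = call("/threads/user-session-123/invoke", {"input": "Hello"})
print(status, json.loads(body)["messages"])
status, body = call("/threads/user-session-123/stream", {"input": "How are you?"})
print(status, body.decode().splitlines())
```

The second request only carries the new text; the server-side history keyed by the thread id supplies the rest, which is the same division of labor the checkpointer provides in LangGraph.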
Mechanically, when a request arrives with a thread_id, the server loads the latest checkpoint for that conversation from a checkpoint backend (in-memory, SQLite, PostgreSQL, etc.), merges the new input into the saved state through the state's reducers, executes the nodes, and writes a new checkpoint after each step (super-step). Streaming hooks into that execution loop and flushes partial results (intermediate node outputs or LLM tokens) as they are produced, rather than buffering until the entire graph finishes.
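The load, merge, run, and checkpoint cycle described above can be modelled in a few lines of plain Python. This is a hypothetical toy runtime (ToyGraph, append_reducer, and the echo node are illustrative names, not LangGraph internals): it saves a snapshot after the input step and after every node, stream yields each snapshot as soon as it is saved (roughly what stream_mode="values" reports), and invoke simply drains the stream and returns the last snapshot.

```python
from typing import Callable, Iterator, Optional

State = dict[str, list[str]]
Node = Callable[[State], State]


def append_reducer(saved: list[str], update: list[str]) -> list[str]:
    """Merge an update into the saved list by appending, like a message reducer."""
    return saved + update


class ToyGraph:
    """Hypothetical mini-runtime: runs nodes in order and checkpoints after each step."""

    def __init__(self, nodes: list[tuple[str, Node]]):
        self.nodes = nodes
        self.checkpoints: dict[str, list[State]] = {}  # thread_id -> saved snapshots

    def _latest(self, thread_id: str) -> State:
        saved = self.checkpoints.get(thread_id)
        return saved[-1] if saved else {"messages": []}

    def _merge_and_save(self, thread_id: str, state: State, update: State) -> State:
        new_state = {"messages": append_reducer(state["messages"], update["messages"])}
        self.checkpoints.setdefault(thread_id, []).append(new_state)
        return new_state

    def stream(self, update: State, thread_id: str) -> Iterator[State]:
        # Load the thread's latest checkpoint and merge the new input into it.
        state = self._merge_and_save(thread_id, self._latest(thread_id), update)
        yield state
        # Run each node, merge its output, checkpoint, then flush the snapshot.
        for _name, node in self.nodes:
            state = self._merge_and_save(thread_id, state, node(state))
            yield state

    def invoke(self, update: State, thread_id: str) -> Optional[State]:
        # invoke is just the stream drained to the end.
        final = None
        for final in self.stream(update, thread_id):
            pass
        return final


def echo(state: State) -> State:
    return {"messages": [f"Echo: {state['messages'][-1]}"]}


graph = ToyGraph([("chat", echo)])
graph.invoke({"messages": ["Hello"]}, thread_id="t1")
print(graph.invoke({"messages": ["How are you?"]}, thread_id="t1"))
for snapshot in graph.stream({"messages": ["Stream test"]}, thread_id="t2"):
    print(snapshot)
```

Because the checkpoint is written before each snapshot is yielded, a consumer that stops reading early still leaves the last completed step saved.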
Use this when you're building a production chatbot, multi-turn agent, or interactive system where clients need to maintain context across requests without managing state themselves.
Analogy
Think of it like a bank teller system: your graph is the teller (decision logic), <code>invoke</code> is a single transaction, <code>stream</code> is keeping the customer updated in real time while the transaction is processed, and <code>thread_id</code> is the account number: the same customer can make multiple transactions, and the bank knows which account to update each time.
Code
from typing import Annotated

from typing_extensions import TypedDict

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages


class ConversationState(TypedDict):
    # add_messages appends new messages to the saved list (replacing any with a
    # matching id) instead of overwriting the list on every update.
    messages: Annotated[list[BaseMessage], add_messages]


def chat_node(state: ConversationState) -> dict:
    """Simulate an LLM response based on the latest message."""
    last_message = state["messages"][-1].content
    response = f"Echo: {last_message}"
    return {"messages": [AIMessage(content=response)]}


graph = StateGraph(ConversationState)
graph.add_node("chat", chat_node)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

# MemorySaver keeps checkpoints in process memory: fine for demos only.
checkpointer = MemorySaver()
compiled_graph = graph.compile(checkpointer=checkpointer)

# The thread_id selects which saved conversation to load and update.
config = {"configurable": {"thread_id": "user-session-123"}}

result_1 = compiled_graph.invoke({"messages": [HumanMessage(content="Hello")]}, config=config)
print("After first invoke:")
print(f" Messages: {[m.content for m in result_1['messages']]}")

# Send only the new message: the checkpointer restores the earlier history
# for this thread and add_messages appends the new turn to it.
result_2 = compiled_graph.invoke({"messages": [HumanMessage(content="How are you?")]}, config=config)
print("\nAfter second invoke (same thread):")
print(f" Messages: {[m.content for m in result_2['messages']]}")

print("\n--- Streaming example ---")
# stream_mode="values" yields the full state after each step, starting with
# the state that holds just the input.
for event in compiled_graph.stream(
    {"messages": [HumanMessage(content="Stream test")]},
    config={"configurable": {"thread_id": "stream-thread-456"}},
    stream_mode="values",
):
    print(f"Event: {[m.content for m in event.get('messages', [])]}")

Output:
After first invoke:
 Messages: ['Hello', 'Echo: Hello']

After second invoke (same thread):
 Messages: ['Hello', 'Echo: Hello', 'How are you?', 'Echo: How are you?']

--- Streaming example ---
Event: ['Stream test']
Event: ['Stream test', 'Echo: Stream test']
What just happened?
The code compiled a graph with a <code>MemorySaver</code> checkpointer, then invoked it twice with the same <code>thread_id</code>. On the first invoke, the chat node echoed the single input message and the resulting state was checkpointed under that thread. On the second invoke, LangGraph restored the saved history for the thread before running the graph, so the chat node saw the earlier messages plus the new one and appended its reply. The streaming example used <code>stream_mode="values"</code>, which emits the full state after each step: first the state holding just the input, then the state after the chat node ran.
Common gotcha
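Developers often assume that every invoke must include the entire message history. With a checkpointer it should not: LangGraph loads the saved state for the thread_id and merges your input into it, so you send only the new message. How that merge behaves depends on the state's reducer. With an add_messages reducer, the new message is appended (a message whose id is already in the history replaces the existing one instead). With no reducer, the input overwrites the saved list, and the graph really does lose its memory. And if you forget the thread_id, or change it between turns, every call starts a fresh conversation.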
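Whether resending the history is harmless depends on how the state's reducer merges each input into the saved list. This stdlib sketch compares three merge strategies using plain dicts as messages; overwrite, append, and merge_by_id are hypothetical helpers, and merge_by_id only mimics the documented idea behind add_messages (replace on a matching id, otherwise append), not its actual implementation.

```python
def overwrite(saved: list[dict], update: list[dict]) -> list[dict]:
    """No reducer: the update replaces the saved value."""
    return list(update)


def append(saved: list[dict], update: list[dict]) -> list[dict]:
    """Naive append reducer: no de-duplication at all."""
    return saved + update


def merge_by_id(saved: list[dict], update: list[dict]) -> list[dict]:
    """Replace messages whose id already exists, append the rest (add_messages-style)."""
    merged = list(saved)
    index = {m["id"]: i for i, m in enumerate(merged)}
    for m in update:
        if m["id"] in index:
            merged[index[m["id"]]] = m
        else:
            index[m["id"]] = len(merged)
            merged.append(m)
    return merged


def contents(messages: list[dict]) -> list[str]:
    return [m["content"] for m in messages]


saved = [{"id": "1", "content": "Hello"}, {"id": "2", "content": "Echo: Hello"}]
new_turn = [{"id": "3", "content": "How are you?"}]

# Sending only the new turn: overwrite forgets the history, the other two keep it.
print(contents(overwrite(saved, new_turn)))
print(contents(append(saved, new_turn)))
print(contents(merge_by_id(saved, new_turn)))
# Resending the whole history: naive append duplicates it, id-based merge does not.
print(contents(append(saved, saved + new_turn)))
print(contents(merge_by_id(saved, saved + new_turn)))
```

In every strategy, sending only the new turn is the cheapest correct choice; the id-based merge merely makes an accidental resend harmless.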
Error recovery
KeyError: 'configurable'
TypeError: unhashable type 'dict' for thread_id
AttributeError: 'NoneType' has no attribute 'messages'
Experienced dev note
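In production, don't use MemorySaver(): it keeps checkpoints in process memory, so they are lost on restart and are not shared between server replicas. Swap it for a durable checkpointer such as PostgresSaver (from the langgraph-checkpoint-postgres package, opened with PostgresSaver.from_conn_string(...) and calling .setup() once to create its tables) or SqliteSaver (from langgraph-checkpoint-sqlite), and your conversation threads will survive deployments. If you expose the graph over HTTP with LangServe, the client sends the config, including configurable.thread_id, in the request body alongside the input, and LangServe passes it through to the graph, so you mainly need to worry about building the graph correctly. Finally, streaming only pays off when someone is watching the output: for offline batch jobs over many threads, plain invoke (or batch) is simpler and skips the per-event overhead.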
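To see why a durable backend survives restarts where MemorySaver does not, here is a toy checkpoint table built on the standard library's sqlite3. The table layout, save_state, and load_state are hypothetical and far simpler than what SqliteSaver or PostgresSaver actually store; the point is only that state keyed by thread_id outlives the connection (and, by extension, the process) that wrote it.

```python
import json
import os
import sqlite3
import tempfile
from contextlib import closing

# Hypothetical schema: one row per saved step of each thread.
SCHEMA = """CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    step INTEGER NOT NULL,
    state TEXT NOT NULL,
    PRIMARY KEY (thread_id, step)
)"""


def save_state(db_path: str, thread_id: str, state: dict) -> None:
    """Append a new checkpoint row for the thread."""
    # closing() closes the connection; the inner `conn` block commits or rolls back.
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.execute(SCHEMA)
        (last,) = conn.execute(
            "SELECT COALESCE(MAX(step), -1) FROM checkpoints WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
        conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, last + 1, json.dumps(state)),
        )


def load_state(db_path: str, thread_id: str) -> dict:
    """Return the latest checkpoint for the thread, or an empty state."""
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.execute(SCHEMA)
        row = conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
    return json.loads(row[0]) if row else {"messages": []}


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "checkpoints.db")
    save_state(db, "user-session-123", {"messages": ["Hello", "Echo: Hello"]})
    # Every call opens a fresh connection, just as a restarted server would.
    print(load_state(db, "user-session-123"))
    print(load_state(db, "unknown-thread"))
```

A real deployment would point the checkpointer at a database shared by all replicas, which a local SQLite file cannot provide; that is one reason PostgreSQL is the usual production choice.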
Check your understanding
If a user sends message A, the API streams a response token-by-token. Mid-stream, the connection drops. They reconnect and send message B with the same thread_id. What messages does the graph see as input, and why?
Answer hint
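It depends on whether the interrupted run finished. LangGraph writes checkpoints at step boundaries: one once the input A has been applied, and another only after a node finishes, so partially streamed tokens are never saved. If the disconnect cancelled the run while the chat node was still generating, the latest checkpoint holds only [A], and the graph sees [A, B]. If the server let the run continue to completion in the background, the full response was checkpointed, and the graph sees [A, response_to_A, B]. Either way, it never sees a half-finished response.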
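What the graph sees after a dropped stream depends on whether the run was cancelled before the node finished, and the cancelled case can be reproduced with a plain Python generator. In this hypothetical sketch, CHECKPOINTS stands in for the checkpointer, fake_llm_tokens for the model, and closing the generator plays the role of a server cancelling the run when the client disconnects; checkpoints are written only after the input step and after the node finishes.

```python
from typing import Iterator

# Hypothetical checkpoint store: thread_id -> messages saved at the last completed step.
CHECKPOINTS: dict[str, list[str]] = {}


def fake_llm_tokens(prompt: str) -> Iterator[str]:
    """Hypothetical token source standing in for a streaming LLM."""
    for word in f"Echo: {prompt}".split():
        yield word + " "


def stream_turn(thread_id: str, user_message: str) -> Iterator[str]:
    """Stream one turn, checkpointing only at step boundaries."""
    history = CHECKPOINTS.get(thread_id, []) + [user_message]
    CHECKPOINTS[thread_id] = history  # step 1: input applied and saved
    tokens = []
    for token in fake_llm_tokens(user_message):
        tokens.append(token)
        yield token  # sent to the client, never checkpointed on its own
    # Step 2 is saved only if the node finishes generating.
    CHECKPOINTS[thread_id] = history + ["".join(tokens).strip()]


# The client receives one token of the reply to A, then the connection drops.
turn_a = stream_turn("t1", "A")
print(next(turn_a))
turn_a.close()  # cancel the run mid-node
print(CHECKPOINTS["t1"])  # only the input step was saved

# Reconnect and send B on the same thread: the node's input history is [A, B].
for _ in stream_turn("t1", "B"):
    pass
print(CHECKPOINTS["t1"])
```

If the server instead drained turn_a to the end after the client left, the reply to A would have been saved before B arrived.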
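Version note: LangGraph 0.2.0 moved checkpointers into the separate langgraph-checkpoint packages and changed the checkpoint format, so older examples may not match the current checkpointer API (current savers expose methods such as get_tuple(), put(), and put_writes()). Do not mix old and new APIs.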