Code Advanced hard · 8 min

Load testing graph throughput

What you will learn
Measure how many graph invocations your LangGraph workflow can handle per second under concurrent load, using asyncio and latency statistics (min, mean, median, P95).

Why this matters

Before deploying a production LangGraph agent, you need to know its breaking point: how many concurrent users or requests it can handle before latency degrades or the system fails. Load testing reveals bottlenecks in node execution, LLM calls, or database queries that don't show up in single-threaded testing.

Skip if: (1) you're still in the prototype phase and performance isn't a constraint; (2) your graph is purely synchronous and CPU-bound with no I/O, so its per-call cost is already predictable; (3) you're testing a proof-of-concept that will never see production traffic. Don't confuse load testing with correctness testing: always validate functional behavior first.

Explanation

Load testing a LangGraph means invoking your compiled graph repeatedly under concurrent conditions and measuring response time, throughput (invocations/second), and failure rates. Mechanically, you create a pool of asyncio coroutines that invoke the graph concurrently with fixed input payloads, record execution times and results, then analyze the distribution. The key insight: LangGraph's invoke() is synchronous, so asyncio.gather() alone will not run calls in parallel; you offload each call to a thread (loop.run_in_executor() or a ThreadPoolExecutor) and gather the results. LLM latency (not your graph logic) usually becomes the bottleneck. When to use this: before production deployment, after each major graph change, and when investigating why users report slow responses. Instrument at the boundary (total invoke time), then drill down into individual nodes if needed.
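The offload-and-gather mechanics can be tried without LangGraph installed. In this minimal sketch, `blocking_invoke` is a hypothetical stand-in for a compiled graph's synchronous `invoke()`; the thread offloading and the `asyncio.gather()` fan-out are the parts that should carry over to a real graph.

```python
import asyncio
import time


def blocking_invoke(payload: dict) -> dict:
    """Hypothetical stand-in for compiled_graph.invoke(); blocks the calling thread."""
    time.sleep(0.05)
    return {"result": f"processed: {payload['query']}"}


async def timed_call(i: int) -> tuple[float, dict]:
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Offload the blocking call to the default thread pool so the event loop stays free.
    result = await loop.run_in_executor(None, blocking_invoke, {"query": f"query_{i}"})
    return time.perf_counter() - start, result


async def run_batch(n: int) -> list[tuple[float, dict]]:
    # gather() awaits all calls together and returns results in submission order.
    return await asyncio.gather(*(timed_call(i) for i in range(n)))


batch = asyncio.run(run_batch(4))
for elapsed, result in batch:
    print(f"{elapsed:.3f}s  {result['result']}")
```

Each call records its own elapsed time at the boundary, which is the measurement the rest of this lesson builds on.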

Analogy

Load testing a graph is like stress-testing a bridge. You don't just walk across it once; you send a convoy of trucks across simultaneously to see if it holds, measure deflection under load, and find where it creaks before it breaks.

Code

Illustrative only - requires the langgraph package; no API key is needed because the node simulates latency with a sleep
python
import asyncio
import time
from typing import TypedDict, Any
from langgraph.graph import StateGraph, START, END
import statistics

class State(TypedDict):
    query: str
    result: str

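def simulate_node(state: State) -> dict:
    # Blocking sleep stands in for a slow LLM or API call.
    time.sleep(0.5)
    # Return only the keys this node updates instead of mutating the input state.
    return {'result': f"processed: {state['query']}"}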

graph = StateGraph(State)
graph.add_node('process', simulate_node)
graph.add_edge(START, 'process')
graph.add_edge('process', END)
compiled_graph = graph.compile()

async def invoke_graph_async(graph_obj: Any, num_invocations: int, concurrent_limit: int) -> dict:
    semaphore = asyncio.Semaphore(concurrent_limit)
    
    async def single_invoke(invoke_id: int) -> tuple[float, bool]:
        async with semaphore:
            # Timer starts after the semaphore is acquired, so queue wait is excluded.
            start = time.perf_counter()
            try:
                loop = asyncio.get_running_loop()
                # invoke() blocks, so run it on the default thread pool.
                await loop.run_in_executor(None, lambda: graph_obj.invoke({'query': f'query_{invoke_id}'}))
                return time.perf_counter() - start, True
            except Exception:
                # Count any failure, but still record how long it took.
                return time.perf_counter() - start, False
    
    tasks = [single_invoke(i) for i in range(num_invocations)]
    results = await asyncio.gather(*tasks)
    
    times = [r[0] for r in results]
    successes = sum(1 for r in results if r[1])
    failures = num_invocations - successes
    
    return {
        'total_invocations': num_invocations,
        'successful': successes,
        'failed': failures,
        'min_time': min(times),
        'max_time': max(times),
        'mean_time': statistics.mean(times),
        'median_time': statistics.median(times),
        'p95_time': sorted(times)[int(len(times) * 0.95)] if len(times) > 20 else None,
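        # NOTE: this divides by the *sum* of latencies (i.e. 1 / mean latency),
        # not by wall-clock time. See "Common gotcha" below.
        'throughput_per_sec': num_invocations / sum(times)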
    }

async def main():
    results = await invoke_graph_async(compiled_graph, num_invocations=50, concurrent_limit=5)
    print(f"Load Test Results (50 invocations, 5 concurrent):")
    print(f"  Successful: {results['successful']}/{results['total_invocations']}")
    print(f"  Min latency: {results['min_time']:.3f}s")
    print(f"  Mean latency: {results['mean_time']:.3f}s")
    print(f"  Median latency: {results['median_time']:.3f}s")
    print(f"  P95 latency: {results['p95_time']:.3f}s" if results['p95_time'] else "  P95: N/A")
    print(f"  Throughput: {results['throughput_per_sec']:.2f} invocations/sec")

if __name__ == '__main__':
    asyncio.run(main())
Output
Load Test Results (50 invocations, 5 concurrent):
  Successful: 50/50
  Min latency: 0.506s
  Mean latency: 0.520s
  Median latency: 0.511s
  P95 latency: 0.530s
  Throughput: 1.92 invocations/sec

What just happened?

The code defined a simple graph with one node that sleeps 0.5 seconds (simulating LLM latency). It then scheduled 50 invocations, with a semaphore allowing at most 5 to run at a time. Each invocation was timed from the moment it acquired the semaphore, so the reported latencies exclude time spent queuing. With 5 concurrent workers and ~0.5s per task, the system completes ~10 tasks/second of wall-clock time, but the code reports throughput as total invocations divided by the sum of individual times (a different metric: see gotcha). All 50 invocations succeeded, and the P95 figure means roughly 95% of requests completed in 0.53 seconds or less.

Common gotcha

Throughput calculation is easy to misinterpret. The code computes `total_invocations / sum(times)`, which is not 'requests per second of wall-clock time': it is the rate you would get if all tasks ran serially, i.e. 1 / mean latency (50 / 26s ≈ 1.9 req/sec here). Real throughput depends on concurrency level: with 5 workers and 0.5s tasks, you get ~10 req/sec wall-time, not 1.9. The confusion happens because developers conflate 'total time spent working' with 'system capacity'. Always measure what matters: requests per second of actual elapsed time, which is `num_invocations / (end_time - start_time)`, with both timestamps taken around the whole batch. Also, `run_in_executor(None, ...)` runs each call on the event loop's default thread pool: your graph's synchronous invoke blocks a worker thread for its full duration, so true async gains come only if you redesign the graph to be async-native.
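To see the two metrics side by side, here is a minimal sketch that takes timestamps around the whole batch. `blocking_invoke` is a hypothetical stand-in for a synchronous graph invoke, and the 50 ms sleep and the limits are arbitrary values chosen so the example finishes quickly.

```python
import asyncio
import time


def blocking_invoke(_: int) -> None:
    """Hypothetical stand-in for a synchronous graph invoke taking ~50 ms."""
    time.sleep(0.05)


async def measure(num_invocations: int, concurrent_limit: int) -> dict:
    semaphore = asyncio.Semaphore(concurrent_limit)
    loop = asyncio.get_running_loop()

    async def one(i: int) -> float:
        async with semaphore:
            start = time.perf_counter()
            await loop.run_in_executor(None, blocking_invoke, i)
            return time.perf_counter() - start

    # Wall-clock timestamps bracket the entire batch, queuing included.
    batch_start = time.perf_counter()
    latencies = await asyncio.gather(*(one(i) for i in range(num_invocations)))
    wall_clock = time.perf_counter() - batch_start

    return {
        "wall_clock_s": wall_clock,
        # What the system actually delivered per second of elapsed time.
        "wall_clock_throughput": num_invocations / wall_clock,
        # What the lesson's code reports: 1 / mean latency.
        "serial_rate": num_invocations / sum(latencies),
    }


stats = asyncio.run(measure(num_invocations=20, concurrent_limit=4))
print(stats)
```

With a concurrency limit of 4, the wall-clock figure should come out at roughly four times the serial rate, which is the gap the gotcha describes.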

Error recovery

asyncio.TimeoutError
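Invocations took too long under concurrent load. This error only appears if you wrap each call in a timeout (for example with `asyncio.wait_for()`); the code above sets none. If the timeout also covers time spent waiting on the semaphore, increase `concurrent_limit` to reduce queuing; otherwise identify which node is slow using per-node timing instrumentation. This often signals an LLM timeout or downstream API bottleneck, not your graph logic.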
BrokenProcessPool
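Raised by `concurrent.futures.ProcessPoolExecutor` when a worker process dies unexpectedly, for example after being killed for exceeding memory limits. It can only occur if you swap the default thread pool for a process pool. Reduce `concurrent_limit` or the pool size, and check OS limits such as file descriptors (`ulimit -n`). For production, use a proper async graph design instead of `run_in_executor()`.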
RuntimeError: no running event loop
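You asked for the running loop (for example via `asyncio.get_running_loop()`) outside an async context. Fetch the loop inside a coroutine, and start load tests via `asyncio.run(main())` rather than managing the loop by hand with `loop.run_until_complete()`.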
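Since the lesson's code sets no timeout, here is a minimal sketch of one way `asyncio.TimeoutError` can enter a load test: wrapping each offloaded call in `asyncio.wait_for()`. `blocking_invoke`, the delays, and the 0.1 s timeout are all hypothetical values for illustration.

```python
import asyncio
import time


def blocking_invoke(delay: float) -> str:
    """Hypothetical stand-in for a synchronous graph invoke."""
    time.sleep(delay)
    return "ok"


async def invoke_with_timeout(delay: float, timeout: float) -> str:
    loop = asyncio.get_running_loop()
    try:
        # wait_for stops waiting after `timeout` seconds; the worker thread
        # itself is not interrupted and runs to completion in the background.
        return await asyncio.wait_for(
            loop.run_in_executor(None, blocking_invoke, delay), timeout
        )
    except asyncio.TimeoutError:
        # Record the timeout as an outcome instead of aborting the whole test.
        return "timeout"


async def main() -> list[str]:
    delays = [0.01, 0.3, 0.01]
    return await asyncio.gather(*(invoke_with_timeout(d, timeout=0.1) for d in delays))


outcomes = asyncio.run(main())
print(outcomes)
```

Catching the error per call keeps one slow invocation from discarding the timings of the rest. The timed-out thread keeps running, so timeouts on a thread pool cap what you wait for, not what the system does.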

Experienced dev note

Most developers test load with a fixed concurrent limit and assume that's their bottleneck. In reality, you want to ramp concurrency gradually (1, 5, 10, 50, 100) and plot latency vs. concurrency to find the knee where P95 latency jumps: that's your actual breaking point. Also, LLM latency dominates, so load-test against real LLM endpoints (not mocked), because mock sleeps don't account for token generation time, backpressure, or rate limits. Finally, if you're deploying to serverless (Lambda, Cloud Run), measure cold-start overhead separately: load tests on warm instances lie about real-world performance.
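A ramp like this can be sketched in a few lines. Everything here is an assumption made for illustration: `blocking_invoke` is a stand-in for a graph invoke, the `shared_pool` semaphore imitates a saturated shared resource, and the 1.5x jump threshold in `find_knee` is an arbitrary heuristic rather than an established rule. Against real LLM endpoints you would likely plot the numbers rather than rely on a fixed factor.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

# Hypothetical shared resource: only 4 calls may hold it at once, standing in
# for a saturated connection pool or a provider rate limit.
shared_pool = threading.Semaphore(4)
executor = ThreadPoolExecutor(max_workers=16)


def blocking_invoke() -> None:
    """Hypothetical stand-in for a synchronous graph invoke."""
    with shared_pool:
        time.sleep(0.02)


async def p95_at(concurrency: int, total: int = 20) -> float:
    """Run `total` calls with at most `concurrency` in flight; return P95 latency."""
    loop = asyncio.get_running_loop()
    gate = asyncio.Semaphore(concurrency)

    async def one() -> float:
        async with gate:
            start = time.perf_counter()
            await loop.run_in_executor(executor, blocking_invoke)
            return time.perf_counter() - start

    latencies = sorted(await asyncio.gather(*(one() for _ in range(total))))
    return latencies[int(len(latencies) * 0.95)]


def find_knee(levels: list[int], p95s: list[float], factor: float = 1.5) -> Optional[int]:
    """Return the first level whose P95 exceeds `factor` times the previous level's."""
    for prev, cur, level in zip(p95s, p95s[1:], levels[1:]):
        if cur > factor * prev:
            return level
    return None


async def ramp(levels: list[int]) -> list[float]:
    # Levels run one after another so they do not interfere with each other.
    return [await p95_at(level) for level in levels]


levels = [1, 2, 4, 8, 16]
p95s = asyncio.run(ramp(levels))
executor.shutdown()
for level, p95 in zip(levels, p95s):
    print(f"concurrency={level:>2}  p95={p95:.3f}s")
print("knee at:", find_knee(levels, p95s))
```

In this simulation, latency should stay flat until concurrency exceeds the shared resource's capacity and then climb, which is the shape to look for in real measurements.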

Check your understanding

You load-tested your graph at 10 concurrent invocations and got P95 latency of 2.5 seconds. When you increase to 20 concurrent invocations, P95 jumps to 8 seconds. What does this tell you about where the bottleneck is, and what's the next debugging step?

Show answer hint

A correct answer recognizes that latency scales worse than linearly with concurrency (2x concurrency → 3.2x latency), indicating either: (1) the graph's work is not parallelizable (single LLM call blocking all requests), (2) a shared resource is saturated (database connection pool), or (3) the LLM provider is rate-limiting. Next step: instrument the graph's nodes individually to identify which one's latency is increasing with load, then optimize that specific node (add retries, caching, or parallel branches).
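Per-node instrumentation can be as small as a decorator. The sketch below is one possible shape, not a LangGraph feature: `timed_node`, `node_timings`, and the two node functions are hypothetical names, and the sleeps simulate a fast lookup and a slow LLM call.

```python
import functools
import time
from collections import defaultdict

# Per-node latency samples keyed by function name (a hypothetical in-memory store).
node_timings: dict[str, list[float]] = defaultdict(list)


def timed_node(fn):
    """Wrap a synchronous node function and record how long each call takes."""
    @functools.wraps(fn)
    def wrapper(state):
        start = time.perf_counter()
        try:
            return fn(state)
        finally:
            # Recorded even when the node raises, so failures show up in timings.
            node_timings[fn.__name__].append(time.perf_counter() - start)
    return wrapper


@timed_node
def retrieve(state: dict) -> dict:
    time.sleep(0.01)  # simulated fast lookup
    return {"docs": ["doc-1"]}


@timed_node
def generate(state: dict) -> dict:
    time.sleep(0.03)  # simulated slow LLM call
    return {"result": f"answer for {state['query']}"}


# Call the nodes directly here; in a real graph the wrapped functions would be
# registered as nodes instead.
for i in range(5):
    state = {"query": f"query_{i}"}
    state.update(retrieve(state))
    state.update(generate(state))

slowest = max(node_timings, key=lambda name: sum(node_timings[name]))
print("slowest node:", slowest)
```

Because `functools.wraps` preserves the function name, the wrapped functions should be usable wherever the originals were, for example in `graph.add_node('generate', generate)`. Comparing the per-node totals at low and high concurrency shows which node's latency grows with load.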

VERSION In LangGraph 0.2.x, `invoke()` is synchronous, which is why this lesson parallelizes it with `run_in_executor()`. Compiled graphs also expose an async `ainvoke()`; it pays off mainly when your nodes are themselves async, so confirm its behavior against the documentation for your installed version before relying on it in a load test. If migrating from 0.1.x, prefer `StateGraph` over `MessageGraph`.
NEXT

Next, instrument individual nodes with timing decorators to pinpoint which node consumes most latency under load, then optimize hot paths using response caching or parallel subgraph execution.
