Async memory with async session stores
Why this matters
Production systems with thousands of concurrent users cannot afford to block on synchronous database calls inside async chains. Async session stores let you build non-blocking memory layers that scale horizontally without losing message history.
Explanation
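What it is: An async session store is a memory backend that subclasses BaseChatMessageHistory and overrides its async methods (aget_messages(), aadd_messages(), aclear()). Reads, writes, and deletes then await the underlying I/O instead of blocking the event loop. It wraps your database or cache layer (Redis, PostgreSQL, DynamoDB) with async/await semantics. If a store implements only the sync methods, the base class's default async versions run those sync methods in a thread pool executor. The event loop stays free, but each call occupies a worker thread.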
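The difference between "sync methods plus a default async wrapper" and "native async methods" can be shown with the standard library alone. This is a framework-free imitation, not LangChain's source. The class names `ExecutorFallbackMixin`, `SyncOnlyStore`, and `NativeAsyncStore` are hypothetical. The mixin assumes the same idea as BaseChatMessageHistory's defaults: the sync call is pushed onto the loop's default thread pool executor.

```python
import asyncio
import threading
from typing import Dict, List, Tuple


class ExecutorFallbackMixin:
    """Hypothetical default async methods that delegate to the sync ones.

    Imitates the idea behind BaseChatMessageHistory's defaults: the event loop
    stays free, but each call runs on a worker thread from the default executor.
    """

    async def aget_messages(self) -> List[str]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.get_messages)

    async def aadd_messages(self, messages: List[str]) -> None:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.add_messages, messages)


class SyncOnlyStore(ExecutorFallbackMixin):
    """Implements only sync I/O; async callers go through the executor."""

    def __init__(self) -> None:
        self.messages: List[str] = []
        self.last_thread = ""  # name of the thread that served the last call

    def get_messages(self) -> List[str]:
        self.last_thread = threading.current_thread().name
        return list(self.messages)

    def add_messages(self, messages: List[str]) -> None:
        self.last_thread = threading.current_thread().name
        self.messages.extend(messages)


class NativeAsyncStore(SyncOnlyStore):
    """Overrides the async methods, so no worker thread is involved."""

    async def aget_messages(self) -> List[str]:
        await asyncio.sleep(0)  # stand-in for an awaitable database driver call
        self.last_thread = threading.current_thread().name
        return list(self.messages)

    async def aadd_messages(self, messages: List[str]) -> None:
        await asyncio.sleep(0)
        self.last_thread = threading.current_thread().name
        self.messages.extend(messages)


async def demo() -> Tuple[Dict[str, Tuple[List[str], str]], str]:
    loop_thread = threading.current_thread().name  # thread running the event loop
    results: Dict[str, Tuple[List[str], str]] = {}
    for store in (SyncOnlyStore(), NativeAsyncStore()):
        await store.aadd_messages(["hi"])
        history = await store.aget_messages()
        results[type(store).__name__] = (history, store.last_thread)
    return results, loop_thread


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both stores return the same history. The sync-only store serves it from an executor thread, and the native async store serves it from the event loop's own thread.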
How it works: When you wrap a chain with RunnableWithMessageHistory and call chain.ainvoke(), the wrapper uses the async methods of the history object rather than the sync ones. First, it calls your factory function with the session_id from config["configurable"] to obtain the store. Next, it runs await store.aget_messages(). While that read is pending, control returns to the event loop, which can serve other requests. After the chain finishes, it runs await store.aadd_messages() with the new input and output messages. The factory returns the store for a given session_id and creates it on first use. As a result, each session reads and writes its own history, and different sessions never share state.
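The ordering described above can be modeled without LangChain. This is a minimal sketch under stated assumptions. `fake_llm` stands in for the model, and messages are plain `(role, content)` tuples instead of LangChain message objects. `run_with_history` is a hypothetical helper that mirrors the steps (resolve the store, await the read, call the model, await the write). It is not RunnableWithMessageHistory's actual implementation.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Message = Tuple[str, str]  # (role, content): stand-in for LangChain message objects


class AsyncStore:
    """Hypothetical async store backed by a list; sleeps simulate network I/O."""

    def __init__(self) -> None:
        self.messages: List[Message] = []

    async def aget_messages(self) -> List[Message]:
        await asyncio.sleep(0.01)  # simulated read; control returns to the loop
        return list(self.messages)

    async def aadd_messages(self, messages: List[Message]) -> None:
        await asyncio.sleep(0.01)  # simulated write
        self.messages.extend(messages)


stores: Dict[str, AsyncStore] = {}


def get_store(session_id: str) -> AsyncStore:
    """Factory: one store per session_id, created on first use."""
    if session_id not in stores:
        stores[session_id] = AsyncStore()
    return stores[session_id]


async def fake_llm(history: List[Message], user_input: str) -> str:
    """Stand-in model that reports how much history it received."""
    await asyncio.sleep(0.05)
    return f"saw {len(history)} prior messages; input was {user_input!r}"


async def run_with_history(
    llm: Callable[[List[Message], str], Awaitable[str]],
    factory: Callable[[str], AsyncStore],
    user_input: str,
    session_id: str,
) -> str:
    store = factory(session_id)              # 1. resolve this session's store
    history = await store.aget_messages()    # 2. load history without blocking
    answer = await llm(history, user_input)  # 3. run the model on history + input
    await store.aadd_messages([("human", user_input), ("ai", answer)])  # 4. persist both turns
    return answer


async def main() -> List[str]:
    first = await run_with_history(fake_llm, get_store, "Capital of France?", "user_123")
    second = await run_with_history(fake_llm, get_store, "Its population?", "user_123")
    return [first, second]


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The second call sees the two messages written by the first. Because every step awaits, many such calls can interleave on a single thread. The real wrapper does more than this model, such as input/output key handling and tracing.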
When to use it: Use async session stores whenever your chain runs inside an async framework (FastAPI, Starlette, Quart) that handles concurrent requests. If you also scale chat across multiple workers, remember that workers do not share process memory. Back the store with an external database or cache so every worker sees the same history. For simple Flask apps or batch processing, synchronous stores are fine.
Analogy
Think of sync session stores as a barista taking orders one at a time and making each drink before accepting the next order. Async session stores are a barista who takes an order, starts the espresso machine, then immediately moves to the next customer while the machine runs. Multiple customers get served without anyone waiting.
Code
import asyncio
from typing import Dict, List, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI


class InMemoryAsyncMessageStore(BaseChatMessageHistory):
    """Async-compatible in-memory store for demonstration.

    asyncio.sleep() stands in for real network I/O to Redis, PostgreSQL, etc.
    """

    def __init__(self, session_id: str) -> None:
        self.session_id = session_id
        self.messages: List[BaseMessage] = []

    # Sync methods: clear() is abstract on BaseChatMessageHistory and must be
    # implemented; add_messages() keeps the sync invoke() path working too.
    def add_messages(self, messages: Sequence[BaseMessage]) -> None:
        self.messages.extend(messages)

    def clear(self) -> None:
        self.messages.clear()

    # Async methods: RunnableWithMessageHistory awaits these during ainvoke().
    async def aget_messages(self) -> List[BaseMessage]:
        """Async read: yields to the event loop while the simulated I/O runs."""
        await asyncio.sleep(0.01)
        return self.messages.copy()

    async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
        """Async append: simulates a write without blocking the event loop."""
        await asyncio.sleep(0.01)
        self.messages.extend(messages)

    async def aclear(self) -> None:
        """Async clear operation."""
        await asyncio.sleep(0.01)
        self.messages.clear()


# session_id -> store. Requests for the same session share one store;
# different sessions never do.
session_stores: Dict[str, InMemoryAsyncMessageStore] = {}


def get_session_store(session_id: str) -> InMemoryAsyncMessageStore:
    """Factory: creates or retrieves the store for one session.

    Critical: returning a separate store per session_id keeps each user's
    history isolated from every other user's.
    """
    if session_id not in session_stores:
        session_stores[session_id] = InMemoryAsyncMessageStore(session_id)
    return session_stores[session_id]


async def main() -> None:
    # Requires the OPENAI_API_KEY environment variable.
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant. Keep answers under 50 words."),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ])
    chain = prompt | llm | StrOutputParser()

    # The factory is passed as get_session_history; the session_id it receives
    # comes from config["configurable"]["session_id"].
    chain_with_history = RunnableWithMessageHistory(
        chain,
        get_session_store,
        input_messages_key="input",
        history_messages_key="history",
    )

    session_id = "user_123"
    config = {"configurable": {"session_id": session_id}}

    response_1 = await chain_with_history.ainvoke(
        {"input": "What is the capital of France?"}, config=config
    )
    print(f"Response 1: {response_1}")

    store = get_session_store(session_id)
    messages = await store.aget_messages()
    print(f"\nMessages in store after first call: {len(messages)}")

    response_2 = await chain_with_history.ainvoke(
        {"input": "What is its population?"}, config=config
    )
    print(f"\nResponse 2: {response_2}")

    messages = await store.aget_messages()
    print(f"\nMessages in store after second call: {len(messages)}")

    print("\nFinal conversation:")
    for msg in messages:
        print(f"  {type(msg).__name__}: {msg.content[:60]}...")


if __name__ == "__main__":
    asyncio.run(main())

Example output (the model's wording will vary):
Response 1: Paris is the capital of France.

Messages in store after first call: 2

Response 2: Paris has a population of approximately 2.16 million in the city proper, and over 10 million in the metropolitan area.

Messages in store after second call: 4

Final conversation:
  HumanMessage: What is the capital of France?...
  AIMessage: Paris is the capital of France....
  HumanMessage: What is its population?...
  AIMessage: Paris has a population of approximately 2.16 million in the ...
What just happened?
The code created a custom async message store that simulates database I/O with await asyncio.sleep(0.01), a delay that yields to the event loop instead of blocking it. RunnableWithMessageHistory wrapped the chain and connected it to the store through the factory function get_session_store(). On each ainvoke() call, the wrapper resolved the store for session_id "user_123". It then awaited aget_messages() to fill the history placeholder in the prompt and ran the prompt, model, and parser. Finally, it awaited aadd_messages() to append the new human and AI messages. The second call therefore saw the first exchange, which is how "its population" was resolved to Paris. The store held four messages at the end, and no synchronous call ever blocked the event loop.
Common gotcha
Developers often create a single shared store instance and reuse it across all sessions, which causes message history from user_123 to leak into user_456's conversation. The fix is the factory pattern: get_session_store(session_id) returns a separate store per session. RunnableWithMessageHistory expects a callable for get_session_history. A factory that ignores its argument (for example, lambda session_id: shared_store) reproduces the same bug: concurrent conversations append to one list, so each user's prompt includes other users' turns.
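The leak is easy to reproduce without an LLM. This sketch is an imitation rather than LangChain code. `ListStore`, `chat_turn`, `make_shared_factory`, and `make_per_session_factory` are hypothetical names, and the "AI reply" is a fixed string. Two users take their first turn concurrently via asyncio.gather. Then the sketch checks what history user_456's next prompt would receive.

```python
import asyncio
from typing import Callable, Dict, List


class ListStore:
    """Hypothetical async store holding plain strings."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    async def aget_messages(self) -> List[str]:
        await asyncio.sleep(0.01)  # simulated read
        return list(self.messages)

    async def aadd_messages(self, messages: List[str]) -> None:
        await asyncio.sleep(0.01)  # simulated write
        self.messages.extend(messages)


async def chat_turn(factory: Callable[[str], ListStore], session_id: str, text: str) -> List[str]:
    """One request: read history, write both turns, return the history the model saw."""
    store = factory(session_id)
    history = await store.aget_messages()
    await store.aadd_messages([f"{session_id}: {text}", f"ai -> {session_id}"])
    return history


def make_shared_factory() -> Callable[[str], ListStore]:
    shared = ListStore()
    return lambda session_id: shared  # BUG: ignores session_id


def make_per_session_factory() -> Callable[[str], ListStore]:
    stores: Dict[str, ListStore] = {}

    def factory(session_id: str) -> ListStore:
        if session_id not in stores:
            stores[session_id] = ListStore()
        return stores[session_id]

    return factory


async def simulate(factory: Callable[[str], ListStore]) -> List[str]:
    # First turn for both users runs concurrently.
    await asyncio.gather(
        chat_turn(factory, "user_123", "hello"),
        chat_turn(factory, "user_456", "hi there"),
    )
    # Second turn: which history does user_456's prompt receive?
    return await chat_turn(factory, "user_456", "what did I say?")


if __name__ == "__main__":
    print("shared:     ", asyncio.run(simulate(make_shared_factory())))
    print("per-session:", asyncio.run(simulate(make_per_session_factory())))
```

With the shared factory, user_456's history contains user_123's "hello". With the per-session factory, it contains only user_456's own two messages.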
Error recovery
TypeError: object BaseChatMessageHistory can't be used in 'await' expression
RuntimeError: Event loop is closed
KeyError: 'session_id' in config
Experienced dev note
In production, load test before reaching for async Redis or PostgreSQL libraries. The bottleneck is rarely the store's I/O: it's usually the LLM's 5+ second latency. Cutting store latency from 50 ms to 5 ms saves almost nothing when the LLM takes 7 seconds, since 45 ms out of roughly 7 seconds is under 1%. What async stores do prevent is a stalled server. On a single-process async server (e.g., FastAPI with 1 worker), a synchronous database call made directly inside async code blocks the event loop, so every other request waits until it returns. If a store implements only sync methods, LangChain's default async methods run them in a thread pool instead. This keeps the loop responsive, but it ties up one thread per in-flight call and can starve the pool under load. With native async stores, thousands of requests can wait on I/O concurrently without needing a thread each. Profile first; premature async optimization wastes weeks.
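The cost of blocking the event loop can be measured with the standard library. In this sketch, `blocking_lookup` uses time.sleep to stand in for a synchronous driver called directly inside async code, and `async_lookup` uses asyncio.sleep to stand in for an async driver. Both names and the 50 ms delay are illustrative assumptions. With 20 concurrent requests, the blocking version runs them one after another, while the async version overlaps the waits.

```python
import asyncio
import time
from typing import Awaitable, Callable

DELAY = 0.05   # pretend each store lookup takes 50 ms
REQUESTS = 20  # concurrent requests on one event loop


async def blocking_lookup() -> None:
    time.sleep(DELAY)  # sync driver call: freezes the whole event loop


async def async_lookup() -> None:
    await asyncio.sleep(DELAY)  # async driver call: the loop serves others meanwhile


async def serve(lookup: Callable[[], Awaitable[None]]) -> float:
    """Run REQUESTS lookups concurrently and return the wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(lookup() for _ in range(REQUESTS)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking: {asyncio.run(serve(blocking_lookup)):.2f}s")  # at least 1.0s (20 x 50 ms)
    print(f"async:    {asyncio.run(serve(async_lookup)):.2f}s")     # roughly one 50 ms delay
```

The query takes the same 50 ms in both cases. The throughput gain comes entirely from overlapping the waits, which is concurrency rather than parallelism. A CPU-bound step would still block the loop either way.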
Check your understanding
Why would switching from a synchronous session store to an async one improve throughput on a FastAPI server even if the database query takes the same time? What specifically breaks if you use the same store instance for all concurrent sessions instead of a factory?
Answer hint
A correct answer explains that async stores don't block the event loop, so the server can handle other requests while one request waits for I/O (concurrency, not parallelism). The query is just as slow, but the waits overlap. It also identifies that a shared store instance makes concurrent sessions write into the same history, so different users' messages merge into one conversation. The factory pattern prevents this by isolating state per session.
Custom async stores should subclass BaseChatMessageHistory and implement both aget_messages() and aadd_messages(). In langchain-core < 0.3.0, the base class was also BaseChatMessageHistory, but async methods were not consistently supported across all derived classes. Check your installed version with pip show langchain-core.