Composing custom runnables into larger chains
Why this matters
Production chains often need custom transformations, validation, or business logic between LLM calls. Without understanding custom runnables, you'll either break your chain with incompatible types or resort to messy workarounds outside the LCEL framework.
Explanation
What it is: A custom runnable is a Python class that implements the Runnable protocol (specifically invoke(), batch(), and stream() methods), allowing your business logic to behave like any built-in LangChain component. This makes it composable with | operators alongside prompts, LLMs, and output parsers.
How it works mechanically: Inherit from BaseModel and Runnable, define input/output schemas as Pydantic models, and implement invoke() (and optionally batch(), stream()) to transform input to output. The runtime uses your input schema to validate data flowing into your runnable, and chains verify that your output schema matches the next component's input schema. This type-safety happens at chain construction time, not runtime.
When to use: When you need to inject validation, formatting, filtering, or API calls between chain stages. The runnable becomes a first-class citizen in your LCEL pipeline, allowing you to build sophisticated chains without breaking the compositional abstraction.
Analogy
A custom runnable is like inserting a worker into an assembly line who has a specific input spec and output spec printed on their badge. The assembly line can verify everyone's specs match before production starts. The worker does their job (invoke), can handle batches of items (batch), or process items one-by-one in a stream (stream).
Code
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
class DocumentChunk(BaseModel):
text: str = Field(description="Raw text chunk")
source: str = Field(description="Source identifier")
class EnrichedChunk(BaseModel):
text: str
source: str
word_count: int
char_count: int
class ChunkEnricher(Runnable[DocumentChunk, EnrichedChunk]):
"""Custom runnable that adds metadata to document chunks."""
def invoke(
self,
input: DocumentChunk,
config: RunnableConfig | None = None,
) -> EnrichedChunk:
return EnrichedChunk(
text=input.text,
source=input.source,
word_count=len(input.text.split()),
char_count=len(input.text),
)
def batch(
self,
inputs: list[DocumentChunk],
config: RunnableConfig | None = None,
**kwargs: Any,
) -> list[EnrichedChunk]:
return [self.invoke(item, config) for item in inputs]
def stream(
self,
input: DocumentChunk,
config: RunnableConfig | None = None,
**kwargs: Any,
):
yield self.invoke(input, config)
class SummaryRequest(BaseModel):
summary: str
metadata: dict
class ResponseFormatter(Runnable[str, SummaryRequest]):
"""Custom runnable that structures LLM output."""
def invoke(
self,
input: str,
config: RunnableConfig | None = None,
) -> SummaryRequest:
return SummaryRequest(
summary=input.strip(),
metadata={"source": "llm", "formatted": True},
)
def batch(
self,
inputs: list[str],
config: RunnableConfig | None = None,
**kwargs: Any,
) -> list[SummaryRequest]:
return [self.invoke(item, config) for item in inputs]
def stream(
self,
input: str,
config: RunnableConfig | None = None,
**kwargs: Any,
):
yield self.invoke(input, config)
chunk_enricher = ChunkEnricher()
response_formatter = ResponseFormatter()
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
prompt = ChatPromptTemplate.from_template(
"Summarize this text in one sentence:\n{text}"
)
chain = (
chunk_enricher
| prompt
| llm
| StrOutputParser()
| response_formatter
)
sample_chunk = DocumentChunk(
text="Artificial intelligence is transforming industries by automating complex tasks and enabling new capabilities.",
source="article_42",
)
result = chain.invoke(sample_chunk)
print(f"Summary: {result.summary}")
print(f"Word count: {result.metadata}") Summary: AI is transforming industries through task automation and new capability enablement.
Word count: {'source': 'llm', 'formatted': True} What just happened?
The code defined two custom runnables: ChunkEnricher transforms a DocumentChunk into an EnrichedChunk by calculating word and character counts. ResponseFormatter wraps the LLM's string output into a structured SummaryRequest with metadata. These were composed into a pipeline with the pipe operator. When invoked with a sample chunk, the chain flowed: chunk enricher added metadata → prompt formatted the enriched text → LLM generated a summary → output parser extracted string → formatter wrapped it in a structured object. Each step's output schema matched the next step's input schema because we defined them explicitly.
Common gotcha
The most common mistake is forgetting that your custom runnable's input type hint and invoke() parameter type must match exactly: LangChain uses Pydantic validation before your invoke() is called, so if you declare input as DocumentChunk but pass in a dict, it will fail during validation, not during your code execution. Also, if you skip the batch() and stream() methods, they fall back to calling invoke() in a loop, which works but loses parallelization benefits and streaming semantics. Always implement all three methods, even if batch() and stream() just wrap invoke().
Error recovery
TypeError: Object of type DocumentChunk is not JSON serializableValidationError: 1 validation error for DocumentChunkAttributeError: 'ChunkEnricher' object has no attribute 'stream'RuntimeError: Runnable with non-standard input/output types cannot be used in pipe expressionsExperienced dev note
The real insight: custom runnables aren't just for wrapping arbitrary logic: they're a type contract system. By declaring input and output schemas as Pydantic models, you're telling LangChain exactly what you promise to accept and produce. This enables the framework to compose chains at construction time and catch type mismatches immediately, not after the chain runs for 10 seconds and hits the LLM call. In production, this prevents subtle bugs where data flows through 5 pipeline stages and fails on stage 4 with a cryptic validation error. Also, batch() and stream() aren't optional niceties: stream() is critical for real-time UX (you can see intermediate results) and batch() is critical for throughput. A runnable that only implements invoke() will serialize every call, destroying concurrency.
Check your understanding
You have a custom runnable that takes a string and returns a dict with keys 'result' and 'confidence'. This runnable is composed in a chain after an LLM output parser. The parser returns a string. The chain fails with 'ValidationError: input_type validation failed'. What is the most likely cause, and what specifically would you change in the custom runnable's class definition?
Show answer hint
The answer must recognize that the custom runnable's input schema is not correctly declared to accept a string from the previous stage. Specifically, the runnable's class signature should explicitly declare its input type as a Runnable[str, dict] (or similar) and the invoke method parameter must be typed as str. Simply writing invoke(self, input) without a type hint won't trigger LangChain's validation.