Code Advanced hard · 8 min

Composing custom runnables into larger chains

What you will learn
Build complex chains by wrapping custom Python logic into runnable objects that compose seamlessly with LLM pipelines.

Why this matters

Production chains often need custom transformations, validation, or business logic between LLM calls. Without understanding custom runnables, you'll either break your chain with incompatible types or resort to messy workarounds outside the LCEL framework.

Skip if: If your pipeline is entirely LLM → prompt → LLM with no custom logic, stick to simple pipe syntax. Don't wrap everything into a custom runnable just for organizational clarity: that adds complexity without benefit. Also avoid custom runnables if you need distributed execution across multiple machines; use LangGraph instead.

Explanation

What it is: A custom runnable is a Python class that implements the Runnable protocol (specifically invoke(), batch(), and stream() methods), allowing your business logic to behave like any built-in LangChain component. This makes it composable with | operators alongside prompts, LLMs, and output parsers.

How it works mechanically: Inherit from BaseModel and Runnable, define input/output schemas as Pydantic models, and implement invoke() (and optionally batch(), stream()) to transform input to output. The runtime uses your input schema to validate data flowing into your runnable, and chains verify that your output schema matches the next component's input schema. This type-safety happens at chain construction time, not runtime.

When to use: When you need to inject validation, formatting, filtering, or API calls between chain stages. The runnable becomes a first-class citizen in your LCEL pipeline, allowing you to build sophisticated chains without breaking the compositional abstraction.

Analogy

A custom runnable is like inserting a worker into an assembly line who has a specific input spec and output spec printed on their badge. The assembly line can verify everyone's specs match before production starts. The worker does their job (invoke), can handle batches of items (batch), or process items one-by-one in a stream (stream).

Code

python
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

class DocumentChunk(BaseModel):
    text: str = Field(description="Raw text chunk")
    source: str = Field(description="Source identifier")

class EnrichedChunk(BaseModel):
    text: str
    source: str
    word_count: int
    char_count: int

class ChunkEnricher(Runnable[DocumentChunk, EnrichedChunk]):
    """Custom runnable that adds metadata to document chunks."""

    def invoke(
        self,
        input: DocumentChunk,
        config: RunnableConfig | None = None,
    ) -> EnrichedChunk:
        return EnrichedChunk(
            text=input.text,
            source=input.source,
            word_count=len(input.text.split()),
            char_count=len(input.text),
        )

    def batch(
        self,
        inputs: list[DocumentChunk],
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ) -> list[EnrichedChunk]:
        return [self.invoke(item, config) for item in inputs]

    def stream(
        self,
        input: DocumentChunk,
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ):
        yield self.invoke(input, config)

class SummaryRequest(BaseModel):
    summary: str
    metadata: dict

class ResponseFormatter(Runnable[str, SummaryRequest]):
    """Custom runnable that structures LLM output."""

    def invoke(
        self,
        input: str,
        config: RunnableConfig | None = None,
    ) -> SummaryRequest:
        return SummaryRequest(
            summary=input.strip(),
            metadata={"source": "llm", "formatted": True},
        )

    def batch(
        self,
        inputs: list[str],
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ) -> list[SummaryRequest]:
        return [self.invoke(item, config) for item in inputs]

    def stream(
        self,
        input: str,
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ):
        yield self.invoke(input, config)

chunk_enricher = ChunkEnricher()
response_formatter = ResponseFormatter()

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
prompt = ChatPromptTemplate.from_template(
    "Summarize this text in one sentence:\n{text}"
)

chain = (
    chunk_enricher
    | prompt
    | llm
    | StrOutputParser()
    | response_formatter
)

sample_chunk = DocumentChunk(
    text="Artificial intelligence is transforming industries by automating complex tasks and enabling new capabilities.",
    source="article_42",
)

result = chain.invoke(sample_chunk)
print(f"Summary: {result.summary}")
print(f"Word count: {result.metadata}")
Output
Summary: AI is transforming industries through task automation and new capability enablement.
Word count: {'source': 'llm', 'formatted': True}

What just happened?

The code defined two custom runnables: ChunkEnricher transforms a DocumentChunk into an EnrichedChunk by calculating word and character counts. ResponseFormatter wraps the LLM's string output into a structured SummaryRequest with metadata. These were composed into a pipeline with the pipe operator. When invoked with a sample chunk, the chain flowed: chunk enricher added metadata → prompt formatted the enriched text → LLM generated a summary → output parser extracted string → formatter wrapped it in a structured object. Each step's output schema matched the next step's input schema because we defined them explicitly.

Common gotcha

The most common mistake is forgetting that your custom runnable's input type hint and invoke() parameter type must match exactly: LangChain uses Pydantic validation before your invoke() is called, so if you declare input as DocumentChunk but pass in a dict, it will fail during validation, not during your code execution. Also, if you skip the batch() and stream() methods, they fall back to calling invoke() in a loop, which works but loses parallelization benefits and streaming semantics. Always implement all three methods, even if batch() and stream() just wrap invoke().

Error recovery

TypeError: Object of type DocumentChunk is not JSON serializable
Your custom runnable's output model contains non-JSON-serializable fields (like datetime without custom serializer). Solution: Add Field(serialization_alias=...) to Pydantic fields or use Pydantic v2's json_encoders config.
ValidationError: 1 validation error for DocumentChunk
The data flowing into your runnable doesn't match the declared input schema. Solution: Check that the previous component in the chain produces exactly the type you declared. Use chain.get_graph().print_ascii() to visualize type flow and catch mismatches.
AttributeError: 'ChunkEnricher' object has no attribute 'stream'
You defined a custom runnable but didn't implement stream(). Solution: Implement stream() or inherit from SimpleRunnable if you only need invoke().
RuntimeError: Runnable with non-standard input/output types cannot be used in pipe expressions
You didn't inherit from Runnable[InputType, OutputType]. Solution: Ensure your class signature is class MyRunnable(Runnable[InputType, OutputType]) and that InputType and OutputType are concrete, serializable types.

Experienced dev note

The real insight: custom runnables aren't just for wrapping arbitrary logic: they're a type contract system. By declaring input and output schemas as Pydantic models, you're telling LangChain exactly what you promise to accept and produce. This enables the framework to compose chains at construction time and catch type mismatches immediately, not after the chain runs for 10 seconds and hits the LLM call. In production, this prevents subtle bugs where data flows through 5 pipeline stages and fails on stage 4 with a cryptic validation error. Also, batch() and stream() aren't optional niceties: stream() is critical for real-time UX (you can see intermediate results) and batch() is critical for throughput. A runnable that only implements invoke() will serialize every call, destroying concurrency.

Check your understanding

You have a custom runnable that takes a string and returns a dict with keys 'result' and 'confidence'. This runnable is composed in a chain after an LLM output parser. The parser returns a string. The chain fails with 'ValidationError: input_type validation failed'. What is the most likely cause, and what specifically would you change in the custom runnable's class definition?

Show answer hint

The answer must recognize that the custom runnable's input schema is not correctly declared to accept a string from the previous stage. Specifically, the runnable's class signature should explicitly declare its input type as a Runnable[str, dict] (or similar) and the invoke method parameter must be typed as str. Simply writing invoke(self, input) without a type hint won't trigger LangChain's validation.

VERSION In langchain < 1.0.0, custom runnables used run() instead of invoke() and the protocol was more loosely defined. langchain 1.2.x (April 2026) requires explicit Runnable[InputType, OutputType] inheritance and strictly enforces the invoke/batch/stream contract through Pydantic BaseModel schemas. Code written for langchain 0.x will not work without refactoring to the new protocol.
NEXT

Once custom runnables work, the next step is composing them into stateful workflows using LangGraph, where you can manage memory, conditional branching, and multi-turn interactions.

Community Notes

No notes yetBe the first to share a version-specific fix or tip.