IngestionPipeline: the processing graph
Why this matters
In production, you'll process documents multiple times with the same transformations. Without IngestionPipeline, you either duplicate transformation code or build brittle ad-hoc chains. The pipeline captures the exact sequence once, making your data processing repeatable, testable, and auditable.
Explanation
What it is: IngestionPipeline is a reusable graph of document processors that transform raw documents into indexable nodes. Each stage in the graph applies a transformation (split text, extract metadata, embed, deduplicate) and passes its results to the next.
How it works: You instantiate transformations (e.g., SentenceSplitter, TitleExtractor), pass them to IngestionPipeline, then call .run(documents=docs). The pipeline executes the full chain and caches each step's results by default, so rerunning the same pipeline on unchanged input reuses the cached output instead of repeating API calls and compute. You can run the same pipeline against new batches without rewriting logic.
When to use it: Anytime you ingest documents into an index in production or need repeatable, version-controlled transformation logic. Common patterns: batch ingestion jobs, multi-source document imports, A/B testing different chunking strategies on the same source.
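To make the "define once, run on many batches" idea concrete, here is a minimal pure-Python sketch of a transformation chain. It is a toy model, not LlamaIndex's implementation: the names run_pipeline, split_sentences, and lowercase are hypothetical, and the "nodes" are plain strings.

```python
from typing import Callable, List

# A "transformation" here is just a function from a list of text chunks to a
# new list of text chunks. This is a toy stand-in, not LlamaIndex's interface.
Transformation = Callable[[List[str]], List[str]]


def split_sentences(chunks: List[str]) -> List[str]:
    """Split each chunk on '. ' boundaries (naive, for illustration only)."""
    out = []
    for chunk in chunks:
        for part in chunk.split(". "):
            part = part.strip().rstrip(".")
            if part:
                out.append(part)
    return out


def lowercase(chunks: List[str]) -> List[str]:
    """Normalize case so later stages see uniform text."""
    return [c.lower() for c in chunks]


def run_pipeline(docs: List[str], transformations: List[Transformation]) -> List[str]:
    """Feed the output of each stage into the next, in order."""
    items = list(docs)
    for transform in transformations:
        items = transform(items)
    return items


# The stage list is defined once and reused for every batch.
stages = [split_sentences, lowercase]
batch_a = ["Machine learning learns from data. Deep learning uses layers."]
batch_b = ["Transformers power NLP."]

result_a = run_pipeline(batch_a, stages)
result_b = run_pipeline(batch_b, stages)
print(result_a)
print(result_b)
```

The point of the sketch is that the order of stages lives in one place (the stages list), so changing the flow means editing one definition rather than every call site.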
Analogy
Think of it like a CI/CD pipeline for data. You define stages (build, test, deploy) once in a YAML file, then trigger it on any code change. IngestionPipeline is the same idea: define your data transformation stages once, then run them on document batches without redefining the flow.
Code
from llama_index.core import Document
from llama_index.core.extractors import QuestionsAnsweredExtractor, TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

docs = [
    Document(text="Machine learning is a subset of AI that enables systems to learn from data. Deep learning uses neural networks with multiple layers."),
    Document(text="Natural language processing powers chatbots and translation. Transformers are the backbone of modern NLP models like GPT."),
]

# TitleExtractor and QuestionsAnsweredExtractor call an LLM (Settings.llm,
# OpenAI by default), so an API key must be configured before running.
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=50, chunk_overlap=10),  # chunk along sentence boundaries
        TitleExtractor(),  # adds a document title to each node's metadata
        QuestionsAnsweredExtractor(questions=1),  # one generated question per chunk
    ]
)

nodes = pipeline.run(documents=docs)

print(f"Total nodes: {len(nodes)}")
for i, node in enumerate(nodes[:3]):
    print(f"\nNode {i}:")
    print(f"  Text: {node.get_content()}")
    if node.metadata:
        # Show only the first two metadata entries to keep the output short
        print(f"  Metadata: {dict(list(node.metadata.items())[:2])}")

Sample output (illustrative; node count, chunk boundaries, and generated text vary with the tokenizer and LLM, and with inputs this short you may see fewer, larger chunks):

Total nodes: 6

Node 0:
  Text: Machine learning is a subset of AI that enables systems to
  Metadata: {'document_title': '...', 'questions_this_excerpt_can_answer': 'What is machine learning?'}

Node 1:
  Text: learn from data. Deep learning uses neural networks with
  Metadata: {'document_title': '...', 'questions_this_excerpt_can_answer': 'How do neural networks work?'}

Node 2:
  Text: multiple layers.
  Metadata: {'document_title': '...', 'questions_this_excerpt_can_answer': 'What are the layers in a neural network?'}

What just happened?
The pipeline took two raw documents, split them into chunks along sentence boundaries (SentenceSplitter), extracted a title for metadata context (TitleExtractor), and generated one question per chunk to improve retrieval (QuestionsAnsweredExtractor). Each transformation passed its output as input to the next. The final result is a list of nodes (6 in the sample output) with metadata attached, all generated in a single reusable run.
Common gotcha
Developers often assume caching in IngestionPipeline works by document ID. It is actually keyed on a hash of the node content together with the transformation's settings. A document whose text is unchanged therefore returns cached nodes even if it arrives under a new ID, while edited text produces a cache miss and is reprocessed. When you need to force a full recompute, be explicit: either clear the cache with pipeline.cache.clear() before rerunning, or construct the pipeline with disable_cache=True.
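The difference between ID-based and content-based caching is easy to see in a toy model. The sketch below is not LlamaIndex's cache: cache_key, run_cached, and expensive_upper are hypothetical names, and the key format is an assumption chosen for illustration. It only demonstrates what a key built from content plus transformation settings implies.

```python
import hashlib
from typing import Dict, List

calls = {"count": 0}  # how many times the (pretend) expensive step really ran


def expensive_upper(text: str) -> List[str]:
    """Stand-in for a costly transformation such as an LLM-backed extractor."""
    calls["count"] += 1
    return [text.upper()]


def cache_key(text: str, transform_name: str, params: str) -> str:
    """Key depends on content and transformation settings, never on the ID."""
    raw = f"{transform_name}|{params}|{text}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


cache: Dict[str, List[str]] = {}


def run_cached(doc_id: str, text: str) -> List[str]:
    # doc_id is accepted but deliberately ignored when building the key
    key = cache_key(text, "expensive_upper", "v1")
    if key not in cache:
        cache[key] = expensive_upper(text)
    return cache[key]


run_cached("doc-1", "hello world")      # miss: computed
run_cached("doc-2", "hello world")      # hit: same text, different ID
run_cached("doc-1", "hello world, v2")  # miss: same ID, edited text
print(calls["count"])  # 2 real computations so far

cache.clear()                           # explicit invalidation
run_cached("doc-1", "hello world")      # miss again after clearing
print(calls["count"])  # 3
```

Counting real invocations, as the calls dictionary does here, is also a simple way to check whether a rerun actually recomputed anything.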
Error recovery
ImportError: cannot import name 'IngestionPipeline'
AttributeError: 'Document' object has no attribute 'metadata'
ValueError: transformations list is empty
Experienced dev note
In production systems, version your IngestionPipeline configuration separately from your index. Store the pipeline definition (transformation types, parameters, order) as code or JSON. If you change the chunking strategy or add a new extractor, bump the version and reprocess old documents through the new pipeline before indexing. This prevents silent data corruption where some nodes were chunked one way and newer nodes another way, which leads to retrieval quality degradation that is hard to debug.
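One way to act on this advice is to fingerprint the pipeline definition and stamp it onto every node. The sketch below assumes a hypothetical JSON-style config schema and a hypothetical pipeline_fingerprint metadata field; neither is part of LlamaIndex.

```python
import hashlib
import json

# Hypothetical config schema: the field names are assumptions for illustration.
PIPELINE_CONFIG = {
    "version": "2",
    "transformations": [
        {"type": "SentenceSplitter", "params": {"chunk_size": 50, "chunk_overlap": 10}},
        {"type": "TitleExtractor", "params": {}},
    ],
}


def config_fingerprint(config: dict) -> str:
    """Stable short hash of the config; dict key order does not affect it."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


def needs_reprocessing(node_metadata: dict, config: dict) -> bool:
    """True when a stored node was produced by a different pipeline config."""
    return node_metadata.get("pipeline_fingerprint") != config_fingerprint(config)


current = config_fingerprint(PIPELINE_CONFIG)
old_node = {"pipeline_fingerprint": "stale-value"}  # written by an earlier config
new_node = {"pipeline_fingerprint": current}        # written by the current config

print(needs_reprocessing(old_node, PIPELINE_CONFIG))
print(needs_reprocessing(new_node, PIPELINE_CONFIG))
```

Because the order of the transformations list is part of the hash, reordering stages changes the fingerprint just as a parameter change does, which matches the note that order belongs in the versioned definition.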
Check your understanding
You have two Document sources that update weekly. You build an IngestionPipeline with SentenceSplitter and a custom MetadataExtractor. After running the pipeline once on source A, the weekly refresh arrives and you run the same pipeline on source A again. Why might your new nodes look identical to the old ones, and how would you verify whether the pipeline actually reprocessed the document?
Answer hint
The answer must mention that the cache is keyed on a content hash, so a document whose body is unchanged returns cached results. To verify reprocessing, either clear the cache explicitly before rerunning, or confirm that the document text (or a transformation's settings) actually changed, since only that produces a cache miss.