Data versioning and lineage
Why this matters
When a fine-tuned model fails in production, you need to know which exact dataset version trained it, what preprocessing steps were applied, and whether that data is still available. Without this, you cannot reproduce the failure, audit compliance, or safely retrain. The problem compounds at scale, when you are managing dozens of checkpoints across experiments.
Explanation
Data versioning and lineage means recording the exact identity (hash), source, and transformations of every dataset used in a training run, then linking that metadata to the model checkpoint it produced. It answers two questions: "What data trained this model?" and "Can I reproduce this exact training?"
Mechanically, you compute a hash of your raw data, record every preprocessing transformation (tokenization rules, filtering, augmentation), version your code that performs those steps, and write all of this as metadata alongside your checkpoint. When training, you log the data version hash and link it to the output model. When debugging later, you read that metadata, fetch the exact same data version, apply the exact same transformations, and reproduce the training.
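The first step above, hashing the raw data, can be sketched for data that lives in a file rather than in memory. This is a minimal illustration, not a prescribed implementation: the function name `hash_file`, the chunk size, and the file name `train.jsonl` are assumptions made for the example.

```python
import hashlib
import tempfile
from pathlib import Path


def hash_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps calling read() until it returns b""
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Hypothetical raw data file, created here only so the sketch runs end to end
with tempfile.TemporaryDirectory() as tmp:
    raw_path = Path(tmp) / "train.jsonl"
    raw_path.write_bytes(b'{"text": "hello", "label": "positive"}\n')
    file_hash = hash_file(raw_path)
    same_hash = hash_file(raw_path)
    raw_path.write_bytes(b'{"text": "hello!", "label": "positive"}\n')
    changed_hash = hash_file(raw_path)

print(file_hash == same_hash)     # identical bytes give an identical hash
print(file_hash == changed_hash)  # a one-character edit changes the hash
```

Reading in chunks keeps memory use flat for large datasets, and hashing the bytes on disk avoids any dependence on how the data is parsed.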
This is critical in production because training data changes: new examples are added, old examples are removed, preprocessing rules are updated. Without lineage, a 6-month-old checkpoint becomes unreproducible. Combined with model registries and experiment tracking, versioned data lets you roll back to any prior state or audit which data influenced which decision.
Analogy
Think of it like version control for datasets. Git tracks which commits produced which build. Data versioning tracks which data produced which model. Just as you wouldn't ship code without knowing its commit hash, you shouldn't ship a model without knowing its data hash and the exact transformations that created it.
Code
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path


class DataLineageTracker:
    def __init__(self, lineage_dir: str = "./data_lineage"):
        self.lineage_dir = Path(lineage_dir)
        # parents=True also creates any missing parent directories
        self.lineage_dir.mkdir(parents=True, exist_ok=True)

    def compute_data_hash(self, data: list[dict]) -> str:
        """Compute deterministic hash of dataset."""
        # sort_keys=True fixes the key order inside each record
        serialized = json.dumps(data, sort_keys=True)
        # Keep the first 16 hex characters (64 bits) as a short identifier
        return hashlib.sha256(serialized.encode()).hexdigest()[:16]

    def log_transformation(self, transform_name: str, params: dict) -> dict:
        """Record a preprocessing step."""
        return {
            "name": transform_name,
            "params": params,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def create_lineage_record(self, data: list[dict], transformations: list[dict],
                              model_name: str, hyperparams: dict) -> dict:
        """Create complete lineage: raw data hash + all transforms + training config."""
        data_hash = self.compute_data_hash(data)
        record = {
            "data_hash": data_hash,
            "data_size": len(data),
            "transformations": transformations,
            "model_name": model_name,
            "hyperparams": hyperparams,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "schema_version": "1.0",
        }
        return record

    def save_lineage(self, record: dict, checkpoint_name: str) -> Path:
        """Save lineage metadata alongside model checkpoint."""
        lineage_file = self.lineage_dir / f"{checkpoint_name}_lineage.json"
        with open(lineage_file, "w") as f:
            json.dump(record, f, indent=2)
        return lineage_file

    def load_lineage(self, checkpoint_name: str) -> dict:
        """Retrieve lineage for a checkpoint to understand what trained it."""
        lineage_file = self.lineage_dir / f"{checkpoint_name}_lineage.json"
        with open(lineage_file, "r") as f:
            return json.load(f)

    def verify_reproducibility(self, checkpoint_name: str, new_data: list[dict]) -> bool:
        """Check if new data matches the data that trained a checkpoint."""
        record = self.load_lineage(checkpoint_name)
        new_hash = self.compute_data_hash(new_data)
        return new_hash == record["data_hash"]


if __name__ == "__main__":
    tracker = DataLineageTracker()

    raw_data = [
        {"text": "The cat sat on the mat.", "label": "positive"},
        {"text": "I hate waiting in traffic.", "label": "negative"},
        {"text": "This is great news!", "label": "positive"},
    ]

    # The steps are only recorded here, not applied to raw_data
    transformations = [
        tracker.log_transformation("lowercase", {}),
        tracker.log_transformation("remove_punctuation", {}),
        tracker.log_transformation("tokenize", {"max_length": 128}),
    ]

    lineage_record = tracker.create_lineage_record(
        data=raw_data,
        transformations=transformations,
        model_name="distilbert-base-uncased",
        hyperparams={"learning_rate": 2e-5, "epochs": 3, "batch_size": 8},
    )

    saved_path = tracker.save_lineage(lineage_record, "checkpoint_v1")
    print(f"Lineage saved to: {saved_path}")

    loaded_record = tracker.load_lineage("checkpoint_v1")
    print(f"\nLoaded lineage data_hash: {loaded_record['data_hash']}")
    print(f"Transformations applied: {[t['name'] for t in loaded_record['transformations']]}")

    # Same data: the recomputed hash matches the stored one
    is_reproducible = tracker.verify_reproducibility("checkpoint_v1", raw_data)
    print(f"\nCan reproduce with same data: {is_reproducible}")

    # Dropping one example changes the hash
    modified_data = raw_data[:2]
    is_reproducible_modified = tracker.verify_reproducibility("checkpoint_v1", modified_data)
    print(f"Can reproduce with modified data: {is_reproducible_modified}")

Output
Lineage saved to: data_lineage/checkpoint_v1_lineage.json

Loaded lineage data_hash: b8c4a5e9f7d2c6a1
Transformations applied: ['lowercase', 'remove_punctuation', 'tokenize']

Can reproduce with same data: True
Can reproduce with modified data: False
What just happened?
The code created a data versioning system that computes a deterministic hash of the raw training data, records each preprocessing transformation as a timestamped step, combines that with model and hyperparameter metadata into a lineage record, and saves the record as a JSON file named after the checkpoint. When you later load a checkpoint, you can retrieve its lineage and check whether you have the exact same data by recomputing the hash. The verify_reproducibility calls showed that the original data matches (identical hash) while the modified data does not (a two-example subset produces a different hash). Note that the example only records the transformation names and parameters; it does not apply them.
Common gotcha
Developers often hash only the final processed data, not the raw data. This is a mistake because preprocessing can be non-deterministic (random augmentation, date-dependent rules) or contain bugs, so the processed hash can change even when the source data has not. Always hash the raw input before any transformation. If preprocessing is randomized, you must either seed it deterministically or exclude the random examples from the hash. Also, JSON serialization order matters: the code uses sort_keys=True so that each record serializes the same way regardless of the order in which its keys were inserted. That setting does not normalize the order of records in the list, so the same examples in a different order produce a different hash.
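The effect of sort_keys=True can be checked directly. The sketch below compares a hash built the same way as compute_data_hash (without truncation) against a hypothetical order-insensitive variant; `unordered_hash` is an illustration introduced here, not part of the tracker above, and whether record order should count is a decision for your own pipeline.

```python
import hashlib
import json


def record_digest(record: dict) -> str:
    """Hash one record with sorted keys, so key order cannot affect the result."""
    serialized = json.dumps(record, sort_keys=True)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def ordered_hash(data: list[dict]) -> str:
    """Serialize the whole list at once: record order affects the hash."""
    serialized = json.dumps(data, sort_keys=True)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def unordered_hash(data: list[dict]) -> str:
    """Hypothetical variant: sort per-record digests so record order is ignored."""
    digests = sorted(record_digest(r) for r in data)
    return hashlib.sha256("".join(digests).encode("utf-8")).hexdigest()


a = [{"text": "x", "label": "pos"}, {"text": "y", "label": "neg"}]
b = [{"label": "pos", "text": "x"}, {"label": "neg", "text": "y"}]  # keys reordered
c = list(reversed(a))                                               # records reordered

print(ordered_hash(a) == ordered_hash(b))      # key order is normalized
print(ordered_hash(a) == ordered_hash(c))      # record order is not
print(unordered_hash(a) == unordered_hash(c))  # the variant ignores record order
```

If example order influences training (for instance, without shuffling), the order-sensitive hash is arguably the more honest identifier.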
Error recovery
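This section names three failure modes: a missing lineage file, a hash mismatch, and a corrupt JSON file. One possible way to tell them apart is sketched below, assuming the `<checkpoint_name>_lineage.json` layout used by the tracker. The helpers `load_verified_lineage` and `check` are hypothetical, and raising ValueError on a mismatch is a design choice made for this sketch; the tracker's verify_reproducibility returns a bool instead.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def compute_data_hash(data: list[dict]) -> str:
    """Same hashing scheme as the tracker: sorted keys, truncated SHA-256."""
    serialized = json.dumps(data, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]


def load_verified_lineage(lineage_dir: Path, checkpoint_name: str,
                          data: list[dict]) -> dict:
    """Hypothetical helper: load a lineage record and raise if the data differs."""
    lineage_file = lineage_dir / f"{checkpoint_name}_lineage.json"
    with open(lineage_file, "r") as f:  # raises FileNotFoundError if absent
        record = json.load(f)           # raises json.JSONDecodeError if corrupt
    actual = compute_data_hash(data)
    if actual != record["data_hash"]:
        raise ValueError(
            f"hash mismatch for {checkpoint_name}: "
            f"expected {record['data_hash']}, got {actual}"
        )
    return record


def check(lineage_dir: Path, checkpoint_name: str, data: list[dict]) -> str:
    """Map each failure mode to a short status string."""
    try:
        load_verified_lineage(lineage_dir, checkpoint_name, data)
    except FileNotFoundError:
        return "missing"   # no record saved, or wrong checkpoint name
    except json.JSONDecodeError:
        return "corrupt"   # must be caught before ValueError, its parent class
    except ValueError:
        return "mismatch"  # the data on hand is not the data that was recorded
    return "ok"


data = [{"text": "The cat sat on the mat.", "label": "positive"}]
with tempfile.TemporaryDirectory() as tmp:
    lineage_dir = Path(tmp)
    good = {"data_hash": compute_data_hash(data)}
    (lineage_dir / "good_lineage.json").write_text(json.dumps(good))
    (lineage_dir / "broken_lineage.json").write_text('{"data_hash": ')  # truncated
    statuses = [
        check(lineage_dir, "good", data),
        check(lineage_dir, "good", data + data),
        check(lineage_dir, "broken", data),
        check(lineage_dir, "absent", data),
    ]

print(statuses)
```

Because json.JSONDecodeError subclasses ValueError, the order of the except clauses matters: swapping them would report a corrupt file as a hash mismatch.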
FileNotFoundError
ValueError (hash mismatch)
json.JSONDecodeError
Experienced dev note
At scale, separate data versioning from experiment tracking. Your experiment tracker (MLflow, Weights & Biases) logs hyperparams and metrics; your data versioning system logs data identity and transformations. Link them via the data_hash field. This separation lets you query: 'all models trained on data_hash=abc' across different experiments. Also, hash the preprocessing code itself (e.g., hash of the tokenizer config file), not just the parameters. A 'max_length=128' param is useless without knowing which tokenizer library and version created it. In production, commit preprocessing code to git and record the git hash in lineage.
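Two of the ideas above can be sketched briefly: fingerprinting the preprocessing code, and querying lineage records by data_hash. Everything named here is an assumption for illustration: the helper names, the `tokenizer_config.json` file, and the `run_a`/`run_b`/`run_c` checkpoints. The only external command used is `git rev-parse HEAD`, and the sketch falls back to None when git or a repository is unavailable.

```python
import hashlib
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Optional


def current_git_commit() -> Optional[str]:
    """Return the current commit hash, or None outside a repo or without git."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
    return result.stdout.strip()


def hash_config_file(path: Path) -> str:
    """SHA-256 of a config file's bytes, e.g. a tokenizer config."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def find_checkpoints(lineage_dir: Path, data_hash: str) -> list[str]:
    """List checkpoint names whose lineage record has the given data_hash."""
    suffix = "_lineage.json"
    matches = []
    for lineage_file in sorted(lineage_dir.glob(f"*{suffix}")):
        record = json.loads(lineage_file.read_text())
        if record.get("data_hash") == data_hash:
            matches.append(lineage_file.name[: -len(suffix)])
    return matches


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    config_path = root / "tokenizer_config.json"  # hypothetical config file
    config_path.write_text(json.dumps({"max_length": 128}))
    code_info = {
        "git_commit": current_git_commit(),
        "tokenizer_config_hash": hash_config_file(config_path),
    }

    lineage_dir = root / "data_lineage"
    lineage_dir.mkdir()
    # Hypothetical records; "abc" and "def" stand in for real data hashes
    for name, data_hash in [("run_a", "abc"), ("run_b", "abc"), ("run_c", "def")]:
        record = {"data_hash": data_hash, "code": code_info}
        (lineage_dir / f"{name}_lineage.json").write_text(json.dumps(record))

    matches = find_checkpoints(lineage_dir, "abc")

print(matches)  # checkpoints that share data_hash "abc"
```

Scanning JSON files works for a handful of checkpoints; at larger scale the same lookup would more plausibly be an indexed query in whatever store holds the lineage records.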
Check your understanding
You have checkpoint_v1 trained on raw_data with transformations T1→T2→T3. Six months later, you retrain on raw_data with transformations T1→T2→T3_modified (T3 now has different params). Both checkpoints have the same data_hash. Is this a problem? Why or why not?
Answer hint
A correct answer must recognize that the data hash is independent of the transformations: the same raw data produces the same hash regardless of how you preprocess it. So yes, it is a problem if you rely on the hash alone. The two checkpoints have identical data hashes even though T3 changed, and only the transformation metadata, which the lineage records separately, reveals the difference. You would need to compare that metadata (not just the hash) to detect the change. The key insight is that data versioning alone is insufficient; you also need to version the preprocessing logic and parameters alongside it.
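The comparison described in the hint might look like the sketch below. It assumes lineage records shaped like the ones created by create_lineage_record, and it ignores timestamps, which differ between runs even when nothing meaningful changed. The function names and the placeholder hash `abc123` are hypothetical.

```python
import json


def transformation_signature(record: dict) -> list[tuple[str, str]]:
    """Reduce a lineage record to (name, params) pairs, dropping timestamps."""
    return [
        (t["name"], json.dumps(t["params"], sort_keys=True))
        for t in record["transformations"]
    ]


def diff_transformations(old: dict, new: dict) -> list[str]:
    """Describe every step whose name or params differ between two records."""
    old_sig = transformation_signature(old)
    new_sig = transformation_signature(new)
    changes = []
    for i, (o, n) in enumerate(zip(old_sig, new_sig)):
        if o != n:
            changes.append(f"step {i}: {o[0]} {o[1]} -> {n[0]} {n[1]}")
    if len(old_sig) != len(new_sig):
        changes.append(f"step count changed: {len(old_sig)} -> {len(new_sig)}")
    return changes


def make_record(max_length: int, timestamp: str) -> dict:
    """Build a toy lineage record; only the tokenize params vary."""
    return {
        "data_hash": "abc123",  # placeholder: same raw data in both runs
        "transformations": [
            {"name": "lowercase", "params": {}, "timestamp": timestamp},
            {"name": "remove_punctuation", "params": {}, "timestamp": timestamp},
            {"name": "tokenize", "params": {"max_length": max_length},
             "timestamp": timestamp},
        ],
    }


v1 = make_record(128, "2024-01-01T00:00:00+00:00")
v2 = make_record(256, "2024-07-01T00:00:00+00:00")

print(v1["data_hash"] == v2["data_hash"])  # the data hash cannot tell them apart
changes = diff_transformations(v1, v2)
print(changes)                             # the transformation metadata can
```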