AI Observability Cheat Sheet — Monitor LLM & ML Systems
Observability is instrumentation + metrics + tracing for AI systems.
Like a hospital monitoring a patient with EKG, blood pressure, temperature, and oxygen: you need simultaneous signals to catch the heart attack before it happens. A single metric is a snapshot. Observability is the continuous, correlated story.
Key Concepts
AI Observability Patterns
# LangTrace quick start: trace a LangChain prompt -> model pipeline.
import os

from langtrace_python_sdk import langtrace

# Initialize tracing BEFORE importing the LLM libraries. This file's
# troubleshooting section lists late initialization as a cause of missing traces.
langtrace.init(api_key=os.environ["LANGTRACE_API_KEY"])

from langchain_core.prompts import ChatPromptTemplate  # noqa: E402
from langchain_openai import ChatOpenAI  # noqa: E402

prompt = ChatPromptTemplate.from_template("Explain {topic}")
llm = ChatOpenAI(model="gpt-4o")
chain = prompt | llm  # pipe the rendered prompt into the model
result = chain.invoke({"topic": "quantum computing"})
print(result)

# Result: span captured automatically with latency, tokens, cost
# OpenTelemetry tracing exported to a local Jaeger agent.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# 6831 is the port the Jaeger agent listens on for spans, not the web UI port.
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.set_tracer_provider(TracerProvider())
# SimpleSpanProcessor comes from opentelemetry.sdk.trace.export; the API-level
# `opentelemetry.trace` module does not expose it.
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(jaeger_exporter)
)
tracer = trace.get_tracer("my-app")

# `vector_db` is a placeholder for your vector store client.
with tracer.start_as_current_span("retrieval") as span:
    span.set_attribute("query", "What is RAG?")
    docs = vector_db.search("What is RAG?", top_k=5)
    span.set_attribute("doc_count", len(docs))

# Result: trace with custom span attributes visible in the Jaeger UI
# (http://localhost:16686 by default; 6831 is only the agent's ingest port).
import os
# Arize: log production predictions so accuracy, latency, and drift can be tracked.
from arize.utils.types import (
    Environments, ModelTypes, EmbeddingColumnNames, Schema
)
from arize.pandas.logger import Client

client = Client(
    api_key=os.environ["ARIZE_API_KEY"],
    space_key=os.environ["ARIZE_SPACE_KEY"]
)

# Log predictions after each LLM call
# (`llm` and `predictions_df` are placeholders supplied by your application).
response = llm.generate(prompt="Summarize this text...")

schema = Schema(
    prediction_id_column_name="pred_id",
    prediction_ts_column_name="timestamp",
    actual_label_column_name="actual_summary"
)
client.log(
    dataframe=predictions_df,
    model_id="gpt4-summarizer",
    # The SDK exposes model type and environment as the enums
    # ModelTypes / Environments rather than `ModelType` / a plain string.
    model_type=ModelTypes.GENERATIVE_LLM,
    schema=schema,
    environment=Environments.PRODUCTION
)

# Later: query performance by time window
# NOTE(review): confirm `get_metrics` exists on this Client in your SDK version.
metrics = client.get_metrics(
    model_id="gpt4-summarizer",
    metric=["accuracy", "latency_p99"]
)

# Result: dashboard showing accuracy, latency, and drift alerts per day/hour
import json
import logging
from datetime import datetime
logger = logging.getLogger("llm_calls")
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    # With no handler anywhere in the hierarchy, logging falls back to a
    # WARNING-level handler of last resort and INFO records are dropped.
    logger.addHandler(logging.StreamHandler())


def log_llm_call(prompt, model, response, latency_ms, tokens_used,
                 cost_per_1k_tokens=0.003):
    """Emit one structured JSON log line describing a single LLM call.

    Args:
        prompt: Prompt text sent to the model. Only a digest of it is logged.
        model: Model name recorded verbatim.
        response: Response text; ``None`` is logged as length 0.
        latency_ms: Call latency in milliseconds, as measured by the caller.
        tokens_used: Total tokens billed for the call.
        cost_per_1k_tokens: USD price per 1,000 tokens used for ``cost_usd``.
            Defaults to the previously hard-coded 0.003.
    """
    import hashlib
    from datetime import timezone

    # The built-in hash() is salted per process for str, so it cannot be used
    # to correlate the same prompt across processes; use a stable digest.
    prompt_digest = hashlib.sha256(prompt.encode("utf-8")).digest()
    log_entry = {
        # Timezone-aware UTC timestamp (ISO 8601 with a +00:00 offset).
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": "llm_call",
        "model": model,
        "prompt_hash": int.from_bytes(prompt_digest[:8], "big") % 10**9,
        "response_length": len(response or ""),
        "latency_ms": latency_ms,
        "tokens_used": tokens_used,
        "cost_usd": (tokens_used / 1000) * cost_per_1k_tokens,
    }
    logger.info(json.dumps(log_entry))
# After every generation: time the call and log exactly what was sent.
import time

user_prompt = "..."
started = time.perf_counter()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": user_prompt}]
)
elapsed_ms = int((time.perf_counter() - started) * 1000)

log_llm_call(
    prompt=user_prompt,  # same text that was sent, so the logged hash matches
    model="gpt-4o",
    response=response.choices[0].message.content,
    latency_ms=elapsed_ms,  # measured, not a hard-coded sample value
    tokens_used=response.usage.total_tokens
)

# Result: JSON logs sent to ELK/DataDog/CloudWatch for correlation
import os
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# Golden prompts for the synthetic monitor: each case names the substrings an
# answer must include ("expected_keywords") and must avoid ("must_not_contain").
GOLDEN_TESTS = [
    # Arithmetic sanity check.
    {
        "input": "What is 2+2?",
        "expected_keywords": ["4"],
        "must_not_contain": ["5", "3"],
    },
    # Short conceptual answer.
    {
        "input": "Explain recursion in 1 sentence.",
        "expected_keywords": ["function", "itself"],
        "must_not_contain": ["loop"],
    },
]
def _ask_model(question):
    """Send one user message to gpt-4o and return the reply text (may be None)."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": question}]
    )
    return response.choices[0].message.content


def _meets_expectations(test, output):
    """Return True if lower-cased `output` has every expected keyword and no forbidden one."""
    has_keywords = all(kw.lower() in output for kw in test["expected_keywords"])
    no_forbidden = not any(kw.lower() in output for kw in test["must_not_contain"])
    return has_keywords and no_forbidden


def run_synthetic_check(tests=None, complete=None, notify=None):
    """Run the golden prompts against the model and alert on any failure.

    Args:
        tests: Iterable of cases with "input", "expected_keywords" and
            "must_not_contain" keys. Defaults to the module's GOLDEN_TESTS.
        complete: Callable taking the input text and returning the model's
            answer (or None). Defaults to a gpt-4o call through the module's
            `client`.
        notify: Callable receiving the alert message. Defaults to the
            module-level `alert` hook.

    Returns:
        (passed, failed): the number of passing cases and a list of
        (input, lower-cased output) pairs for the failing ones.
    """
    if tests is None:
        tests = GOLDEN_TESTS
    if complete is None:
        complete = _ask_model

    passed = 0
    failed = []
    for test in tests:
        # The API may return no text content; treat that as an empty answer
        # (a failed check) instead of crashing on None.lower().
        output = (complete(test["input"]) or "").lower()
        if _meets_expectations(test, output):
            passed += 1
        else:
            failed.append((test["input"], output))

    if failed:
        # Resolve the default hook lazily so it is only needed on failure.
        send_alert = notify if notify is not None else alert
        send_alert(f"Synthetic check failed: {len(failed)}/{len(tests)}")
    return passed, failed
# Schedule hourly via cron or Airflow
passed, failed = run_synthetic_check()
Alert if any test fails; log the failed input/output pairs for investigation.
AI Observability Comparison
| Tool/Platform | Best For | Integration Effort | Cost Model |
|---|---|---|---|
| LangSmith (LangChain team) | LangChain apps + multi-agent debugging | 2 env vars: set LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY | Pay-per-trace or per-project |
| LangTrace (open-source) | Framework-agnostic tracing, self-hosted option | 2-3 line init, works with any LLM framework | Free tier or self-hosted |
| Arize | Production ML monitoring, drift detection, model registry | 3-5 lines to log predictions + ground truth | Per-model per-month subscription |
| Datadog APM | Full-stack observability (app + LLM + DB) | Install agent, add tags | Per-host or ingested logs volume |
| OpenTelemetry (self-hosted) | Vendor-agnostic, complete control, cost-sensitive | 20+ lines setup, requires Jaeger/Prometheus infrastructure | Only infra costs (self-hosted) |
| Weights & Biases (W&B) | LLM experiment tracking, model evaluation, versioning | 2 lines init + log call | Free for individuals, paid for teams |
Common Errors & Fixes
"Trace exporter not initialized" or traces not appearing in dashboard Cause: Tracer or span processor not set up before first LLM call, or API key/endpoint wrong.
Initialize tracer FIRST in your app startup, before any imports of LLM libraries:
import os

from langtrace_python_sdk import langtrace

# `os` must be imported for the environment lookup below.
langtrace.init(api_key=os.environ["LANGTRACE_API_KEY"])
# THEN import and use LangChain
from langchain_openai import ChatOpenAI
Ground truth labels missing; can't measure accuracy or detect drift
Cause: Predictions are logged to Arize, but the actual/expected values from production are never shipped.
Capture user feedback or correct answers after model runs, then backfill ground truth:
# After user provides feedback
from arize.utils.types import Environments, ModelTypes

# Backfill actuals; presumably Arize joins them to the earlier predictions
# through the shared "pred_id" column (confirm against your schema).
client.log(
    dataframe=corrections_df,
    model_id="gpt4-summarizer",
    # log() needs the model type and environment here too, just like the
    # call that logged the original predictions.
    model_type=ModelTypes.GENERATIVE_LLM,
    environment=Environments.PRODUCTION,
    schema=Schema(
        prediction_id_column_name="pred_id",
        actual_label_column_name="correct_summary"
    )
)
# Arize will now compute accuracy retroactively
Latency spikes but no visibility into which step (prompt, embedding, LLM, parsing) is slow
Cause: Only end-to-end latency is logged; the individual steps are not traced as separate spans.
Wrap each major step in a span with timing:
with tracer.start_as_current_span("embedding") as span:
embeddings = embed_model.encode(query)
with tracer.start_as_current_span("vector_search") as span:
results = vector_db.search(embeddings)
with tracer.start_as_current_span("llm_generation") as span:
output = llm.generate(prompt)
Cost attribution unclear; don't know which features/queries cost most
Cause: Traces/logs are not tagged with business context (customer_id, feature_name, request_type).
Add attributes to every span:
with tracer.start_as_current_span("chat_completion") as span:
span.set_attribute("customer_id", customer_id)
span.set_attribute("feature", "recommendation_engine")
span.set_attribute("model", "gpt-4o")
response = client.chat.completions.create(...)
Hallucinations not detected; the model states false facts and no alert fires
Cause: Synthetic tests only check format/keyword presence, not factual correctness, and there is no ground truth for natural queries.
Add a secondary fact-check step or retrieval-augmented grading:
response = llm.generate(prompt)
fact_check = llm.generate(
f"Is this factually correct? {response}\nContext: {retrieved_docs}"
)
if "no" in fact_check.lower():
alert(f"Potential hallucination detected: {response}") Production Gotchas
Sampling traces (e.g., 1% of requests) saves cost, but you'll miss tail latencies and rare errors. If you sample uniformly, never drop error traces. Better: use tail-based sampling that keeps traces by latency percentile or error type.
"Latency is 500ms" means nothing. Is it p50 or p99? For which customer segment? Which model? Always add dimensions: customer_tier, model, endpoint, feature.
Drift only detectable if you're comparing predictions to reality. If ground truth is a one-time batch annotation, you'll miss silent failures between annotations.
Logging the raw prompt/response for debugging is fine locally. In production with external vendors, hash or redact email, phone, SSN, credit cards, or customer data.
If every metric threshold generates an alert, the ops team ends up muting them all. Start with 1-2 critical alerts (error rate > 5%, latency p99 > 10s). Add rules, not noise.
GPT-4o charges input + output separately. Cheap input prompt + expensive retrieval context + cheap output ≠ cheap request. Track and attribute cost by component.
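OpenAI may update the model behind the gpt-4o alias mid-month. Your golden tests pass but the output distribution drifts. Always log the resolved model name returned in the API response (and its system_fingerprint, when present) together with the response timestamp for version tracking.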
Fire-and-forget logging (async queue) means trace loss on application crash. Use blocking or batch logging for critical observability data. Accept slightly higher latency for correctness.
Complete end-to-end observability setup for a RAG pipeline: tracing, metrics, and structured logging.
import os
import json
import logging
from datetime import datetime
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# Setup: Initialize observability
# Spans are exported over OTLP/gRPC to a collector on localhost:4317.
otlp_exporter = OTLPSpanExporter(
    endpoint="http://localhost:4317"
)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(otlp_exporter)
)
tracer = trace.get_tracer("rag-pipeline")

logger = logging.getLogger("rag-calls")
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    # With no handler anywhere in the hierarchy, logging falls back to a
    # WARNING-level handler of last resort and the INFO-level structured
    # events would be silently dropped.
    logger.addHandler(logging.StreamHandler())

# Models
llm = ChatOpenAI(model="gpt-4o", api_key=os.environ["OPENAI_API_KEY"])
vector_db = None  # Initialize your vector DB here
def rag_query(user_question: str, customer_id: str) -> str:
    """Execute RAG with full observability.

    Opens a parent "rag_query" span with child "retrieval" and
    "llm_generation" spans, then emits one structured JSON log event.

    Args:
        user_question: The end user's question. It is written to span
            attributes as-is (truncated to 100 chars on the parent span);
            the log event carries only a digest of it.
        customer_id: Business identifier attached to the span and log event.

    Returns:
        The text content of the model's answer.
    """
    import hashlib
    import time
    from datetime import timezone

    with tracer.start_as_current_span("rag_query") as main_span:
        main_span.set_attribute("customer_id", customer_id)
        # NOTE(review): raw user text is sent to the tracing backend here and
        # in the retrieval span; redact it if your data policy requires.
        main_span.set_attribute("question", user_question[:100])
        started_at = datetime.now(timezone.utc)  # timezone-aware event time
        started = time.perf_counter()  # monotonic clock for the latency figure

        # Step 1: Retrieve documents
        with tracer.start_as_current_span("retrieval") as span:
            span.set_attribute("query", user_question)
            # documents = vector_db.search(user_question, top_k=5)
            documents = [
                {"text": "RAG is Retrieval-Augmented Generation.", "score": 0.95}
            ]  # Mock
            span.set_attribute("doc_count", len(documents))
            if documents:
                # A real search can return nothing; skip the score, don't crash.
                span.set_attribute("top_score", documents[0]["score"])

        # Step 2: Generate answer
        context = "\n".join(d["text"] for d in documents)
        prompt = ChatPromptTemplate.from_template(
            "Context: {context}\nQuestion: {question}\nAnswer:"
        )
        chain = prompt | llm
        with tracer.start_as_current_span("llm_generation") as span:
            response = chain.invoke({"context": context, "question": user_question})
            span.set_attribute("model", "gpt-4o")
            span.set_attribute("output_length", len(response.content))

        total_latency_ms = int((time.perf_counter() - started) * 1000)

        # Log structured event for analysis
        # The built-in hash() is salted per process for str, so a stable
        # digest is used to let the same question correlate across processes.
        question_digest = hashlib.sha256(user_question.encode("utf-8")).digest()
        log_entry = {
            "timestamp": started_at.isoformat(),
            "event": "rag_completion",
            "customer_id": customer_id,
            "question_hash": int.from_bytes(question_digest[:8], "big") % 10**9,
            "doc_count": len(documents),
            "model": "gpt-4o",
            "latency_ms": total_latency_ms,
            "output_length": len(response.content),
        }
        logger.info(json.dumps(log_entry))
        return response.content
# Usage: run one demo question through the instrumented pipeline.
if __name__ == "__main__":
    demo_request = {"user_question": "What is RAG?", "customer_id": "cust_12345"}
    answer = rag_query(**demo_request)
    print(answer)