Token counting during streaming
Why this matters
Streaming is essential for user experience, but standard APIs report token usage only after the response completes, so by default you cannot react mid-stream when a response exceeds your token budget. This lesson shows how to estimate and track tokens *as they arrive*, which is the basis for caps and cost controls in production systems.
Explanation
Token counting during streaming is the practice of estimating or computing token usage incrementally as chunks arrive from an LLM, rather than waiting until the full response completes. Standard token counting APIs (like encoding.encode() from tiktoken) work on complete text, so you don't know the final token cost until the stream ends, which is too late to enforce a hard limit.
Mechanically, you attach a callback to the streaming output that: (1) accumulates text chunks into a buffer, (2) periodically encodes the buffer to count tokens, (3) compares the count against a threshold, and (4) either raises an exception or truncates if you exceed the budget. LangChain's callback system makes this feasible: subclass BaseCallbackHandler and override on_llm_new_token, which fires for every chunk as it arrives. Note that LangChain logs and swallows exceptions raised inside a handler unless the handler sets raise_error = True.
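The four steps above can be sketched without any LLM or network call. This is a minimal illustration, not LangChain's API: StreamBudget, TokenBudgetExceeded, and whitespace_count are hypothetical names, and the whitespace counter is only a stand-in for a real tokenizer.

```python
from typing import Callable


class TokenBudgetExceeded(RuntimeError):
    """Raised when the running token estimate reaches the budget."""


class StreamBudget:
    """Accumulates streamed text and re-counts tokens every `check_every` chunks."""

    def __init__(self, max_tokens: int, count_tokens: Callable[[str], int],
                 check_every: int = 4) -> None:
        self.max_tokens = max_tokens
        self.count_tokens = count_tokens
        self.check_every = check_every
        self.buffer = ""
        self.token_count = 0
        self._chunks_seen = 0

    def feed(self, chunk: str) -> None:
        # (1) accumulate the chunk
        self.buffer += chunk
        self._chunks_seen += 1
        # (2) re-count only periodically; encoding the whole buffer is not free
        if self._chunks_seen % self.check_every != 0:
            return
        self.token_count = self.count_tokens(self.buffer)
        # (3) compare against the threshold, (4) raise once the budget is reached
        if self.token_count >= self.max_tokens:
            raise TokenBudgetExceeded(f"{self.token_count}/{self.max_tokens} tokens")


def whitespace_count(text: str) -> int:
    """Stand-in tokenizer: counts whitespace-separated words, NOT real tokens."""
    return len(text.split())


budget = StreamBudget(max_tokens=5, count_tokens=whitespace_count, check_every=2)
try:
    for chunk in ["one ", "two ", "three ", "four ", "five ", "six ", "seven "]:
        budget.feed(chunk)
except TokenBudgetExceeded as exc:
    print("stopped:", exc)  # stopped: 6/5 tokens
```

The run overshoots the budget (6 against a limit of 5) because the count is refreshed only every second chunk; that is the trade-off of periodic counting. To use a real tokenizer, one could pass something like lambda s: len(encoding.encode(s)) with a tiktoken encoding as count_tokens.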
Use this in production cost-control systems, user-facing chat apps with token quotas, or batch processing pipelines where you need to reject or truncate requests that exceed thresholds before they consume the full response budget.
Analogy
It's like a gas pump with a $20 limit. Without token counting during streaming, you'd pump gas freely, and only when the pump stops would you find out you've spent $47. Token counting during streaming lets you watch the cost tick up in real time and hit the stop button before you exceed $20.
Code
import tiktoken
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI


class TokenCountingStreamHandler(BaseCallbackHandler):
    """Estimates output tokens as chunks arrive and flags a budget overrun."""

    def __init__(self, max_tokens: int, model: str = "gpt-4o"):
        self.max_tokens = max_tokens
        self.token_count = 0
        self.buffer = ""
        self.encoding = tiktoken.encoding_for_model(model)
        self.exceeded = False
        self._last_checked = 0  # buffer length at the most recent re-encode

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        if self.exceeded:
            return  # stop counting once the limit has been flagged
        self.buffer += token
        # Re-encode only after ~10 new characters; encoding on every chunk is wasteful
        if len(self.buffer) - self._last_checked > 10:
            self._last_checked = len(self.buffer)
            self.token_count = len(self.encoding.encode(self.buffer))
            if self.token_count >= self.max_tokens:
                self.exceeded = True
                print(f"\n[TOKEN LIMIT EXCEEDED] {self.token_count}/{self.max_tokens} tokens")
            else:
                print(f"[{self.token_count}/{self.max_tokens}]", end="", flush=True)


model = ChatOpenAI(model="gpt-4o", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("user", "{query}"),
])
handler = TokenCountingStreamHandler(max_tokens=50)
chain = prompt | model

# chain.stream() is lazy: the request runs, and the handler fires, only while we iterate
result = chain.stream(
    {"query": "Explain quantum computing in 200 words."},
    config={"callbacks": [handler]},
)
for chunk in result:
    if chunk.content:
        print(chunk.content, end="", flush=True)
print("\n--- Streaming complete ---")
print(f"\nFinal token count: {handler.token_count}")

Example output (illustrative and abridged; in a real run the counters interleave with the streamed text):

[22/50][38/50][45/50]
[TOKEN LIMIT EXCEEDED] 52/50 tokens
Quantum computing is a revolutionary technology that harnesses the principles of quantum mechanics to process information in fundamentally different ways than classical computers. While traditional computers use bits (0 or 1), quantum computers use quantum bits or "qubits."
--- Streaming complete ---

Final token count: 52
What just happened?
The code created a custom streaming callback that intercepts each token as it arrives from the LLM. Every ~10 characters, it encoded the buffer with tiktoken and compared the count against the 50-token limit. At 52 tokens, it marked the limit as exceeded, stopped counting, and printed the notification. The chain kept streaming because the handler only set a flag and never closed the connection. In other words, token counting runs alongside the stream; it does not block it.
Common gotcha
Developers often assume that setting max_tokens in the ChatOpenAI constructor automatically prevents overages during streaming. It doesn't: the max_tokens parameter caps only the *output* of a single request, not your cumulative token spend (prompt tokens plus output, across requests). To track that, you must implement a handler like this yourself. Additionally, token counts taken during streaming are approximate because the text encoded so far ends at an arbitrary chunk boundary; the final encoding of the full text may differ slightly (usually by 1-3 tokens due to tokenizer boundary effects).
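The boundary effect can be seen with a toy example. This is a sketch under loud assumptions: VOCAB and toy_encode are hypothetical, and a greedy longest-match tokenizer only imitates, in spirit, the learned merges a real tokenizer such as tiktoken applies.

```python
# Toy vocabulary for illustration only; real tokenizers use learned
# byte-pair merges, but the chunk-boundary effect is similar in spirit.
VOCAB = {"tokenizer", "token", "izer", "s"}


def toy_encode(text: str) -> list[str]:
    """Greedy longest-match tokenizer; unknown characters become single tokens."""
    tokens = []
    i = 0
    while i < len(text):
        # Pick the longest vocabulary entry that matches at position i
        match = max(
            (v for v in VOCAB if text.startswith(v, i)),
            key=len,
            default=text[i],
        )
        tokens.append(match)
        i += len(match)
    return tokens


chunks = ["token", "izer", "s"]  # how the text happened to arrive

per_chunk_total = sum(len(toy_encode(c)) for c in chunks)
full_text_total = len(toy_encode("".join(chunks)))

print(per_chunk_total)  # 3
print(full_text_total)  # 2
```

Summing per-chunk counts overstates the total here because "token" and "izer" merge into one token once they are adjacent. Re-encoding the accumulated buffer, as the handler does, avoids most of this drift; only the tail of the buffer can still change when more text arrives.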
Error recovery
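KeyError from tiktoken.encoding_for_model (tiktoken could not map the model name to an encoding)
AttributeError: 'dict' object has no attribute 'content' (a stream chunk was a dict rather than a message chunk)
RuntimeError: Token limit exceeded mid-stream (raised by your own handler when it enforces the limit)
Experienced dev note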
Token counting during streaming *looks* like a cost-control feature, but it's really a cost-visibility feature. You can't truly stop a stream once it has started without killing the connection: the handler's callbacks fire, but the LLM keeps generating. Use this for logging and alerting, not hard enforcement. For hard enforcement (actually stopping generation), you need request-level configuration (max_completion_tokens on the API call itself) or a custom HTTP client that closes the connection. Also, re-encode in batches every N tokens rather than on every token; encoding the whole buffer is expensive, and doing it per token slows your stream for little benefit.
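One client-side approximation of hard enforcement is to stop consuming the stream and close the iterator yourself. The sketch below uses a hypothetical cap_stream helper and a fake stream with a whitespace-based counter as stand-ins. Whether closing the iterator also tears down the underlying HTTP connection, and therefore stops generation and billing, depends on the client library and is an assumption to verify.

```python
from typing import Callable, Iterable, Iterator


def cap_stream(chunks: Iterable[str], max_tokens: int,
               count_tokens: Callable[[str], int]) -> Iterator[str]:
    """Yield chunks while the running estimate stays within max_tokens."""
    source = iter(chunks)
    buffer = ""
    try:
        for chunk in source:
            buffer += chunk
            if count_tokens(buffer) > max_tokens:
                break  # drop this chunk and stop pulling from the source
            yield chunk
    finally:
        # Close the source explicitly so it can release its resources
        close = getattr(source, "close", None)
        if callable(close):
            close()


def fake_stream() -> Iterator[str]:
    """Stand-in for an LLM stream; prints when it is closed early or finishes."""
    try:
        for word in ["a ", "b ", "c ", "d "]:
            yield word
    finally:
        print("source closed")


capped = list(cap_stream(fake_stream(), max_tokens=2,
                         count_tokens=lambda s: len(s.split())))
print(capped)  # ['a ', 'b ']
```

Unlike the callback, this wrapper sits in the consuming loop, so it controls whether more chunks are pulled at all. The chunk that would push the estimate over the limit is discarded rather than yielded.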
Check your understanding
If your token counting handler flags that the limit is exceeded three-quarters of the way through a stream, why doesn't the response stop immediately, and what is the correct way to actually halt generation?
Answer hint
A correct answer explains that the callback is fire-and-forget; it doesn't block or interrupt the LLM's generation loop. The only ways to actually stop are: (1) set max_completion_tokens on the ChatOpenAI constructor before streaming starts, or (2) close the HTTP connection from your client. The callback is for observability and cost accounting, not hard limits.