Async data loading during inference
Why this matters
In production inference pipelines, data loading can take longer than model inference itself. If you load data synchronously, the GPU sits idle while each batch is fetched and preprocessed. Async loading keeps the GPU fed, which can substantially cut end-to-end latency on I/O-bound workloads: ideally, total time approaches that of the slower stage (loading or inference) instead of the sum of both.
Explanation
Async data loading during inference means fetching and preprocessing the next batch of data while the current batch runs through the model. Instead of waiting for data to load before starting inference, you pre-load the next batch concurrently.
Mechanically, you spawn an async task (via asyncio or a thread pool) that loads data into a bounded queue while the main inference loop consumes batches from that queue. The model and loader run concurrently: ideally, when the model finishes batch N, batch N+1 is already loaded and ready. One caveat with asyncio: a task only yields control at an await, so blocking calls such as a PyTorch forward pass or CPU-heavy preprocessing stall the event loop, and the loader with it, unless you offload them with asyncio.to_thread or a thread pool.
When to use it: your inference pipeline spends more than about 20% of its time waiting for data (disk reads, network requests, heavy preprocessing), your batch sizes are consistent, and you can tolerate a small memory overhead for the prefetch queue.
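The queue mechanics can be sketched without a model at all. This is a minimal, stdlib-only sketch under stated assumptions: load and infer are hypothetical stand-ins that sleep for fixed times, a blocking time.sleep run through asyncio.to_thread plays the role of a GPU forward pass, and asyncio.Queue(maxsize=2) is the bounded prefetch buffer.

```python
import asyncio
import time

LOAD_S = 0.1   # hypothetical per-item I/O time (seconds)
INFER_S = 0.1  # hypothetical per-item compute time (seconds)


async def load(i):
    """Hypothetical loader: awaits simulated I/O, then returns a 'batch'."""
    await asyncio.sleep(LOAD_S)
    return f"batch-{i}"


def infer(batch):
    """Hypothetical blocking compute step standing in for model(**inputs)."""
    time.sleep(INFER_S)
    return batch.upper()


async def producer(queue, n):
    """Load n batches into the queue, then signal completion with None."""
    for i in range(n):
        batch = await load(i)
        await queue.put(batch)  # waits while the queue already holds maxsize items
    await queue.put(None)  # sentinel: no more batches


async def pipeline(n, maxsize=2):
    """Consume batches as they arrive while the producer prefetches the next ones."""
    queue = asyncio.Queue(maxsize=maxsize)
    producer_task = asyncio.create_task(producer(queue, n))
    results = []
    while (batch := await queue.get()) is not None:
        # to_thread frees the event loop, so the producer's next load
        # overlaps with this blocking call.
        results.append(await asyncio.to_thread(infer, batch))
    await producer_task
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    out = asyncio.run(pipeline(5))
    elapsed = time.perf_counter() - start
    print(out)
    print(f"pipelined {elapsed:.2f}s vs. sequential ~{5 * (LOAD_S + INFER_S):.2f}s")
```

With both stages at 0.1s, five batches should take roughly 0.6s instead of the 1.0s a load-then-infer loop needs, since every load after the first hides behind the previous inference.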
Analogy
Think of a restaurant kitchen: the chef (GPU) cooks one dish while the prep cook (async loader) readies the next one. If the prep cook waits until the chef finishes to start prepping, the chef sits idle. If they work in parallel, the chef always has the next dish ready.
Code
```python
import asyncio
import time

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained('gpt2')
# device_map='auto' requires the accelerate package.
model = AutoModelForCausalLM.from_pretrained('gpt2', device_map='auto', torch_dtype=torch.float32)

documents = [
    "The future of AI is transformative and exciting.",
    "Machine learning models require careful data handling.",
    "Async operations improve pipeline efficiency significantly.",
    "Inference optimization is critical for production systems.",
    "Transformers have revolutionized natural language processing."
]

max_queue_size = 2  # small prefetch window: overlaps I/O without buffering the whole dataset


def prepare(text):
    """Tokenize one document and move it to the model's device."""
    # Single documents need no padding (GPT-2 also has no pad token by default).
    tokens = tokenizer(text, return_tensors='pt', truncation=True)
    return {k: v.to(model.device) for k, v in tokens.items()}


def forward(tokens):
    """Blocking forward pass. Passing labels makes the model return a loss."""
    with torch.no_grad():
        output = model(**tokens, labels=tokens['input_ids'])
    return output.loss.item()


async def async_loader(queue, batch_indices, delay=0.5):
    """Simulate async data loading with network/disk delay."""
    for idx in batch_indices:
        await asyncio.sleep(delay)  # simulated I/O; other tasks run meanwhile
        tokens = prepare(documents[idx])
        await queue.put((idx, tokens))  # waits while the queue is full (backpressure)
        print(f"[LOADER] Loaded document {idx}")
    await queue.put(None)  # sentinel: no more documents


def run_inference_sync():
    """Single-threaded inference: load then process."""
    print("\n=== Synchronous Inference (baseline) ===")
    start = time.perf_counter()
    for idx in range(len(documents)):
        time.sleep(0.5)  # same simulated I/O delay as the async loader
        loss = forward(prepare(documents[idx]))
        print(f"[INFERENCE] Processed document {idx}, loss={loss:.4f}")
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")
    return elapsed


async def run_inference_async():
    """Async inference: load and process concurrently."""
    print("\n=== Asynchronous Inference (optimized) ===")
    start = time.perf_counter()
    queue = asyncio.Queue(maxsize=max_queue_size)
    loader_task = asyncio.create_task(async_loader(queue, range(len(documents)), delay=0.5))
    while True:
        item = await queue.get()
        if item is None:  # loader is done
            break
        idx, tokens = item
        # Offload the blocking forward pass to a worker thread so the event
        # loop keeps running the loader during inference.
        loss = await asyncio.to_thread(forward, tokens)
        print(f"[INFERENCE] Processed document {idx}, loss={loss:.4f}")
    await loader_task
    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")
    return elapsed


if __name__ == "__main__":
    forward(prepare("warm-up"))  # keep one-time CUDA/kernel setup out of the timings
    sync_time = run_inference_sync()
    async_time = asyncio.run(run_inference_async())
    speedup = sync_time / async_time
    print("\n=== Results ===")
    print(f"Synchronous: {sync_time:.2f}s")
    print(f"Asynchronous: {async_time:.2f}s")
    print(f"Speedup: {speedup:.2f}x")
```

Sample output (exact timings and loss values depend on hardware and library versions):

```text
=== Synchronous Inference (baseline) ===
[INFERENCE] Processed document 0, loss=7.3281
[INFERENCE] Processed document 1, loss=7.2459
[INFERENCE] Processed document 2, loss=7.1840
[INFERENCE] Processed document 3, loss=7.2114
[INFERENCE] Processed document 4, loss=7.3105
Total time: 4.98s

=== Asynchronous Inference (optimized) ===
[LOADER] Loaded document 0
[INFERENCE] Processed document 0, loss=7.3281
[LOADER] Loaded document 1
[INFERENCE] Processed document 1, loss=7.2459
[LOADER] Loaded document 2
[INFERENCE] Processed document 2, loss=7.1840
[LOADER] Loaded document 3
[INFERENCE] Processed document 3, loss=7.2114
[LOADER] Loaded document 4
[INFERENCE] Processed document 4, loss=7.3105
Total time: 3.15s

=== Results ===
Synchronous: 4.98s
Asynchronous: 3.15s
Speedup: 1.58x
```
What just happened?
The code compared two inference patterns: synchronous (load a document, then run inference, repeat) and asynchronous (load the next document in the background while the current one runs through the model). The async version used a small bounded queue to coordinate the loader task and the inference loop. The loader added documents to the queue after a 0.5s delay (simulating disk/network I/O), while the inference loop consumed them. The speedup came from overlapping I/O and compute: while the model processed document N, the loader was already waiting on the I/O for document N+1. Because the simulated 0.5s load is the slower stage here, the async run approaches the total load time (about 5 × 0.5s) plus the last forward pass, instead of the sum of every load and every forward pass.
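A back-of-the-envelope model makes that speedup predictable. Assuming every item takes a fixed load time L and compute time C (a simplification; real timings vary per batch), a sequential loop takes N·(L + C), while a two-stage pipeline takes about L + (N - 1)·max(L, C) + C. The hypothetical helpers below just evaluate those two formulas.

```python
def sequential_time(n, load_s, infer_s):
    """Load then infer, one item at a time: both stages sit on the critical path."""
    return n * (load_s + infer_s)


def pipelined_time(n, load_s, infer_s):
    """Two-stage pipeline: after the first load, the slower stage sets the pace."""
    return load_s + (n - 1) * max(load_s, infer_s) + infer_s


if __name__ == "__main__":
    # Hypothetical numbers: 0.5s load (as in the demo), 0.1s inference.
    n, load_s, infer_s = 5, 0.5, 0.1
    seq = sequential_time(n, load_s, infer_s)
    pipe = pipelined_time(n, load_s, infer_s)
    print(f"sequential {seq:.2f}s, pipelined {pipe:.2f}s, speedup {seq / pipe:.2f}x")
```

The formula also shows the ceiling: as N grows, the speedup tends to (L + C) / max(L, C), which is at most 2x for two stages and is reached only when loading and inference take equal time.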
Common gotcha
The most common mistake is forgetting to move tokens to the correct device inside the async loader. If you load on the CPU and move tensors to the GPU in the inference loop, the host-to-device copy sits on the critical path right before each forward pass, so it is serialized with compute instead of overlapping with it: you've just moved the blocking operation. Move to the device in the loader, before enqueueing: tokens = {k: v.to(model.device) for k, v in tokens.items()}. Another trap: keep the queue small (2-4 batches); a very large or unbounded queue lets the loader run ahead and hold much of the dataset in memory, GPU memory included if you move tensors in the loader. Third: a queue whose maxsize exceeds your total batch count never applies backpressure, so it behaves like an unbounded queue.
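The queue-size advice is easy to check empirically. In this minimal, stdlib-only sketch (max_backlog and its producer are hypothetical stand-ins, not part of the example above), a fast producer feeds a slow consumer through asyncio.Queue, and the function records the largest backlog the consumer ever sees.

```python
import asyncio


async def max_backlog(n_items, maxsize):
    """Largest queue depth observed while a fast producer feeds a slow consumer."""
    queue = asyncio.Queue(maxsize=maxsize)  # maxsize=0 means unbounded

    async def producer():
        for i in range(n_items):
            await queue.put(i)  # only suspends when the queue is full
        await queue.put(None)  # sentinel: done

    producer_task = asyncio.create_task(producer())
    peak = 0
    while True:
        await asyncio.sleep(0.001)  # slow consumer: lets the producer run ahead
        peak = max(peak, queue.qsize())
        item = await queue.get()
        if item is None:
            break
    await producer_task
    return peak


if __name__ == "__main__":
    print("bounded (maxsize=2):", asyncio.run(max_backlog(100, 2)))
    print("unbounded (maxsize=0):", asyncio.run(max_backlog(100, 0)))
```

With maxsize=2 the backlog never exceeds two items; with an unbounded queue the producer enqueues all 100 items (plus the sentinel) before the consumer processes the first one.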
Error recovery
RuntimeError: Expected all tensors to be on the same device
asyncio.TimeoutError or hanging process
OutOfMemoryError during async loading
Data ordering corrupted
Experienced dev note
In transformers 5.5.x, device_map='auto' handles multi-GPU placement, but async loading exposes a subtle issue when the model is sharded across several GPUs: the loader can only move inputs to model.device (typically the device holding the embedding layer), and accelerate's dispatch hooks move activations between GPUs inside every forward pass, not during your async move. The first forward pass also pays one-time setup costs (CUDA context and kernel initialization, memory allocation), so pre-warm the model with one dummy batch before starting the async loop to keep that stall out of the pipeline. In production, prefer asyncio.Queue over a deque guarded by a lock: it suspends producers when full and consumers when empty, so you get backpressure without polling. Monitor prefetch lag too: if the queue is consistently empty, the loader is too slow relative to inference and you are not gaining the speedup. Finally, async loading pays off most with batching; if your inference processes single examples, task-switching overhead can erase the gains.
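One lightweight way to monitor prefetch lag is to count how often the inference loop finds the queue empty when it asks for the next batch. The sketch below is a hypothetical illustration, not a library feature: starved_fraction, the sleep-based loader and the simulated inference are assumptions, and in a real service you would export the same counter to your metrics system.

```python
import asyncio


async def starved_fraction(load_s, infer_s, n=10, maxsize=2):
    """Fraction of fetches where the consumer found the prefetch queue empty."""
    queue = asyncio.Queue(maxsize=maxsize)

    async def loader():
        for i in range(n):
            await asyncio.sleep(load_s)  # simulated I/O
            await queue.put(i)
        await queue.put(None)  # sentinel: loading finished

    loader_task = asyncio.create_task(loader())
    fetches = starved = 0
    while True:
        if queue.empty():
            starved += 1  # inference would idle here waiting for data
        fetches += 1
        item = await queue.get()
        if item is None:
            break
        # Simulated inference; a real forward pass would go through asyncio.to_thread.
        await asyncio.sleep(infer_s)
    await loader_task
    return starved / fetches


if __name__ == "__main__":
    print("slow loader:", round(asyncio.run(starved_fraction(0.02, 0.001)), 2))
    print("fast loader:", round(asyncio.run(starved_fraction(0.001, 0.02)), 2))
```

A fraction near 1 means inference keeps waiting on the loader (the "consistently empty" case); a fraction near 0 means the prefetch queue is doing its job.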
Check your understanding
Why does moving tensors to GPU inside the async loader (before enqueueing) matter more than moving them in the inference loop, even though both work? What would happen if you loaded 100 batches into an unbounded queue before inference started?
Answer hint
The answer hinges on the host-to-device copy being a data transfer on the critical path: done in the inference loop, it runs right before each forward pass and keeps the GPU waiting. Done in the loader, it overlaps with the previous batch's inference, so the GPU never idles waiting for it. The second part tests whether you understand that prefetch queues must be small: loading 100 batches upfront delays the first result until all loading finishes, holds every batch in memory at once, and forfeits the overlap between loading and compute that made async loading worthwhile.
return_tensors='pt' makes the tokenizer return PyTorch tensors directly. In 5.5.x, device_map='auto' (which needs the accelerate package) is what shards a model across multiple GPUs; loading without a device_map still works for single-device use. If you're migrating from 4.x, pass device_map='auto' to from_pretrained() explicitly rather than relying on defaults.