Async extraction pipeline in Python
Quick answer
Use Python's asyncio with the OpenAI SDK's AsyncOpenAI client to build an async extraction pipeline. This allows concurrent awaited calls to client.chat.completions.create for efficient data extraction from multiple inputs.

Prerequisites
- Python 3.8+
- OpenAI API key
- pip install openai>=1.0
Setup
Install the latest openai Python package (v1+) and set your API key as an environment variable.
- Run pip install openai
- Set the OPENAI_API_KEY environment variable to your OpenAI API key

pip install openai output:
Collecting openai
  Downloading openai-1.x.x-py3-none-any.whl (xx kB)
Installing collected packages: openai
Successfully installed openai-1.x.x
Step by step
This example shows a complete async extraction pipeline that concurrently sends multiple extraction prompts to gpt-4o using asyncio and the OpenAI async client.
import os
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

async def extract_data(text: str) -> str:
    messages = [
        {"role": "user", "content": f"Extract the key information from this text:\n{text}"}
    ]
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        max_tokens=200,
    )
    return response.choices[0].message.content

async def main():
    texts = [
        "John Doe, age 30, lives in New York.",
        "The product launch is scheduled for May 2026.",
        "Contact us at support@example.com for assistance.",
    ]
    tasks = [extract_data(text) for text in texts]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        print(f"Extraction {i+1}: {result}\n")

if __name__ == "__main__":
    asyncio.run(main())

Output:
Extraction 1: Name: John Doe
Age: 30
Location: New York

Extraction 2: Product launch date: May 2026

Extraction 3: Contact email: support@example.com
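In a real pipeline, one failed request should not abort the whole batch. Passing return_exceptions=True to asyncio.gather returns exceptions in place of results, so each failure can be handled per item. A minimal, self-contained sketch, with a stand-in coroutine (extract_stub, a hypothetical stub) in place of a live API call:

```python
import asyncio

async def extract_stub(text: str) -> str:
    # Stand-in for extract_data(); raises on empty input to simulate an API error.
    if not text:
        raise ValueError("empty input")
    return f"extracted: {text}"

async def run_batch(texts):
    # return_exceptions=True keeps one failure from cancelling the other tasks.
    results = await asyncio.gather(
        *(extract_stub(t) for t in texts), return_exceptions=True
    )
    # Replace each exception with a per-item failure marker.
    return [
        r if not isinstance(r, Exception) else f"failed: {r}"
        for r in results
    ]

print(asyncio.run(run_batch(["alpha", "", "beta"])))
# → ['extracted: alpha', 'failed: empty input', 'extracted: beta']
```

The same pattern drops straight into main() above: swap extract_stub for extract_data and inspect each result before printing.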
Common variations
You can adapt this pipeline by:
- Using streaming with stream=True and async iteration for partial results.
- Switching to other models like gpt-4o-mini for cost efficiency.
- Integrating with other async frameworks like FastAPI for web endpoints.
async def extract_data_stream(text: str):
    messages = [{"role": "user", "content": f"Extract info:\n{text}"}]
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        max_tokens=200,
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content or ""
        print(delta, end="", flush=True)

# Usage example:
# asyncio.run(extract_data_stream("John Doe, age 30, lives in New York."))

Output (streamed incrementally):
Name: John Doe
Age: 30
Location: New York
Troubleshooting
- If you get RuntimeError: Event loop is closed, ensure you call asyncio.run() only once, at the entry point.
- If API calls fail, verify your OPENAI_API_KEY environment variable is set correctly.
- For rate limits, implement exponential backoff or limit concurrency with asyncio.Semaphore.
Key takeaways
- Use the AsyncOpenAI client and await client.chat.completions.create for concurrent extraction calls.
- Leverage asyncio.gather to run multiple extraction tasks efficiently.
- Streaming responses enable partial result processing for large extractions.
- Always secure your API key via environment variables, never hardcode.
- Handle rate limits and event loop errors explicitly in async pipelines.