How to build parallel AI pipelines
Quick answer
Use Python's asyncio or concurrent.futures to run multiple AI API calls in parallel, leveraging SDKs like openai for concurrent chat.completions.create requests. This approach accelerates workflows by handling multiple prompts simultaneously.

Prerequisites

- Python 3.8+
- OpenAI API key (free tier works)
- pip install openai>=1.0
Setup
Install the openai Python SDK and set your API key as an environment variable for secure authentication.
```
pip install openai
```

Output:

```
Collecting openai
  Downloading openai-1.x.x-py3-none-any.whl (xx kB)
Installing collected packages: openai
Successfully installed openai-1.x.x
```
Step by step
This example demonstrates running multiple AI chat completions in parallel using asyncio with the openai SDK v1. Each prompt is sent concurrently to the gpt-4o model.
```python
import os
import asyncio
from openai import AsyncOpenAI

# Use the async client; the v1 SDK has no acreate method —
# you await the regular create call on AsyncOpenAI instead.
client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

async def fetch_response(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Summarize the latest AI research trends.",
        "Generate a creative story about a robot.",
    ]
    # Schedule all requests concurrently and wait for every result
    tasks = [fetch_response(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results, 1):
        print(f"Response {i}: {result}\n")

if __name__ == "__main__":
    asyncio.run(main())
```

Output:

```
Response 1: Quantum computing uses quantum bits that can be in multiple states simultaneously, enabling powerful computations.

Response 2: Recent AI research focuses on large language models, multimodal learning, and efficient fine-tuning techniques.

Response 3: Once upon a time, a curious robot explored the world, learning about humans and emotions.
```
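Unbounded asyncio.gather fires every request at once, which can trip rate limits on large prompt lists. A semaphore caps how many calls are in flight. Below is a minimal, self-contained sketch of the pattern: fake_api_call is a hypothetical stub standing in for the real API request, and max_concurrent=2 is an illustrative limit you would tune to your rate limits.

```python
import asyncio

async def fake_api_call(prompt: str) -> str:
    # Stand-in for a real API request (simulated latency)
    await asyncio.sleep(0.01)
    return f"answer to: {prompt}"

async def fetch_limited(semaphore: asyncio.Semaphore, prompt: str) -> str:
    async with semaphore:  # at most max_concurrent calls run at once
        return await fake_api_call(prompt)

async def run(prompts, max_concurrent=2):
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(*(fetch_limited(semaphore, p) for p in prompts))

results = asyncio.run(run([f"q{i}" for i in range(5)]))
print(results)
```

asyncio.gather still returns results in input order, so capping concurrency this way does not change how you consume the output.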
Common variations
You can also use concurrent.futures.ThreadPoolExecutor for parallelism in synchronous code. Different models like gpt-4o-mini or claude-3-5-sonnet-20241022 can be used by changing the model parameter. For Anthropic models, use the anthropic SDK, which offers similar async support.
```python
import os
from concurrent.futures import ThreadPoolExecutor
from openai import OpenAI

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def fetch_response_sync(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

prompts = [
    "What is reinforcement learning?",
    "Explain blockchain technology.",
    "Write a poem about spring.",
]

# map runs the calls across threads and preserves input order
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_response_sync, prompts))

for i, result in enumerate(results, 1):
    print(f"Response {i}: {result}\n")
```

Output:

```
Response 1: Reinforcement learning is a type of machine learning where agents learn by receiving rewards or penalties.

Response 2: Blockchain is a decentralized ledger technology that ensures secure and transparent transactions.

Response 3: Blossoms bloom and birds sing, spring awakens with vibrant colors.
```
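executor.map returns results in input order, blocking until earlier items finish. If you would rather process each response the moment it completes, concurrent.futures.as_completed works too. A minimal sketch, with a hypothetical fake_fetch stub in place of the blocking API call:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(prompt: str) -> str:
    # Stand-in for a blocking API call
    return prompt.upper()

prompts = ["alpha", "beta", "gamma"]
results = {}

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its prompt so results can be matched up
    future_to_prompt = {executor.submit(fake_fetch, p): p for p in prompts}
    for future in as_completed(future_to_prompt):
        results[future_to_prompt[future]] = future.result()

print(results)
```

Because futures complete in arbitrary order, collecting into a dict keyed by prompt is an easy way to keep responses associated with their inputs.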
Troubleshooting
- If you encounter rate limit errors, reduce concurrency or add retry logic with exponential backoff.
- Ensure your API key is correctly set in os.environ["OPENAI_API_KEY"].
- For network timeouts, verify your internet connection and consider increasing timeout settings if supported.
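The retry-with-exponential-backoff logic mentioned above can be sketched as a small wrapper. This is one possible shape, not a prescribed API: retry_with_backoff and the flaky stub (which fails twice before succeeding, simulating transient rate-limit errors) are hypothetical names for illustration.

```python
import time

def retry_with_backoff(fn, max_attempts=4, base_delay=0.01):
    """Call fn(), retrying on exception with exponentially growing delays."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** attempt))  # 0.01s, 0.02s, 0.04s, ...

# Flaky stub standing in for a rate-limited API call: fails twice, then succeeds
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

result = retry_with_backoff(flaky)
print(result)
```

In production you would catch the SDK's specific rate-limit exception rather than bare Exception, and add jitter to the delays so parallel workers don't retry in lockstep.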
Key Takeaways
- Use Python asyncio or concurrent.futures to run AI API calls in parallel for faster throughput.
- Leverage the openai SDK v1's AsyncOpenAI client for native async requests.
- Adjust concurrency based on API rate limits and model latency to optimize performance.
- Switch models easily by changing the model parameter in your API calls.
- Handle errors gracefully with retries and environment variable checks for robust pipelines.