API Async Calls and Speed Optimization
This tutorial demonstrates how to use Ollama’s API with async/await and various techniques to dramatically speed up your API calls. Ollama provides an OpenAI-compatible API, allowing us to use the familiar OpenAI client to interact with locally hosted models.
Prerequisites
Before starting, make sure you have:
- Ollama installed: Download from https://ollama.ai
- A model pulled: Run ollama pull llama3.2 (or mistral, codellama, etc.)
- Ollama running: Start the server with ollama serve
- Required packages: Install with pip install openai aiohttp
Setup and Configuration
import asyncio
import time
import aiohttp
from openai import AsyncOpenAI
from typing import List, Dict, Any
# Initialize the async client for Ollama
# Ollama runs on localhost:11434 by default and provides OpenAI-compatible API
client = AsyncOpenAI(
base_url="http://localhost:11434/v1", # Ollama's API endpoint
api_key="ollama", # Ollama doesn't require a real API key, but the client needs one
)
# Default model to use (change this to any model you have pulled with Ollama)
# Common models: llama2, mistral, codellama, phi, gemma, llama3.2, etc.
DEFAULT_MODEL = "llama3.2"
Part 0: Helper Functions
First, let’s create a helper function to list available models:
async def list_available_models() -> List[str]:
"""
List all available Ollama models on your system.
This uses Ollama's native API endpoint.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get("http://localhost:11434/api/tags") as response:
if response.status == 200:
data = await response.json()
models = [model["name"] for model in data.get("models", [])]
return models
else:
print(f"Error fetching models: {response.status}")
return []
except Exception as e:
print(f"Error connecting to Ollama: {e}")
print("Make sure Ollama is running: ollama serve")
return []
Part 1: Basic Async API Call
The foundation of all async operations is a basic async function:
async def basic_async_call(prompt: str, model: str = DEFAULT_MODEL) -> str:
"""
Basic async call to Ollama API.
This is the foundation for all async operations.
Args:
prompt: The prompt to send to the model
model: The Ollama model to use (default: llama3.2)
"""
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
)
return response.choices[0].message.content
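Before moving on, it can be worth a quick smoke test of this function from a plain script. A minimal invocation (assuming Ollama is running and the model above is pulled) might look like this:

# One-off smoke test for the basic call (illustrative):
answer = asyncio.run(basic_async_call("Explain async/await in one sentence."))
print(answer)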
Part 2: Sequential vs Concurrent - Performance Comparison
The key to speed optimization is understanding the difference between sequential and concurrent processing:
Sequential Approach (SLOW)
async def sequential_calls(prompts: List[str]) -> List[str]:
"""
Sequential approach: Process one request at a time.
This is SLOW because each request waits for the previous one to complete.
"""
results = []
for prompt in prompts:
result = await basic_async_call(prompt)
results.append(result)
return results
Concurrent Approach (FAST)
async def concurrent_calls(prompts: List[str]) -> List[str]:
"""
Concurrent approach: Process all requests simultaneously.
This is FAST because requests are sent in parallel.
"""
# Create tasks for all prompts
tasks = [basic_async_call(prompt) for prompt in prompts]
# Wait for all tasks to complete
results = await asyncio.gather(*tasks)
return results
Performance Comparison Demo
async def compare_performance():
"""
Compare the performance of sequential vs concurrent calls.
"""
prompts = [
"What is Python?",
"What is JavaScript?",
"What is Rust?",
"What is Go?",
"What is TypeScript?",
]
print("=" * 60)
print("Performance Comparison: Sequential vs Concurrent")
print("=" * 60)
# Sequential approach
start_time = time.time()
sequential_results = await sequential_calls(prompts)
sequential_time = time.time() - start_time
print(f"\nSequential approach: {sequential_time:.2f} seconds")
# Concurrent approach
start_time = time.time()
concurrent_results = await concurrent_calls(prompts)
concurrent_time = time.time() - start_time
print(f"Concurrent approach: {concurrent_time:.2f} seconds")
speedup = sequential_time / concurrent_time
print(f"\nSpeedup: {speedup:.2f}x faster with concurrent calls!")
print("=" * 60)
Expected Result: Concurrent calls are typically 3-5x faster for 5 prompts!
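If you want to try the comparison on its own, outside the main() function shown later, a minimal runner is:

# Run just the performance comparison (assumes the functions above are defined):
asyncio.run(compare_performance())

Keep in mind that the real speedup depends on your hardware and on how many requests the Ollama server handles in parallel (tunable via the OLLAMA_NUM_PARALLEL environment variable in recent versions); if the server processes requests strictly one at a time, the gains will be much smaller.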
Part 3: Advanced Speed Optimization Techniques
Technique 1: Batch Processing with Semaphore
Use a semaphore to limit concurrent requests and prevent overwhelming the API:
async def batch_with_semaphore(
prompts: List[str],
max_concurrent: int = 10
) -> List[str]:
"""
Use a semaphore to limit concurrent requests.
This prevents overwhelming the API and helps with rate limits.
Args:
prompts: List of prompts to process
max_concurrent: Maximum number of concurrent requests
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_call(prompt: str) -> str:
async with semaphore: # Limits concurrent execution
return await basic_async_call(prompt)
tasks = [bounded_call(prompt) for prompt in prompts]
results = await asyncio.gather(*tasks)
return results
Technique 2: Error Handling with Retry Logic
Robust error handling ensures speed without sacrificing reliability:
async def batch_with_error_handling(
prompts: List[str],
max_retries: int = 3
) -> List[Dict[str, Any]]:
"""
Process prompts with retry logic and error handling.
This ensures robustness while maintaining speed.
"""
async def call_with_retry(prompt: str, index: int) -> Dict[str, Any]:
for attempt in range(max_retries):
try:
result = await basic_async_call(prompt)
return {
"index": index,
"prompt": prompt,
"result": result,
"success": True,
"attempts": attempt + 1
}
except Exception as e:
if attempt == max_retries - 1:
return {
"index": index,
"prompt": prompt,
"result": None,
"success": False,
"error": str(e),
"attempts": attempt + 1
}
await asyncio.sleep(2 ** attempt) # Exponential backoff
tasks = [call_with_retry(prompt, i) for i, prompt in enumerate(prompts)]
results = await asyncio.gather(*tasks)
return results
Technique 3: Streaming for Better User Experience
Stream responses for improved perceived performance:
async def streaming_response(prompt: str, model: str = DEFAULT_MODEL):
"""
Stream responses for better perceived performance.
Users see results as they arrive, not all at once.
Args:
prompt: The prompt to send to the model
model: The Ollama model to use (default: llama3.2)
"""
stream = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
)
print(f"\nStreaming response for: '{prompt}'")
print("-" * 60)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_response += content
print("\n" + "-" * 60)
return full_response
Part 4: Practical Example - Document Processing
Here’s a real-world example of processing multiple documents concurrently:
async def process_documents_async(documents: List[str]) -> List[str]:
"""
Practical example: Process multiple documents concurrently.
"""
prompts = [
f"Summarize the following text in one sentence:\n\n{doc}"
for doc in documents
]
# Use semaphore to limit concurrent requests (respect rate limits)
summaries = await batch_with_semaphore(prompts, max_concurrent=5)
return summaries
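To try it out, pass in a couple of short sample texts. The snippet below is a minimal sketch (the sample documents and the demo function are placeholders, not part of the tutorial's code):

# Illustrative usage with placeholder documents:
sample_docs = [
    "Python is a high-level language known for readable syntax and a large ecosystem.",
    "Async IO lets a single thread juggle many network requests concurrently.",
]

async def demo_document_processing() -> None:
    summaries = await process_documents_async(sample_docs)
    for summary in summaries:
        print(f"- {summary}")

# asyncio.run(demo_document_processing())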
Part 5: Best Practices and Optimization Tips
Speed Optimization Checklist
- Use asyncio.gather() for parallel requests
  - Don't await each call individually
  - Create all tasks first, then gather them
- Use semaphores to control concurrency
  - Prevents overwhelming the API
  - Helps respect rate limits
  - Typical values: 5-20 concurrent requests
- Implement retry logic with exponential backoff
  - Handles transient errors
  - Prevents losing progress on failures
- Use streaming for long responses
  - Better user experience
  - Perceived performance improvement
- Batch similar requests together
  - Reduces overhead
  - More efficient resource usage
- Monitor resource usage
  - Ollama runs locally, limited by your hardware
  - Use semaphores to prevent overwhelming your system
  - Monitor GPU/CPU usage for concurrent requests
- Choose the right model
  - Smaller models (phi, gemma) are faster
  - Larger models (llama2, mistral) are more capable
  - Use the fastest model that meets your needs
- Cache responses when possible
  - Don't re-request identical prompts
  - Use memoization for repeated queries
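The tutorial code above does not include a cache, but a minimal in-memory memoization wrapper around basic_async_call could look like the sketch below (the cache dictionary and the cached_call name are illustrative). Note that with temperature > 0 the model's answers vary between calls, so the cache simply returns whichever answer was generated first for a given model/prompt pair.

# Illustrative in-memory cache around basic_async_call:
_response_cache: Dict[tuple, str] = {}

async def cached_call(prompt: str, model: str = DEFAULT_MODEL) -> str:
    key = (model, prompt)
    if key not in _response_cache:
        _response_cache[key] = await basic_async_call(prompt, model)
    return _response_cache[key]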
Part 6: Complete Working Example
Here’s a complete example demonstrating all techniques:
async def main():
"""
Main function demonstrating various async techniques with Ollama.
"""
global DEFAULT_MODEL
print("\n" + "=" * 60)
print("Ollama API Async Tutorial - Examples")
print("=" * 60)
# List available models
print("\n0. Available Ollama Models:")
available_models = await list_available_models()
if available_models:
print(f"Found {len(available_models)} model(s):")
for model in available_models:
print(f" - {model}")
else:
print("No models found. Pull a model first: ollama pull llama3.2")
return
print(f"\nUsing model: {DEFAULT_MODEL}")
    # Model names from /api/tags typically include a tag (e.g. "llama3.2:latest"),
    # so compare against the base name as well as the full name.
    if not any(m == DEFAULT_MODEL or m.split(":")[0] == DEFAULT_MODEL for m in available_models):
        print(f"Warning: {DEFAULT_MODEL} not found in available models!")
        print(f"Using first available model: {available_models[0]}")
        DEFAULT_MODEL = available_models[0]
print("=" * 60)
# Example 1: Basic async call
print("\n1. Basic Async Call:")
result = await basic_async_call("Explain async/await in Python in one sentence.")
print(f"Result: {result}")
# Example 2: Performance comparison
await compare_performance()
# Example 3: Batch processing with semaphore
print("\n3. Batch Processing with Semaphore:")
test_prompts = [
"What is machine learning?",
"What is deep learning?",
"What is neural network?",
]
start_time = time.time()
batch_results = await batch_with_semaphore(test_prompts, max_concurrent=3)
batch_time = time.time() - start_time
print(f"Processed {len(test_prompts)} prompts in {batch_time:.2f} seconds")
for i, result in enumerate(batch_results):
print(f"\nPrompt {i+1}: {result[:100]}...")
# Example 4: Error handling
print("\n4. Batch Processing with Error Handling:")
results_with_errors = await batch_with_error_handling(test_prompts)
for result in results_with_errors:
status = "✓" if result["success"] else "✗"
print(f"{status} Prompt {result['index']+1}: {result['success']}")
# Example 5: Streaming
print("\n5. Streaming Response:")
await streaming_response("Explain async/await in Python in 2-3 sentences.")
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())
Running the Tutorial
To run this tutorial:
# 1. Make sure Ollama is running
ollama serve
# 2. Pull a model if you haven't already
ollama pull llama3.2
# 3. Install required packages
pip install openai aiohttp
# 4. Run the script
python ollama_async_tutorial.py
Key Takeaways
- Async is powerful: Concurrent API calls can be 3-5x faster than sequential
- Control concurrency: Use semaphores to prevent overwhelming resources
- Handle errors gracefully: Implement retry logic with exponential backoff
- Stream for UX: Streaming improves perceived performance
- Monitor resources: Local models are hardware-limited
- Choose wisely: Balance model size with speed requirements
With these techniques, you can build fast, robust applications that leverage local LLMs efficiently!