This tutorial demonstrates how to use Ollama’s API with async/await and various techniques to dramatically speed up your API calls. Ollama provides an OpenAI-compatible API, allowing us to use the familiar OpenAI client to interact with locally hosted models.

Prerequisites

Before starting, make sure you have:

  1. Ollama installed: Download from https://ollama.ai
  2. A model pulled: Run ollama pull llama3.2 (or mistral, codellama, etc.)
  3. Ollama running: Start the server with ollama serve
  4. Required packages: Install with pip install openai aiohttp


Setup and Configuration

import asyncio
import time
import aiohttp
from openai import AsyncOpenAI
from typing import List, Dict, Any

# Initialize the async client for Ollama
# Ollama runs on localhost:11434 by default and provides OpenAI-compatible API
client = AsyncOpenAI(
    base_url="http://localhost:11434/v1",  # Ollama's API endpoint
    api_key="ollama",  # Ollama doesn't require a real API key, but the client needs one
)

# Default model to use (change this to any model you have pulled with Ollama)
# Common models: llama2, mistral, codellama, phi, gemma, llama3.2, etc.
DEFAULT_MODEL = "llama3.2"


Part 0: Helper Functions

First, let’s create a helper function to list available models:

async def list_available_models() -> List[str]:
    """
    List all available Ollama models on your system.
    This uses Ollama's native API endpoint.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get("http://localhost:11434/api/tags") as response:
                if response.status == 200:
                    data = await response.json()
                    models = [model["name"] for model in data.get("models", [])]
                    return models
                else:
                    print(f"Error fetching models: {response.status}")
                    return []
    except Exception as e:
        print(f"Error connecting to Ollama: {e}")
        print("Make sure Ollama is running: ollama serve")
        return []


Part 1: Basic Async API Call

The foundation of all async operations is a basic async function:

async def basic_async_call(prompt: str, model: str = DEFAULT_MODEL) -> str:
    """
    Basic async call to Ollama API.
    This is the foundation for all async operations.
    
    Args:
        prompt: The prompt to send to the model
        model: The Ollama model to use (default: llama3.2)
    """
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
    )
    return response.choices[0].message.content


Part 2: Sequential vs Concurrent - Performance Comparison

The key to speed optimization is understanding the difference between sequential and concurrent processing:

Sequential Approach (SLOW)

async def sequential_calls(prompts: List[str]) -> List[str]:
    """
    Sequential approach: Process one request at a time.
    This is SLOW because each request waits for the previous one to complete.
    """
    results = []
    for prompt in prompts:
        result = await basic_async_call(prompt)
        results.append(result)
    return results

Concurrent Approach (FAST)

async def concurrent_calls(prompts: List[str]) -> List[str]:
    """
    Concurrent approach: Process all requests simultaneously.
    This is FAST because requests are sent in parallel.
    """
    # Create tasks for all prompts
    tasks = [basic_async_call(prompt) for prompt in prompts]
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    return results

Performance Comparison Demo

async def compare_performance():
    """
    Compare the performance of sequential vs concurrent calls.
    """
    prompts = [
        "What is Python?",
        "What is JavaScript?",
        "What is Rust?",
        "What is Go?",
        "What is TypeScript?",
    ]
    
    print("=" * 60)
    print("Performance Comparison: Sequential vs Concurrent")
    print("=" * 60)
    
    # Sequential approach
    start_time = time.time()
    sequential_results = await sequential_calls(prompts)
    sequential_time = time.time() - start_time
    print(f"\nSequential approach: {sequential_time:.2f} seconds")
    
    # Concurrent approach
    start_time = time.time()
    concurrent_results = await concurrent_calls(prompts)
    concurrent_time = time.time() - start_time
    print(f"Concurrent approach: {concurrent_time:.2f} seconds")
    
    speedup = sequential_time / concurrent_time
    print(f"\nSpeedup: {speedup:.2f}x faster with concurrent calls!")
    print("=" * 60)

Expected Result: Concurrent calls are typically 3-5x faster for 5 prompts!


Part 3: Advanced Speed Optimization Techniques

Technique 1: Batch Processing with Semaphore

Use a semaphore to limit concurrent requests and prevent overwhelming the API:

async def batch_with_semaphore(
    prompts: List[str], 
    max_concurrent: int = 10
) -> List[str]:
    """
    Use a semaphore to limit concurrent requests.
    This prevents overwhelming the API and helps with rate limits.
    
    Args:
        prompts: List of prompts to process
        max_concurrent: Maximum number of concurrent requests
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def bounded_call(prompt: str) -> str:
        async with semaphore:  # Limits concurrent execution
            return await basic_async_call(prompt)
    
    tasks = [bounded_call(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

Technique 2: Error Handling with Retry Logic

Robust error handling ensures speed without sacrificing reliability:

async def batch_with_error_handling(
    prompts: List[str],
    max_retries: int = 3
) -> List[Dict[str, Any]]:
    """
    Process prompts with retry logic and error handling.
    This ensures robustness while maintaining speed.
    """
    async def call_with_retry(prompt: str, index: int) -> Dict[str, Any]:
        for attempt in range(max_retries):
            try:
                result = await basic_async_call(prompt)
                return {
                    "index": index,
                    "prompt": prompt,
                    "result": result,
                    "success": True,
                    "attempts": attempt + 1
                }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {
                        "index": index,
                        "prompt": prompt,
                        "result": None,
                        "success": False,
                        "error": str(e),
                        "attempts": attempt + 1
                    }
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    tasks = [call_with_retry(prompt, i) for i, prompt in enumerate(prompts)]
    results = await asyncio.gather(*tasks)
    return results

Technique 3: Streaming for Better User Experience

Stream responses for improved perceived performance:

async def streaming_response(prompt: str, model: str = DEFAULT_MODEL):
    """
    Stream responses for better perceived performance.
    Users see results as they arrive, not all at once.
    
    Args:
        prompt: The prompt to send to the model
        model: The Ollama model to use (default: llama3.2)
    """
    stream = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    
    print(f"\nStreaming response for: '{prompt}'")
    print("-" * 60)
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    print("\n" + "-" * 60)
    return full_response


Part 4: Practical Example - Document Processing

Here’s a real-world example of processing multiple documents concurrently:

async def process_documents_async(documents: List[str]) -> List[str]:
    """
    Practical example: Process multiple documents concurrently.
    """
    prompts = [
        f"Summarize the following text in one sentence:\n\n{doc}"
        for doc in documents
    ]
    
    # Use semaphore to limit concurrent requests (respect rate limits)
    summaries = await batch_with_semaphore(prompts, max_concurrent=5)
    return summaries


Part 5: Best Practices and Optimization Tips

Speed Optimization Checklist

  1. Use asyncio.gather() for parallel requests
    • Don’t await each call individually
    • Create all tasks first, then gather them
  2. Use Semaphores to control concurrency
    • Prevents overwhelming the API
    • Helps respect rate limits
    • Typical values: 5-20 concurrent requests
  3. Implement retry logic with exponential backoff
    • Handles transient errors
    • Prevents losing progress on failures
  4. Use streaming for long responses
    • Better user experience
    • Perceived performance improvement
  5. Batch similar requests together
    • Reduces overhead
    • More efficient resource usage
  6. Monitor resource usage
    • Ollama runs locally, limited by your hardware
    • Use semaphores to prevent overwhelming your system
    • Monitor GPU/CPU usage for concurrent requests
  7. Choose the right model
    • Smaller models (phi, gemma) are faster
    • Larger models (llama2, mistral) are more capable
    • Use the fastest model that meets your needs
  8. Cache responses when possible
    • Don’t re-request identical prompts
    • Use memoization for repeated queries


Part 6: Complete Working Example

Here’s a complete example demonstrating all techniques:

async def main():
    """
    Main function demonstrating various async techniques with Ollama.
    """
    global DEFAULT_MODEL
    
    print("\n" + "=" * 60)
    print("Ollama API Async Tutorial - Examples")
    print("=" * 60)
    
    # List available models
    print("\n0. Available Ollama Models:")
    available_models = await list_available_models()
    if available_models:
        print(f"Found {len(available_models)} model(s):")
        for model in available_models:
            print(f"  - {model}")
    else:
        print("No models found. Pull a model first: ollama pull llama3.2")
        return
    
    print(f"\nUsing model: {DEFAULT_MODEL}")
    if DEFAULT_MODEL not in available_models:
        print(f"Warning: {DEFAULT_MODEL} not found in available models!")
        if available_models:
            print(f"Using first available model: {available_models[0]}")
            DEFAULT_MODEL = available_models[0]
    print("=" * 60)
    
    # Example 1: Basic async call
    print("\n1. Basic Async Call:")
    result = await basic_async_call("Explain async/await in Python in one sentence.")
    print(f"Result: {result}")
    
    # Example 2: Performance comparison
    await compare_performance()
    
    # Example 3: Batch processing with semaphore
    print("\n3. Batch Processing with Semaphore:")
    test_prompts = [
        "What is machine learning?",
        "What is deep learning?",
        "What is neural network?",
    ]
    start_time = time.time()
    batch_results = await batch_with_semaphore(test_prompts, max_concurrent=3)
    batch_time = time.time() - start_time
    print(f"Processed {len(test_prompts)} prompts in {batch_time:.2f} seconds")
    for i, result in enumerate(batch_results):
        print(f"\nPrompt {i+1}: {result[:100]}...")
    
    # Example 4: Error handling
    print("\n4. Batch Processing with Error Handling:")
    results_with_errors = await batch_with_error_handling(test_prompts)
    for result in results_with_errors:
        status = "" if result["success"] else ""
        print(f"{status} Prompt {result['index']+1}: {result['success']}")
    
    # Example 5: Streaming
    print("\n5. Streaming Response:")
    await streaming_response("Explain async/await in Python in 2-3 sentences.")


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())


Running the Tutorial

To run this tutorial:

# 1. Make sure Ollama is running
ollama serve

# 2. Pull a model if you haven't already
ollama pull llama3.2

# 3. Install required packages
pip install openai aiohttp

# 4. Run the script
python ollama_async_tutorial.py


Key Takeaways

  • Async is powerful: Concurrent API calls can be 3-5x faster than sequential
  • Control concurrency: Use semaphores to prevent overwhelming resources
  • Handle errors gracefully: Implement retry logic with exponential backoff
  • Stream for UX: Streaming improves perceived performance
  • Monitor resources: Local models are hardware-limited
  • Choose wisely: Balance model size with speed requirements

With these techniques, you can build fast, robust applications that leverage local LLMs efficiently!