
How to Handle AI API Rate Limits with Fallback Strategies

Purpose

This post shows how to implement multi-provider fallback for AI APIs with automatic failover when you hit rate limits.

Problem

I built an AI-powered feature. It worked great in development. Then I deployed to production and hit rate limits immediately:

error-log.txt
ERROR: Rate limit exceeded for OpenAI API
INFO: Retrying in 60 seconds...
ERROR: Rate limit exceeded for OpenAI API
INFO: Retrying in 60 seconds...
ERROR: User request timeout after 120 seconds

My users saw errors. I saw lost revenue. All because I relied on a single API provider.

Environment

  • Python 3.11+
  • Asyncio for concurrent requests
  • Tenacity for retry logic
  • Multiple AI API keys (OpenAI, Anthropic, OpenRouter)

Solution: Multi-Provider Fallback

I implemented a fallback system with three key components:

  1. Provider abstraction - Same interface for all providers
  2. Health tracking - Know which providers are available
  3. Automatic failover - Switch providers on rate limit

Provider Abstraction

First, I created a common interface:

provider-base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionResult:
    content: str
    tokens_used: int
    provider: str
    model: str
    latency_ms: float


@dataclass
class ProviderHealth:
    name: str
    is_healthy: bool
    last_success: float
    consecutive_failures: int
    rate_limit_reset: Optional[float]  # Unix timestamp


class AIProvider(ABC):
    @abstractmethod
    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        """Generate completion from this provider"""
        pass

    @abstractmethod
    def get_health(self) -> ProviderHealth:
        """Check current health status"""
        pass

    @abstractmethod
    def record_success(self, latency_ms: float):
        """Record successful call"""
        pass

    @abstractmethod
    def record_failure(self, error: Exception):
        """Record failed call"""
        pass
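
To try the interface without any API key, here is a minimal sketch of a concrete provider. `EchoProvider` and its `echo-1` model name are hypothetical stand-ins, not part of the original code, and the base types are restated in compact form only so the snippet runs on its own.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionResult:
    content: str
    tokens_used: int
    provider: str
    model: str
    latency_ms: float


@dataclass
class ProviderHealth:
    name: str
    is_healthy: bool
    last_success: float
    consecutive_failures: int
    rate_limit_reset: Optional[float]


class AIProvider(ABC):
    @abstractmethod
    async def complete(self, prompt: str, max_tokens: int = 1000,
                       temperature: float = 0.7) -> CompletionResult: ...

    @abstractmethod
    def get_health(self) -> ProviderHealth: ...

    @abstractmethod
    def record_success(self, latency_ms: float) -> None: ...

    @abstractmethod
    def record_failure(self, error: Exception) -> None: ...


class EchoProvider(AIProvider):
    """Hypothetical offline provider that echoes the prompt back."""

    def __init__(self, name: str = "echo"):
        self.name = name
        self.last_success = time.time()
        self.consecutive_failures = 0

    async def complete(self, prompt, max_tokens=1000, temperature=0.7):
        start = time.time()
        content = f"echo: {prompt}"
        latency = (time.time() - start) * 1000
        self.record_success(latency)
        return CompletionResult(
            content=content,
            tokens_used=len(prompt.split()),  # crude word count, not real tokens
            provider=self.name,
            model="echo-1",
            latency_ms=latency,
        )

    def get_health(self):
        # Same threshold as the real providers: unhealthy after 3 failures in a row
        return ProviderHealth(self.name, self.consecutive_failures < 3,
                              self.last_success, self.consecutive_failures, None)

    def record_success(self, latency_ms):
        self.last_success = time.time()
        self.consecutive_failures = 0

    def record_failure(self, error):
        self.consecutive_failures += 1
```

Calling `asyncio.run(EchoProvider().complete("hello world"))` returns a `CompletionResult` whose `content` is the echoed prompt. A provider like this is also handy as a last-resort entry in the provider list during local development.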

OpenAI Provider Implementation

openai-provider.py
import time
from typing import Optional

import openai

# Assumes the base classes above are saved as provider_base.py
# (Python module names cannot contain hyphens).
from provider_base import AIProvider, CompletionResult, ProviderHealth


class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model
        self.name = "openai"
        # Health tracking
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset: Optional[float] = None

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        start = time.time()
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature
            )
            latency = (time.time() - start) * 1000
            self.record_success(latency)
            return CompletionResult(
                content=response.choices[0].message.content,
                tokens_used=response.usage.total_tokens,
                provider=self.name,
                model=self.model,
                latency_ms=latency
            )
        except openai.RateLimitError as e:
            self.record_failure(e)
            # Retry-After, when present, is a delay in seconds. OpenAI's
            # x-ratelimit-reset-* headers are duration strings such as "1s",
            # not Unix timestamps, so they are not parsed here.
            retry_after = e.response.headers.get("retry-after")
            try:
                self.rate_limit_reset = time.time() + float(retry_after)
            except (TypeError, ValueError):
                pass  # header missing or not numeric; leave the reset time unset
            raise
        except Exception as e:
            self.record_failure(e)
            raise

    def get_health(self) -> ProviderHealth:
        return ProviderHealth(
            name=self.name,
            is_healthy=self.is_healthy and not self._is_rate_limited(),
            last_success=self.last_success,
            consecutive_failures=self.consecutive_failures,
            rate_limit_reset=self.rate_limit_reset
        )

    def _is_rate_limited(self) -> bool:
        if self.rate_limit_reset is None:
            return False
        return time.time() < self.rate_limit_reset

    def record_success(self, latency_ms: float):
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset = None

    def record_failure(self, error: Exception):
        # Mark the provider unhealthy after three failures in a row
        self.consecutive_failures += 1
        if self.consecutive_failures >= 3:
            self.is_healthy = False
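
Rate-limit headers are not always plain numbers, so it helps to normalize them before computing a reset time. The sketch below is a hypothetical helper (`parse_reset_seconds` is not part of any SDK) that converts either a numeric value or a duration string such as `6m0s` into seconds. It assumes only those two formats; anything else, including an HTTP-date, returns `None`.

```python
import re
from typing import Optional

# Seconds per unit for duration strings like "6m0s" or "250ms"
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
# "ms" is listed first so it is not read as minutes followed by a stray "s"
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")


def parse_reset_seconds(value: Optional[str]) -> Optional[float]:
    """Return the delay in seconds, or None if the value is not understood."""
    if not value:
        return None
    value = value.strip()
    try:
        return float(value)  # plain number of seconds, e.g. "30"
    except ValueError:
        pass
    parts = _PART.findall(value)
    # Reject strings that are only partly made of number+unit pairs
    if not parts or "".join(number + unit for number, unit in parts) != value:
        return None
    return sum(float(number) * _UNIT_SECONDS[unit] for number, unit in parts)
```

A provider could then set `rate_limit_reset = time.time() + seconds` whenever the helper returns a value, and leave the field untouched otherwise.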

Anthropic Provider Implementation

anthropic-provider.py
import time
from typing import Optional

import anthropic

# Assumes the base classes above are saved as provider_base.py
# (Python module names cannot contain hyphens).
from provider_base import AIProvider, CompletionResult, ProviderHealth


class AnthropicProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "claude-3-5-haiku-latest"):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self.model = model
        self.name = "anthropic"
        # Health tracking
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset: Optional[float] = None

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        start = time.time()
        try:
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=max_tokens,
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature
            )
            latency = (time.time() - start) * 1000
            self.record_success(latency)
            return CompletionResult(
                content=response.content[0].text,
                tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                provider=self.name,
                model=self.model,
                latency_ms=latency
            )
        except anthropic.RateLimitError as e:
            self.record_failure(e)
            # Anthropic sends a retry-after header (seconds) on 429 responses;
            # read it from the response rather than from the exception object.
            retry_after = e.response.headers.get("retry-after")
            try:
                self.rate_limit_reset = time.time() + float(retry_after)
            except (TypeError, ValueError):
                pass  # header missing or not numeric; leave the reset time unset
            raise
        except Exception as e:
            self.record_failure(e)
            raise

    def get_health(self) -> ProviderHealth:
        return ProviderHealth(
            name=self.name,
            is_healthy=self.is_healthy and not self._is_rate_limited(),
            last_success=self.last_success,
            consecutive_failures=self.consecutive_failures,
            rate_limit_reset=self.rate_limit_reset
        )

    def _is_rate_limited(self) -> bool:
        if self.rate_limit_reset is None:
            return False
        return time.time() < self.rate_limit_reset

    def record_success(self, latency_ms: float):
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset = None

    def record_failure(self, error: Exception):
        # Mark the provider unhealthy after three failures in a row
        self.consecutive_failures += 1
        if self.consecutive_failures >= 3:
            self.is_healthy = False
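
Both providers carry identical health-tracking code. One way to remove the duplication, sketched here under the assumption that a mixin fits your codebase, is to move that state into a shared class. `HealthTrackingMixin`, `init_health`, and `is_available` are hypothetical names, not part of the original code.

```python
import time
from typing import Optional


class HealthTrackingMixin:
    """Hypothetical mixin holding the health state both providers duplicate."""

    failure_threshold = 3  # same threshold the providers above use

    def init_health(self) -> None:
        # Call this from the provider's __init__
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset: Optional[float] = None

    def is_rate_limited(self) -> bool:
        return (self.rate_limit_reset is not None
                and time.time() < self.rate_limit_reset)

    def is_available(self) -> bool:
        # Healthy and not inside a known rate-limit window
        return self.is_healthy and not self.is_rate_limited()

    def record_success(self, latency_ms: float) -> None:
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset = None

    def record_failure(self, error: Exception) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.is_healthy = False
```

A provider would then be declared as `class OpenAIProvider(HealthTrackingMixin, AIProvider)`. Listing the mixin first matters: its `record_success` and `record_failure` are found before the abstract ones, so the class can be instantiated.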

Fallback Router

Now the key part - the router that handles failover:

fallback-router.py
import asyncio
import logging
import time

# Assumes the base classes are saved as provider_base.py.
from provider_base import AIProvider, CompletionResult

logger = logging.getLogger(__name__)


class FallbackRouter:
    def __init__(self, providers: list[AIProvider]):
        self.providers = providers

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        """Try providers in order until one succeeds"""
        errors = []
        # Get list of healthy providers, sorted by last success time
        healthy_providers = self._get_healthy_providers()
        if not healthy_providers:
            # All providers unhealthy - try anyway as health check
            logger.warning("All providers unhealthy, attempting recovery")
            healthy_providers = self.providers
        for provider in healthy_providers:
            try:
                result = await provider.complete(prompt, max_tokens, temperature)
                logger.info(f"Success with {provider.name}")
                return result
            except Exception as e:
                error_msg = f"{provider.name} failed: {str(e)}"
                errors.append(error_msg)
                logger.warning(error_msg)
                # Check if this was a rate limit
                health = provider.get_health()
                if health.rate_limit_reset:
                    # Clamp at zero in case the reset time has already passed
                    wait_time = max(0.0, health.rate_limit_reset - time.time())
                    logger.info(f"{provider.name} rate limited for {wait_time:.0f}s")
                # Try next provider
                continue
        # All providers failed
        raise RuntimeError(
            f"All providers failed. Errors: {'; '.join(errors)}"
        )

    def _get_healthy_providers(self) -> list[AIProvider]:
        """Get healthy providers, sorted by preference"""
        healthy = []
        for provider in self.providers:
            health = provider.get_health()
            if health.is_healthy:
                healthy.append(provider)
        # Sort by last success time (prefer recently successful providers)
        healthy.sort(key=lambda p: p.get_health().last_success, reverse=True)
        return healthy

    async def complete_with_retry(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7,
        max_retries: int = 3
    ) -> CompletionResult:
        """Complete with additional retry logic at the router level"""
        last_error = None
        for attempt in range(max_retries):
            try:
                return await self.complete(prompt, max_tokens, temperature)
            except RuntimeError as e:
                last_error = e
                if attempt < max_retries - 1:
                    # Wait before retry
                    wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                    logger.info(f"All providers failed, retrying in {wait_time}s")
                    await asyncio.sleep(wait_time)
        raise last_error
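
The `2 ** attempt` schedule in `complete_with_retry` is deterministic, so many clients that fail at the same moment also retry at the same moment. A common variation, which is not what the router above does, is exponential backoff with full jitter and an upper bound. The sketch below uses a hypothetical `backoff_delay` helper; the `base` and `cap` defaults are illustrative assumptions, not tuned values.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    source = rng if rng is not None else random
    # The ceiling doubles per attempt but never exceeds the cap
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0, ceiling)
```

Inside the retry loop, `await asyncio.sleep(backoff_delay(attempt))` would replace the fixed wait. Passing a seeded `random.Random` as `rng` keeps tests reproducible.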

Usage Example

Here’s how I use it in production:

usage-example.py
import logging
import os

from openai_provider import OpenAIProvider
from anthropic_provider import AnthropicProvider
from fallback_router import FallbackRouter

logger = logging.getLogger(__name__)

# Initialize providers
providers = [
    OpenAIProvider(
        api_key=os.environ["OPENAI_API_KEY"],
        model="gpt-4o-mini"
    ),
    AnthropicProvider(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        model="claude-3-5-haiku-latest"
    ),
]

# Create router
router = FallbackRouter(providers)


async def generate_text(prompt: str) -> str:
    """Generate text with automatic fallback"""
    try:
        result = await router.complete_with_retry(
            prompt=prompt,
            max_tokens=500,
            temperature=0.7,
            max_retries=3
        )
        return result.content
    except RuntimeError as e:
        # All providers failed - implement your fallback
        # (cached response, queue for later, etc.)
        logger.error(f"Generation failed: {e}")
        return "Sorry, I'm experiencing high load. Please try again."
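
The error log at the top of this post ended in a 120-second request timeout, so it can be worth capping the total time a request may spend on retries and failover. Here is a minimal sketch using `asyncio.wait_for`. `generate_with_deadline`, `slow_call`, and `fast_call` are hypothetical names, and the two coroutines only stand in for a real router call.

```python
import asyncio
from typing import Awaitable, Callable

FALLBACK_MESSAGE = "Sorry, I'm experiencing high load. Please try again."


async def generate_with_deadline(
    make_call: Callable[[], Awaitable[str]], timeout_s: float
) -> str:
    """Await make_call() but give up after timeout_s seconds."""
    try:
        return await asyncio.wait_for(make_call(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising
        return FALLBACK_MESSAGE


async def slow_call() -> str:
    await asyncio.sleep(0.2)  # stand-in for a slow provider round trip
    return "late answer"


async def fast_call() -> str:
    return "quick answer"
```

With the router from this post, the call could look like `await generate_with_deadline(lambda: generate_text(prompt), timeout_s=20)`, where the 20-second budget is an assumption to adjust for your own latency targets.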

Testing the Fallback

I tested the failover path with mock providers, one of which always raises a rate limit error:

test-fallback.py
import asyncio
import time

import httpx
import openai
import pytest

# Assumes the earlier modules are saved as provider_base.py and fallback_router.py.
from provider_base import AIProvider, CompletionResult, ProviderHealth
from fallback_router import FallbackRouter


# Requires the pytest-asyncio plugin so pytest can run the coroutine.
@pytest.mark.asyncio
async def test_fallback_on_rate_limit():
    """Test that router falls back when rate limited"""

    # Mock provider that raises rate limit
    class MockRateLimitedProvider(AIProvider):
        def __init__(self, name):
            self.name = name

        async def complete(self, prompt, max_tokens=1000, temperature=0.7):
            # openai.RateLimitError must be built from an httpx response
            request = httpx.Request(
                "POST", "https://api.openai.com/v1/chat/completions"
            )
            response = httpx.Response(429, request=request)
            raise openai.RateLimitError(
                "Rate limit exceeded", response=response, body=None
            )

        def get_health(self):
            return ProviderHealth(
                name=self.name,
                is_healthy=True,
                last_success=time.time(),
                consecutive_failures=0,
                rate_limit_reset=None
            )

        def record_success(self, latency_ms):
            pass

        def record_failure(self, error):
            pass

    class MockWorkingProvider(AIProvider):
        def __init__(self, name):
            self.name = name

        async def complete(self, prompt, max_tokens=1000, temperature=0.7):
            return CompletionResult(
                content="Success!",
                tokens_used=10,
                provider=self.name,
                model="mock",
                latency_ms=100
            )

        def get_health(self):
            return ProviderHealth(
                name=self.name,
                is_healthy=True,
                last_success=time.time(),
                consecutive_failures=0,
                rate_limit_reset=None
            )

        def record_success(self, latency_ms):
            pass

        def record_failure(self, error):
            pass

    # Test setup
    router = FallbackRouter([
        MockRateLimitedProvider("failed_provider"),
        MockWorkingProvider("working_provider")
    ])
    result = await router.complete("test prompt")
    assert result.content == "Success!"
    assert result.provider == "working_provider"


if __name__ == "__main__":
    asyncio.run(test_fallback_on_rate_limit())

Monitoring and Observability

I added logging to track provider health:

monitoring.py
import logging

# Assumes the router is saved as fallback_router.py.
from fallback_router import FallbackRouter

logger = logging.getLogger(__name__)


class ProviderMonitor:
    def __init__(self, router: FallbackRouter):
        self.router = router

    def log_health_status(self):
        """Log current health of all providers"""
        for provider in self.router.providers:
            health = provider.get_health()
            logger.info(
                f"Provider {health.name}: "
                f"healthy={health.is_healthy}, "
                f"failures={health.consecutive_failures}, "
                f"rate_limited_until={health.rate_limit_reset}"
            )

    def get_metrics(self) -> dict:
        """Get metrics for monitoring system"""
        metrics = {}
        for provider in self.router.providers:
            health = provider.get_health()
            # 1 = healthy, 0 = unhealthy or currently rate limited
            metrics[f"provider_{health.name}_healthy"] = 1 if health.is_healthy else 0
            metrics[f"provider_{health.name}_failures"] = health.consecutive_failures
        return metrics
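
The dictionary returned by `get_metrics()` still has to reach a monitoring system. As one possible sketch, assuming a Prometheus-style scraper that reads plain `name value` lines, a hypothetical `to_prometheus_text` helper could render it like this. Characters that are not allowed in metric names are replaced with underscores.

```python
import re


def to_prometheus_text(metrics: dict) -> str:
    """Render a flat {name: number} dict as 'name value' lines, sorted by name."""
    lines = []
    for name in sorted(metrics):
        # Keep only letters, digits, underscores and colons in the metric name
        safe_name = re.sub(r"[^a-zA-Z0-9_:]", "_", name)
        lines.append(f"{safe_name} {metrics[name]}")
    return "\n".join(lines) + "\n"
```

The result of `to_prometheus_text(monitor.get_metrics())` could be served from a small HTTP endpoint. This sketch skips `# HELP` and `# TYPE` comment lines and labels, which a fuller exporter would normally include.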

Summary

In this post, I showed how to implement multi-provider fallback for AI APIs. The key components are: provider abstraction (common interface for all providers), health tracking (know which providers are available), and automatic failover (switch providers on rate limit).

The result is a more resilient system that handles rate limits gracefully. When OpenAI returns a 429, Anthropic picks up the request. Users see an error only when every provider fails, and traffic shifts automatically to whichever provider is currently healthy.

Final Words + More Resources

My intention with this article was to share my knowledge and experience and to help others. If you want to get in touch, you can contact me by email.

