How to Handle AI API Rate Limits with Fallback Strategies
Purpose
This post shows how to implement multi-provider fallback for AI APIs with automatic failover when you hit rate limits.
Problem
I built an AI-powered feature. It worked great in development. Then I deployed to production and hit rate limits immediately:
ERROR: Rate limit exceeded for OpenAI API
INFO: Retrying in 60 seconds...
ERROR: Rate limit exceeded for OpenAI API
INFO: Retrying in 60 seconds...
ERROR: User request timeout after 120 seconds

My users saw errors. I saw lost revenue. All because I relied on a single API provider.
Environment
- Python 3.11+
- Asyncio for concurrent requests
- Tenacity for retry logic
- Multiple AI API keys (OpenAI, Anthropic, OpenRouter)
Solution: Multi-Provider Fallback
I implemented a fallback system with three key components:
- Provider abstraction - Same interface for all providers
- Health tracking - Know which providers are available
- Automatic failover - Switch providers on rate limit
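
Before the full implementation, the three components above can be sketched in miniature. This is only an illustration with stand-in objects: `StubProvider`, `RateLimited`, and `complete_with_failover` are hypothetical names that make no network calls and are not part of any SDK.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional


class RateLimited(Exception):
    """Stand-in for a provider SDK's rate limit error (hypothetical)."""

    def __init__(self, retry_after_s: float):
        super().__init__(f"rate limited, retry after {retry_after_s}s")
        self.retry_after_s = retry_after_s


@dataclass
class StubProvider:
    """Fake provider: one interface for every backend, plus health state."""
    name: str
    fail_with: Optional[Exception] = None      # error to raise instead of answering
    rate_limit_reset: Optional[float] = None   # Unix timestamp, None if not limited

    def available(self) -> bool:
        # Health tracking: skip the provider until its rate limit window ends
        return self.rate_limit_reset is None or time.time() >= self.rate_limit_reset

    async def complete(self, prompt: str) -> str:
        if self.fail_with is not None:
            if isinstance(self.fail_with, RateLimited):
                self.rate_limit_reset = time.time() + self.fail_with.retry_after_s
            raise self.fail_with
        return f"[{self.name}] reply to: {prompt}"


async def complete_with_failover(providers: list[StubProvider], prompt: str) -> str:
    """Automatic failover: try each available provider in order."""
    errors = []
    for provider in providers:
        if not provider.available():
            errors.append(f"{provider.name}: skipped (rate limited)")
            continue
        try:
            return await provider.complete(prompt)
        except Exception as exc:
            errors.append(f"{provider.name}: {exc}")
    raise RuntimeError("All providers failed: " + "; ".join(errors))


primary = StubProvider("primary", fail_with=RateLimited(30))
backup = StubProvider("backup")
reply = asyncio.run(complete_with_failover([primary, backup], "hello"))
print(reply)  # [backup] reply to: hello
```

After the first call, `primary.available()` returns False for the next 30 seconds, so later requests go straight to `backup` without paying for a failed attempt first.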
Provider Abstraction
First, I created a common interface:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class CompletionResult:
    content: str
    tokens_used: int
    provider: str
    model: str
    latency_ms: float

@dataclass
class ProviderHealth:
    name: str
    is_healthy: bool
    last_success: float
    consecutive_failures: int
    rate_limit_reset: Optional[float]  # Unix timestamp

class AIProvider(ABC):
    @abstractmethod
    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        """Generate completion from this provider"""
        pass

    @abstractmethod
    def get_health(self) -> ProviderHealth:
        """Check current health status"""
        pass

    @abstractmethod
    def record_success(self, latency_ms: float):
        """Record successful call"""
        pass

    @abstractmethod
    def record_failure(self, error: Exception):
        """Record failed call"""
        pass

OpenAI Provider Implementation

import openai
import time
from typing import Optional

class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model
        self.name = "openai"

        # Health tracking
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset: Optional[float] = None

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        start = time.time()

        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature
            )

            latency = (time.time() - start) * 1000
            self.record_success(latency)

            return CompletionResult(
                content=response.choices[0].message.content,
                tokens_used=response.usage.total_tokens,
                provider=self.name,
                model=self.model,
                latency_ms=latency
            )

        except openai.RateLimitError as e:
            self.record_failure(e)
            # Use the retry-after header (seconds) if the response carries one.
            # OpenAI's x-ratelimit-reset-* headers hold durations such as "1s",
            # so they cannot be parsed with float().
            response = getattr(e, "response", None)
            retry_after = response.headers.get("retry-after") if response is not None else None
            if retry_after:
                try:
                    self.rate_limit_reset = time.time() + float(retry_after)
                except ValueError:
                    pass  # Header was not a plain number of seconds
            raise

        except Exception as e:
            self.record_failure(e)
            raise

    def get_health(self) -> ProviderHealth:
        return ProviderHealth(
            name=self.name,
            is_healthy=self.is_healthy and not self._is_rate_limited(),
            last_success=self.last_success,
            consecutive_failures=self.consecutive_failures,
            rate_limit_reset=self.rate_limit_reset
        )

    def _is_rate_limited(self) -> bool:
        if self.rate_limit_reset is None:
            return False
        return time.time() < self.rate_limit_reset

    def record_success(self, latency_ms: float):
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset = None

    def record_failure(self, error: Exception):
        self.consecutive_failures += 1
        # Mark the provider unhealthy after three failures in a row
        if self.consecutive_failures >= 3:
            self.is_healthy = False

Anthropic Provider Implementation

import anthropic
import time
from typing import Optional

class AnthropicProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "claude-3-5-haiku-latest"):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self.model = model
        self.name = "anthropic"

        # Health tracking
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset: Optional[float] = None

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        start = time.time()

        try:
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=max_tokens,
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature
            )

            latency = (time.time() - start) * 1000
            self.record_success(latency)

            return CompletionResult(
                content=response.content[0].text,
                tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                provider=self.name,
                model=self.model,
                latency_ms=latency
            )

        except anthropic.RateLimitError as e:
            self.record_failure(e)
            # Anthropic sends retry-after (seconds) as a response header;
            # the exception itself has no retry_after attribute.
            response = getattr(e, "response", None)
            retry_after = response.headers.get("retry-after") if response is not None else None
            if retry_after:
                try:
                    self.rate_limit_reset = time.time() + float(retry_after)
                except ValueError:
                    pass  # Header was not a plain number of seconds
            raise

        except Exception as e:
            self.record_failure(e)
            raise

    def get_health(self) -> ProviderHealth:
        return ProviderHealth(
            name=self.name,
            is_healthy=self.is_healthy and not self._is_rate_limited(),
            last_success=self.last_success,
            consecutive_failures=self.consecutive_failures,
            rate_limit_reset=self.rate_limit_reset
        )

    def _is_rate_limited(self) -> bool:
        if self.rate_limit_reset is None:
            return False
        return time.time() < self.rate_limit_reset

    def record_success(self, latency_ms: float):
        self.is_healthy = True
        self.last_success = time.time()
        self.consecutive_failures = 0
        self.rate_limit_reset = None

    def record_failure(self, error: Exception):
        self.consecutive_failures += 1
        # Mark the provider unhealthy after three failures in a row
        if self.consecutive_failures >= 3:
            self.is_healthy = False

Fallback Router
Now the key part - the router that handles failover:

import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class FallbackRouter:
    def __init__(self, providers: list[AIProvider]):
        self.providers = providers
        self.current_index = 0

    async def complete(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7
    ) -> CompletionResult:
        """Try providers in order until one succeeds"""

        errors = []

        # Get list of healthy providers, sorted by last success time
        healthy_providers = self._get_healthy_providers()

        if not healthy_providers:
            # All providers unhealthy - try anyway as health check
            logger.warning("All providers unhealthy, attempting recovery")
            healthy_providers = self.providers

        for provider in healthy_providers:
            try:
                result = await provider.complete(prompt, max_tokens, temperature)
                logger.info(f"Success with {provider.name}")
                return result

            except Exception as e:
                error_msg = f"{provider.name} failed: {str(e)}"
                errors.append(error_msg)
                logger.warning(error_msg)

                # Check if this was a rate limit
                health = provider.get_health()
                if health.rate_limit_reset:
                    # Clamp at zero in case the reset time has already passed
                    wait_time = max(0.0, health.rate_limit_reset - time.time())
                    logger.info(f"{provider.name} rate limited for {wait_time:.0f}s")

                # Try next provider
                continue

        # All providers failed
        raise RuntimeError(
            f"All providers failed. Errors: {'; '.join(errors)}"
        )

    def _get_healthy_providers(self) -> list[AIProvider]:
        """Get healthy providers, sorted by preference"""
        healthy = []

        for provider in self.providers:
            health = provider.get_health()
            if health.is_healthy:
                healthy.append(provider)

        # Sort by last success time (prefer recently successful providers)
        healthy.sort(key=lambda p: p.get_health().last_success, reverse=True)

        return healthy

    async def complete_with_retry(
        self,
        prompt: str,
        max_tokens: int = 1000,
        temperature: float = 0.7,
        max_retries: int = 3
    ) -> CompletionResult:
        """Complete with additional retry logic at the router level"""

        last_error = None

        for attempt in range(max_retries):
            try:
                return await self.complete(prompt, max_tokens, temperature)

            except RuntimeError as e:
                last_error = e
                if attempt < max_retries - 1:
                    # Wait before retry
                    wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                    logger.info(f"All providers failed, retrying in {wait_time}s")
                    await asyncio.sleep(wait_time)

        raise last_error

Usage Example
Here's how I use it in production:

import logging
import os
from openai_provider import OpenAIProvider
from anthropic_provider import AnthropicProvider
from fallback_router import FallbackRouter

logger = logging.getLogger(__name__)

# Initialize providers
providers = [
    OpenAIProvider(
        api_key=os.environ["OPENAI_API_KEY"],
        model="gpt-4o-mini"
    ),
    AnthropicProvider(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        model="claude-3-5-haiku-latest"
    ),
]

# Create router
router = FallbackRouter(providers)

async def generate_text(prompt: str) -> str:
    """Generate text with automatic fallback"""
    try:
        result = await router.complete_with_retry(
            prompt=prompt,
            max_tokens=500,
            temperature=0.7,
            max_retries=3
        )
        return result.content

    except RuntimeError as e:
        # All providers failed - implement your fallback
        # (cached response, queue for later, etc.)
        logger.error(f"Generation failed: {e}")
        return "Sorry, I'm experiencing high load. Please try again."

Testing the Fallback
I tested by intentionally triggering rate limits:

import asyncio
import time

import httpx
import openai
import pytest

# AIProvider, CompletionResult, ProviderHealth and FallbackRouter
# are the classes defined in the sections above.

@pytest.mark.asyncio  # needs the pytest-asyncio plugin when run under pytest
async def test_fallback_on_rate_limit():
    """Test that router falls back when rate limited"""

    # Mock provider that raises rate limit
    class MockRateLimitedProvider(AIProvider):
        def __init__(self, name):
            self.name = name

        async def complete(self, prompt, max_tokens, temperature):
            # RateLimitError requires a response and body, so build a fake 429
            request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions")
            response = httpx.Response(429, request=request)
            raise openai.RateLimitError("Rate limit exceeded", response=response, body=None)

        def get_health(self):
            return ProviderHealth(
                name=self.name,
                is_healthy=True,
                last_success=time.time(),
                consecutive_failures=0,
                rate_limit_reset=None
            )

        def record_success(self, latency_ms):
            pass

        def record_failure(self, error):
            pass

    class MockWorkingProvider(AIProvider):
        def __init__(self, name):
            self.name = name

        async def complete(self, prompt, max_tokens, temperature):
            return CompletionResult(
                content="Success!",
                tokens_used=10,
                provider=self.name,
                model="mock",
                latency_ms=100
            )

        def get_health(self):
            return ProviderHealth(
                name=self.name,
                is_healthy=True,
                last_success=time.time(),
                consecutive_failures=0,
                rate_limit_reset=None
            )

        def record_success(self, latency_ms):
            pass

        def record_failure(self, error):
            pass

    # Test setup
    router = FallbackRouter([
        MockRateLimitedProvider("failed_provider"),
        MockWorkingProvider("working_provider")
    ])

    result = await router.complete("test prompt")

    assert result.content == "Success!"
    assert result.provider == "working_provider"

if __name__ == "__main__":
    asyncio.run(test_fallback_on_rate_limit())

Monitoring and Observability
I added logging to track provider health:

import logging

logger = logging.getLogger(__name__)

class ProviderMonitor:
    def __init__(self, router: FallbackRouter):
        self.router = router

    def log_health_status(self):
        """Log current health of all providers"""
        for provider in self.router.providers:
            health = provider.get_health()
            logger.info(
                f"Provider {health.name}: "
                f"healthy={health.is_healthy}, "
                f"failures={health.consecutive_failures}, "
                f"rate_limited_until={health.rate_limit_reset}"
            )

    def get_metrics(self) -> dict:
        """Get metrics for monitoring system"""
        metrics = {}
        for provider in self.router.providers:
            health = provider.get_health()
            metrics[f"provider_{health.name}_healthy"] = 1 if health.is_healthy else 0
            metrics[f"provider_{health.name}_failures"] = health.consecutive_failures
        return metrics

Summary
In this post, I showed how to implement multi-provider fallback for AI APIs. The key components are: provider abstraction (common interface for all providers), health tracking (know which providers are available), and automatic failover (switch providers on rate limit).
The result is a resilient system that handles rate limits gracefully. When OpenAI returns a 429, Anthropic picks up the request. Users only see an error when every provider fails, and traffic shifts automatically to whichever provider is currently healthy.
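
One thing the health tracking above leaves open: `is_healthy` flips to False after three consecutive failures and only flips back on a success, so a benched provider is retried only once every provider is unhealthy. A possible refinement is a cooldown after which the provider becomes eligible again. The sketch below is one way that could look, not the code I run in production; `HealthState` and `cooldown_s` are hypothetical names, and the 30-second value is an arbitrary assumption.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class HealthState:
    """Failure counter with a cooldown-based re-probe (illustrative only)."""
    failure_threshold: int = 3   # same threshold as record_failure in the providers
    cooldown_s: float = 30.0     # hypothetical: how long to bench a provider
    consecutive_failures: int = 0
    unhealthy_since: Optional[float] = None

    def record_failure(self, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            # Every failure at or past the threshold restarts the cooldown
            self.unhealthy_since = now

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.unhealthy_since = None

    def is_available(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        if self.unhealthy_since is None:
            return True
        # Eligible again once the cooldown has elapsed; a failed probe
        # restarts the cooldown through record_failure
        return now - self.unhealthy_since >= self.cooldown_s


state = HealthState(cooldown_s=30.0)
for _ in range(3):
    state.record_failure(now=1000.0)

benched = state.is_available(now=1010.0)        # still inside the cooldown
probe_allowed = state.is_available(now=1030.0)  # cooldown elapsed
state.record_success()
recovered = state.is_available(now=1031.0)
print(benched, probe_allowed, recovered)  # False True True
```

Passing `now` explicitly keeps the logic testable without sleeping. In the providers above, `get_health()` could consult a check like `is_available()` instead of the bare `is_healthy` flag.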
Final Words + More Resources
My intention with this article was to help others by sharing my knowledge and experience. If you want to contact me, you can reach me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- 👨‍💻 OpenAI Rate Limits
- 👨‍💻 Anthropic Rate Limits
- 👨‍💻 Tenacity Library
Oh, and if you found these resources useful, don't forget to support me by starring the repo on GitHub!