
Why Am I Hitting AI API Rate Limits and How Do I Handle Them?

Problem

I was building an AI-powered code assistant when I hit a wall. Every few minutes, my app would crash with:

Error output
Error: Rate limit exceeded. Please try again later.
HTTP 429: Too Many Requests

I wasn’t spamming the API. I wasn’t doing anything unusual. Just normal usage with reasonable intervals between requests. Yet I kept hitting rate limits.

This is happening to developers everywhere. The problem is getting worse, not better. Here’s why and what to do about it.

What Happened?

My application made API calls to analyze code. Nothing fancy:

my_ai_client.py
import openai


def analyze_code(code: str):
    # Send the code to the model and return its review text
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Review this code: {code}"}],
    )
    return response.choices[0].message.content

This worked fine during testing. But when I started processing multiple files in a loop, the errors appeared. I added a simple delay:

naive_fix.py
import time

for file in files:
    time.sleep(1)  # Wait 1 second between requests
    result = analyze_code(file.content)

Still failed. I increased the delay to 5 seconds. Still failed. I was confused.

Why Rate Limits Are Hitting Everyone

The problem isn’t just you. AI providers are struggling with a perfect storm of demand.

The Agentic Workload Explosion

AI agents don’t make single API calls. They make hundreds or thousands of sequential calls. A single agent session can consume as much quota as hundreds of traditional users.

Traditional vs Agentic Usage
Traditional App:
  User -> 1 API call -> Response

AI Agent:
  User -> Agent -> API call 1 -> Response 1
                -> API call 2 -> Response 2
                -> API call 3 -> Response 3
                -> ... (hundreds more)

Context Window Bloat

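Modern models support 128k+ context windows. Sending that much data repeatedly consumes far more compute than short prompts, and many providers count tokens as well as requests against your quota. When an agent reads your entire codebase with a 128k context, that single call can cost as much as dozens of normal API calls in terms of resource usage. A few large requests can exhaust a token budget even when the request count stays low, so a fixed delay between requests does not necessarily help.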
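A rough way to see the effect is to compute how many requests fit in a minute when a provider enforces both a request limit and a token limit. This is only a sketch: the function name and the quota numbers are hypothetical, picked for illustration, and are not any provider's actual limits.

```python
def max_requests_per_minute(
    requests_per_minute: int,
    tokens_per_minute: int,
    tokens_per_request: int,
) -> int:
    """Return how many requests fit in one minute under both limits."""
    by_tokens = tokens_per_minute // tokens_per_request
    return min(requests_per_minute, by_tokens)


# Hypothetical quota: 60 requests/min and 300,000 tokens/min
small = max_requests_per_minute(60, 300_000, 1_000)    # short prompts
large = max_requests_per_minute(60, 300_000, 128_000)  # near-full 128k context
print(small, large)  # prints "60 2"
```

With these assumed numbers, short prompts are capped by the request limit, while near-full-context requests are capped by the token limit at two per minute. In that situation, a 1 or 5 second pause between calls would still exceed the quota.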

Infrastructure Strain

Nvidia NIM API endpoints and other provider infrastructure are buckling under load. Rolling timeouts and latency spikes trigger retry cascades that make everything worse.

Provider Response

Google and others are responding by:

  • Reducing free tier allowances
  • Implementing stricter rolling windows
  • Adding undocumented throttling during peak hours

Legitimate users are caught in the crossfire of rate limiting designed to stop abuse.

The Wrong Way to Handle Rate Limits

I made every mistake in the book.

Mistake 1: Immediate Retry on 429

bad_retry.py
# DON'T DO THIS
def call_api(prompt):
    try:
        return client.chat.completions.create(...)
    except RateLimitError:
        return call_api(prompt)  # Immediate retry - BAD!

This wastes API quota on requests that will fail. It also contributes to provider overload and may get your API key temporarily blocked.

Mistake 2: Fixed Retry Delays

bad_delay.py
# DON'T DO THIS
import time


def call_api_with_retry(prompt):
    for attempt in range(5):
        try:
            return client.chat.completions.create(...)
        except RateLimitError:
            time.sleep(5)  # Fixed delay - BAD!

When many clients hit limits simultaneously, they all retry at the same time. This creates a thundering herd problem.

Mistake 3: Ignoring Rate Limit Headers

Many APIs return helpful rate limit headers (the exact names vary by provider):

Response headers
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 3
X-RateLimit-Reset: 1709827200
Retry-After: 12

Ignoring these means you’re flying blind. You could proactively slow down before hitting the limit.
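As a minimal sketch of reading these headers, the helper below turns them into a number of seconds to pause. It assumes the header names from the sample above, assumes `X-RateLimit-Reset` is a Unix timestamp, and assumes the headers arrive in a plain mapping with exactly that casing. The name `seconds_to_wait` is hypothetical, and your provider's headers may differ.

```python
import time
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def seconds_to_wait(headers: Mapping[str, str], now: Optional[float] = None) -> float:
    """Return how long to pause before the next request, based on headers."""
    now = time.time() if now is None else now

    # Retry-After is either a number of seconds or an HTTP date
    retry_after = headers.get("Retry-After")
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            try:
                return max(0.0, parsedate_to_datetime(retry_after).timestamp() - now)
            except (TypeError, ValueError):
                pass  # Unparseable value: fall through to the other headers

    # Assumption: X-RateLimit-Reset is a Unix timestamp, as in the sample above
    try:
        remaining = int(headers["X-RateLimit-Remaining"])
        reset_at = float(headers["X-RateLimit-Reset"])
    except (KeyError, ValueError):
        return 0.0  # No usable information: do not wait

    # Only wait when the current window is exhausted
    return max(0.0, reset_at - now) if remaining <= 0 else 0.0
```

`Retry-After` takes priority here because it is the server's explicit instruction. The remaining/reset pair is only used as a fallback when the quota for the current window is used up.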

The Right Way: Exponential Backoff with Jitter

The gold standard is exponential backoff with randomized jitter:

Backoff formula
wait_time = base_delay * (2 ^ attempt) + random_jitter

Here’s a working implementation:

retry_with_backoff.py
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """Retry with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            # Non-rate-limit errors should not be retried
            if getattr(e, "status_code", None) != 429:
                raise
            # Out of retries: re-raise the last rate limit error
            if attempt == max_retries:
                raise
            # Exponential ceiling: 1s, 2s, 4s, 8s, 16s... capped at max_delay
            ceiling = min(base_delay * (2 ** attempt), max_delay)
            # Full jitter: sleep a random time in [0, ceiling] to prevent thundering herd
            delay = random.uniform(0, ceiling)
            print(f"Rate limited. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
    # Only reachable if max_retries is negative
    raise ValueError("max_retries must be >= 0")


# Usage (client is assumed to be an async client such as openai.AsyncOpenAI())
async def call_api():
    return await client.chat.completions.create(...)


async def main():
    result = await retry_with_backoff(call_api)
    print(result)


asyncio.run(main())

Why jitter? It prevents the thundering herd problem. When 100 clients all hit the rate limit at once, jitter spreads their retries across time instead of all retrying simultaneously.
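To make the thundering herd effect concrete, here is a small simulation sketch. It assumes 100 clients that were all rate limited at the same moment and share an 8 second backoff ceiling, then counts how many retries land in the busiest one-second slot. The helper name and all numbers are illustrative assumptions.

```python
import random
from collections import Counter
from typing import Sequence


def busiest_second(delays: Sequence[float]) -> int:
    """Return the largest number of retries landing in the same 1-second slot."""
    return max(Counter(int(d) for d in delays).values())


rng = random.Random(0)  # Seeded so the run is repeatable
clients = 100
ceiling = 8.0  # Backoff ceiling for this attempt, e.g. base_delay * 2 ** 3

fixed = [ceiling for _ in range(clients)]                     # everyone waits 8s
jittered = [rng.uniform(0, ceiling) for _ in range(clients)]  # random wait in [0, 8]

print(busiest_second(fixed))     # 100: every client retries in the same second
print(busiest_second(jittered))  # far fewer; the exact count depends on the seed
```

Without jitter the provider sees the same burst again after the delay. With jitter the same 100 retries are spread over the whole interval.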

Proactive Rate Limit Monitoring

Don’t wait for 429 errors. Track your usage against limits in real time:

rate_limit_aware_client.py
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset_time: float  # Unix timestamp


class RateLimitAwareClient:
    """Proactively manage rate limits before hitting them."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0.0
        self.rate_limit_info: Optional[RateLimitInfo] = None

    def wait_if_needed(self):
        """Wait if approaching rate limit."""
        elapsed = time.time() - self.last_request_time
        # Enforce minimum interval between requests
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        # Check rate limit info from headers
        info = self.rate_limit_info
        if info and info.remaining <= 2:
            # Re-read the clock: the sleep above may have advanced it
            wait_time = info.reset_time - time.time()
            if wait_time > 0:
                print(f"Approaching limit. Waiting {wait_time:.1f}s")
                time.sleep(wait_time + 1)
        self.last_request_time = time.time()

    def update_from_headers(self, headers: dict):
        """Track limits from response headers."""
        try:
            self.rate_limit_info = RateLimitInfo(
                limit=int(headers["X-RateLimit-Limit"]),
                remaining=int(headers["X-RateLimit-Remaining"]),
                reset_time=float(headers["X-RateLimit-Reset"]),
            )
        except (KeyError, ValueError, TypeError):
            # Headers missing or malformed: keep the previous info
            pass
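The class above is never shown in use, so here is a minimal sketch of the call order I would expect: wait first, send, then feed the response headers back. It assumes a hypothetical `send` callable that returns a `(body, headers)` pair. How you get at the headers depends on your HTTP client or SDK, so treat that part as a placeholder.

```python
from typing import Callable, Mapping, Protocol, Tuple, TypeVar

T = TypeVar("T")


class Limiter(Protocol):
    """The two methods this sketch relies on (matching the class above)."""

    def wait_if_needed(self) -> None: ...

    def update_from_headers(self, headers: dict) -> None: ...


def send_with_limiter(
    limiter: Limiter,
    send: Callable[[], Tuple[T, Mapping[str, str]]],
) -> T:
    """Pace the request, then feed the response headers back to the limiter."""
    limiter.wait_if_needed()                    # May sleep before sending
    body, headers = send()                      # Hypothetical transport call
    limiter.update_from_headers(dict(headers))  # Informs the next wait
    return body
```

Any object with those two methods works, so `RateLimitAwareClient()` can be passed as `limiter`.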

Circuit Breakers for API Calls

When the API is struggling, stop hammering it. A circuit breaker prevents cascading failures:

Circuit breaker states
+--------+  failures >= threshold  +------+  recovery timeout  +-----------+
| CLOSED | ----------------------> | OPEN | -----------------> | HALF_OPEN |
+--------+                         +------+ <----------------- +-----------+
    ^                                        test request fails      |
    |                                                                |
    +--------------------- test request succeeds --------------------+

circuit_breaker.py
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitBreaker:
    """Stop trying when API is down."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        """Check if we should try the API."""
        if self.state == CircuitState.OPEN:
            # Has enough time passed to try again?
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        # CLOSED: normal operation. HALF_OPEN: let test requests through
        return True

    def record_success(self):
        """API call worked."""
        # Any success closes the circuit and resets the count, so only
        # consecutive failures can trip the breaker
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def record_failure(self):
        """API call failed."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed test request in HALF_OPEN reopens the circuit immediately
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
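One way to wire the breaker around a call is sketched below. It works with any object that exposes the three methods of the class above. `CircuitOpenError` and `call_with_breaker` are hypothetical names introduced for this example.

```python
from typing import Callable, Protocol, TypeVar

T = TypeVar("T")


class Breaker(Protocol):
    """The methods this sketch relies on (matching the class above)."""

    def can_execute(self) -> bool: ...

    def record_success(self) -> None: ...

    def record_failure(self) -> None: ...


class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call without contacting the API."""


def call_with_breaker(breaker: Breaker, func: Callable[[], T]) -> T:
    """Run func only if the breaker allows it, and report the outcome back."""
    if not breaker.can_execute():
        raise CircuitOpenError("circuit is open; skipping API call")
    try:
        result = func()
    except Exception:
        breaker.record_failure()
        raise  # Let the caller see the original error
    breaker.record_success()
    return result
```

This sketch counts every exception as a failure. You may want to count only 429s, timeouts, and 5xx responses so that a bad request on your side does not open the circuit.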

Request Queue for Graceful Degradation

Queue requests instead of failing immediately:

rate_limited_queue.ts
interface QueueItem<T> {
  request: () => Promise<T>;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
  attempts: number;
}

class RateLimitedQueue {
  private queue: QueueItem<any>[] = [];
  private processing = false;
  private requestTimestamps: number[] = [];

  constructor(
    private maxRequestsPerMinute: number,
    private maxRetries: number = 5,
    private baseDelayMs: number = 1000,
  ) {}

  async enqueue<T>(request: () => Promise<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      this.queue.push({ request, resolve, reject, attempts: 0 });
      this.processQueue();
    });
  }

  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) return;
    this.processing = true;

    while (this.queue.length > 0) {
      // Clean old timestamps (older than 1 minute)
      const now = Date.now();
      this.requestTimestamps = this.requestTimestamps.filter(
        ts => now - ts < 60000
      );

      // Wait if at rate limit
      if (this.requestTimestamps.length >= this.maxRequestsPerMinute) {
        const oldestTimestamp = Math.min(...this.requestTimestamps);
        const waitTime = 60000 - (now - oldestTimestamp) + 100;
        await this.sleep(waitTime);
        continue;
      }

      const item = this.queue.shift();
      if (!item) break;

      try {
        this.requestTimestamps.push(Date.now());
        const result = await item.request();
        item.resolve(result);
      } catch (error: any) {
        if (error.status === 429 && item.attempts < this.maxRetries) {
          item.attempts++;
          const delay = this.calculateBackoff(item.attempts);
          await this.sleep(delay);
          this.queue.unshift(item); // Re-add to front
        } else {
          item.reject(error);
        }
      }
    }

    this.processing = false;
  }

  private calculateBackoff(attempt: number): number {
    const delay = Math.min(
      this.baseDelayMs * Math.pow(2, attempt - 1),
      60000
    );
    return delay * (0.75 + Math.random() * 0.5); // Jitter
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
const queue = new RateLimitedQueue(50); // 50 requests per minute

// All requests are automatically rate-limited and queued
const result = await queue.enqueue(() =>
  openai.chat.completions.create({
    model: "gpt-4",
    messages: [{ role: "user", content: "Hello" }]
  })
);

Multi-Provider Distribution

Spread requests across multiple providers for redundancy:

Multi-provider architecture
                      +---> OpenAI (GPT-4)
                      |
Your App --> Router --+---> Anthropic (Claude)
                      |
                      +---> Google (Gemini)
                      |
                      +---> Local Model (fallback)

When one provider hits limits, route to another. This also helps with cost optimization and reduces single points of failure.
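A router like this can start out very small. The sketch below tries providers in order and falls through to the next one when a provider is rate limited. `RateLimited`, `route`, and the stand-in provider functions are hypothetical. Real adapters would wrap each vendor's SDK and translate its 429 error into one shared exception type.

```python
from typing import Callable, Optional, Sequence, Tuple


class RateLimited(Exception):
    """Hypothetical error a provider adapter raises on HTTP 429."""


Provider = Tuple[str, Callable[[str], str]]  # (name, function taking a prompt)


def route(prompt: str, providers: Sequence[Provider]) -> Tuple[str, str]:
    """Try providers in order; return (provider_name, response)."""
    last_error: Optional[Exception] = None
    for name, call in providers:
        try:
            return name, call(prompt)
        except RateLimited as exc:
            last_error = exc  # This provider is throttled: try the next one
    raise RuntimeError("all providers are rate limited") from last_error


# Usage with stand-in providers
def throttled(prompt: str) -> str:
    raise RateLimited("429")


def local_model(prompt: str) -> str:
    return f"echo: {prompt}"


print(route("Hello", [("primary", throttled), ("local", local_model)]))
# prints ('local', 'echo: Hello')
```

Only `RateLimited` triggers a fallback here. Other errors propagate, so a malformed request fails once instead of being sent to every provider in turn.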

Key Takeaways

I learned these lessons the hard way:

  1. Exponential backoff with jitter - The standard for handling rate limits. Prevents thundering herd.

  2. Monitor rate limit headers - Don’t wait for 429 errors. Track X-RateLimit-Remaining proactively.

  3. Use circuit breakers - Stop trying when the API is down. Fail fast after threshold failures.

  4. Queue requests - Don’t fail immediately. Queue and retry with backoff.

  5. Multi-provider distribution - Don’t rely on a single API. Spread load across providers.

Rate limits aren’t going away. They’ll likely become more restrictive as demand outpaces infrastructure. Build your applications to expect rate limits as normal operating conditions, not error cases.
