How Should I Design AI Agents to Handle Failures Gracefully?

Problem

My AI agent crashed halfway through a 30-minute task. No error handling. No retry logic. No way to resume. I lost all progress.

# What happened
INFO: Agent started processing 1000 documents
INFO: Processed 523 documents successfully
ERROR: Connection timeout to OpenAI API
INFO: Container killed by cloud provider (idle too long)
# Result: All 523 processed documents - GONE
# No checkpoint. No state saved. Start from zero.

I spent three weeks building the happy path (agent gets input, does the thing, succeeds). Then I spent nine weeks retrofitting error handling, retry logic, and fallback behavior. I did it backwards.

This post shows how to design AI agents for failure first, so when (not if) something goes wrong, your agent can recover, resume, or degrade gracefully.

Environment

  • Python 3.12
  • LangGraph for agent orchestration
  • PostgreSQL for state persistence
  • Redis for caching
  • tenacity for retry logic

The Wrong Way: Happy Path First

Here’s how I used to build agents:

naive_agent.py
class NaiveAgent:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def process(self, documents: list[str]) -> list[str]:
        results = []
        for doc in documents:
            # No error handling
            # No state saving
            # No retry logic
            response = await self.llm.generate(doc)
            results.append(response)
        return results

This works great in demos. In production, it failed:

# Production failures I encountered
ERROR: Rate limit exceeded (429)
ERROR: Connection reset by peer
ERROR: Context length exceeded
ERROR: Model temporarily unavailable
ERROR: Container OOM killed
ERROR: Cloud provider killed container (idle timeout)

Each failure meant starting over. I lost hours of processing time.

The Right Way: Design the Unhappy Path First

Before writing any agent logic, I map out every failure scenario:

failure_scenarios.md
## Failure Scenarios
### Transient Failures (retry)
- API rate limits (429)
- Network timeouts
- Model temporarily unavailable
### Permanent Failures (fallback)
- Invalid input
- Context length exceeded
- Authentication errors
### Infrastructure Failures (checkpoint + resume)
- Container killed
- Memory exceeded
- Cloud provider timeout
### Partial Failures (continue with degraded mode)
- One tool unavailable
- Primary model down
- External API failed

Then I build handlers for each scenario before the happy path.
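
As a minimal sketch of how that failure map can turn into code, the function below routes an exception to one of the four strategies. The exception classes `RateLimitError`, `ContextLengthError`, and `ToolUnavailableError` are hypothetical placeholders for whatever your LLM SDK and tools actually raise, and the mapping is an assumption you should adapt. Note that most infrastructure failures (a killed container, for example) never reach Python code at all, which is why they need checkpoints rather than `except` blocks.

```python
from enum import Enum


class FailureKind(Enum):
    TRANSIENT = "retry"
    PERMANENT = "fallback"
    INFRASTRUCTURE = "checkpoint_and_resume"
    PARTIAL = "degrade"


# Hypothetical exception types standing in for your SDK's real ones.
class RateLimitError(Exception): ...
class ContextLengthError(Exception): ...
class ToolUnavailableError(Exception): ...


def classify(exc: BaseException) -> FailureKind:
    """Map an exception to the handling strategy from the failure map."""
    if isinstance(exc, (RateLimitError, ConnectionError, TimeoutError)):
        return FailureKind.TRANSIENT
    if isinstance(exc, (ContextLengthError, ValueError, PermissionError)):
        return FailureKind.PERMANENT
    if isinstance(exc, MemoryError):
        return FailureKind.INFRASTRUCTURE
    if isinstance(exc, ToolUnavailableError):
        return FailureKind.PARTIAL
    # Unknown errors count as permanent so they are never retried blindly.
    return FailureKind.PERMANENT
```

Defaulting unknown errors to "permanent" is a deliberate choice in this sketch: retrying something you do not understand can multiply the damage.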

Persistent State with LangGraph Checkpointing

The biggest lesson: in-memory state dies with your container. I needed persistent state management.

Before: In-Memory State

memory_state.py
class InMemoryAgent:
    def __init__(self):
        self.state = {}  # Dies when container dies

    async def process(self, input: str):
        self.state["input"] = input
        self.state["step1_result"] = await self.step1(input)
        # If container dies here, step1_result is lost
        self.state["step2_result"] = await self.step2(self.state["step1_result"])
        return self.state["step2_result"]

After: LangGraph with PostgreSQL Checkpointing

persistent_agent.py
import operator
from typing import Annotated, TypedDict

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph, END


class AgentState(TypedDict):
    input: str
    step1_result: str
    step2_result: str
    errors: Annotated[list[str], operator.add]  # Reducer: errors accumulate across steps


# Define the graph
workflow = StateGraph(AgentState)


async def step1(state: AgentState) -> dict:
    """First processing step"""
    try:
        # `llm` is the async LLM client, created elsewhere
        result = await llm.generate(state["input"])
        return {"step1_result": result}
    except Exception as e:
        return {"errors": [f"step1 failed: {e}"]}


async def step2(state: AgentState) -> dict:
    """Second processing step"""
    if "step1_result" not in state:
        return {"errors": ["step1 result missing, skipping step2"]}
    try:
        result = await llm.generate(state["step1_result"])
        return {"step2_result": result}
    except Exception as e:
        return {"errors": [f"step2 failed: {e}"]}


workflow.add_node("step1", step1)
workflow.add_node("step2", step2)
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", END)
workflow.set_entry_point("step1")

# Setup PostgreSQL checkpointing (inside an async function). The graph is run
# with ainvoke, so this uses the async saver, which is opened as a context manager.
async with AsyncPostgresSaver.from_conn_string(connection_string) as checkpointer:
    await checkpointer.setup()  # Creates the checkpoint tables on first use
    # Compile with persistence; run the agent while the connection is open
    app = workflow.compile(checkpointer=checkpointer)

Now when I run the agent:

run_with_checkpoint.py
import uuid

# Generate unique thread ID for this run.
# Store it somewhere durable before running, so a crashed run can be found again.
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# Run agent
result = await app.ainvoke(
    {"input": "Process this document"},
    config=config,
)

# If it crashes, I can resume from the checkpoint
# Get current state
state = await app.aget_state(config)
print(f"Current state: {state.values}")
print(f"Next step: {state.next}")

When the container crashes:

resume_from_crash.py
# Container crashed at step2
# Resume from last checkpoint
thread_id = "previous-thread-id" # Saved somewhere
config = {"configurable": {"thread_id": thread_id}}
# Get state - shows exactly where we stopped
state = await app.aget_state(config)
# Output: {'input': 'Process this document', 'step1_result': '...'}
# Resume execution from where it stopped
result = await app.ainvoke(None, config=config)

The checkpoint saves every state transition to PostgreSQL. When my cloud provider kills the container after 15 minutes, I just resume from the last checkpoint.

Retry Logic with Exponential Backoff

Not all failures need checkpoints. Transient failures (rate limits, timeouts) need retries with backoff.

My First Attempt: Simple Retry

simple_retry.py
import asyncio


async def call_with_retry(prompt: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await llm.generate(prompt)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2)  # Always 2 seconds - BAD

This failed under rate limiting. I got more 429 errors because I didn’t wait long enough.

Better: Exponential Backoff with tenacity

retry_with_backoff.py
import logging

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)

# Define retryable exceptions
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    # Add your API-specific retryable errors
)


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
async def call_llm_with_retry(prompt: str) -> str:
    """Call LLM with automatic retry on transient failures"""
    return await llm.generate(prompt)


# Usage
try:
    result = await call_llm_with_retry("Analyze this document")
except Exception as e:
    logger.error(f"All retries failed: {e}")
    # Handle permanent failure

When I test with simulated rate limits:

# Retry behavior
Attempt 1: FAILED (429 rate limit)
Wait 2 seconds...
Attempt 2: FAILED (429 rate limit)
Wait 4 seconds...
Attempt 3: FAILED (429 rate limit)
Wait 8 seconds...
Attempt 4: SUCCESS
# Total wait: 14 seconds, but eventually succeeds

Even Better: Retry + Circuit Breaker

If the API is down for an extended period, retrying wastes resources. I added a circuit breaker:

circuit_breaker_agent.py
from datetime import timedelta

# Assumed to expose is_open(), record_success(), and record_failure()
from circuit_breaker import CircuitBreaker
from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)


class CircuitOpenError(Exception):
    """Raised when the circuit is open and no fallback is configured"""


class ResilientLLMClient:
    def __init__(self, llm_client, fallback_model=None):
        self.llm = llm_client
        self.fallback = fallback_model
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=timedelta(seconds=60),
            expected_exception=Exception,
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=30),
        retry=retry_if_not_exception_type(CircuitOpenError),  # Never retry an open circuit
        reraise=True,
    )
    async def generate(self, prompt: str) -> str:
        # Check circuit breaker first
        if self.circuit_breaker.is_open():
            if self.fallback:
                return await self.fallback.generate(prompt)
            raise CircuitOpenError("Circuit breaker open, no fallback available")
        try:
            result = await self.llm.generate(prompt)
            self.circuit_breaker.record_success()
            return result
        except Exception:
            self.circuit_breaker.record_failure()
            raise


# Usage
client = ResilientLLMClient(
    llm_client=primary_llm,
    fallback_model=backup_llm,  # Cheaper/faster fallback
)
# If primary fails 5 times, switch to fallback for 60 seconds
result = await client.generate("Process this")
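
The client above only needs three methods from its breaker: `is_open()`, `record_success()`, and `record_failure()`. If your circuit breaker library exposes a different interface, a minimal stand-in is easy to write. The sketch below is my own illustration (the class name `SimpleCircuitBreaker` and the injectable `clock` parameter are assumptions, not part of any library): it opens after N consecutive failures and lets a single trial call through once the recovery timeout has passed.

```python
import time
from typing import Callable


class SimpleCircuitBreaker:
    """Closed -> open after N consecutive failures -> half-open after a timeout."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock  # Injectable so tests do not have to sleep
        self._failures = 0
        self._opened_at: float | None = None

    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.recovery_timeout:
            # Half-open: allow a trial call; one more failure re-opens at once.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

This version is not thread-safe and tracks only consecutive failures, which is usually enough for a single asyncio worker.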

Graceful Degradation Patterns

Sometimes the best response is a degraded response, not an error.

Pattern 1: Fallback Models

fallback_models.py
class DegradableAgent:
    def __init__(self):
        self.models = [
            ("claude-opus-4", True),  # Primary, expensive
            ("claude-sonnet-4", True),  # Secondary, cheaper
            ("claude-haiku-4", True),  # Tertiary, fastest
            ("rule-based", False),  # Fallback, no LLM
        ]

    async def process(self, input: str) -> str:
        for model_name, is_llm in self.models:
            try:
                if is_llm:
                    return await self.call_llm(model_name, input)
                else:
                    return self.rule_based_fallback(input)
            except Exception as e:
                logger.warning(f"{model_name} failed: {e}")
                continue
        # All models failed (reached only if the rule-based fallback itself raises)
        return self.error_response(input)

    def rule_based_fallback(self, input: str) -> str:
        """No LLM available, use simple rules"""
        if "summary" in input.lower():
            return "I cannot generate a full summary right now. Please try again later."
        if "translate" in input.lower():
            return "Translation service temporarily unavailable."
        return "I'm experiencing technical difficulties. Please try again in a few minutes."
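
One thing the loop above does not tell the caller is which tier actually answered. A hedged variation, with names of my own choosing (`first_success`, `served_by`, `degraded`), returns that information alongside the text so downstream code can flag or log degraded responses. The handlers are any async callables taking a prompt; nothing here depends on a specific LLM SDK.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


async def first_success(prompt: str, tiers: list[tuple[str, Handler]]) -> dict:
    """Try each tier in order; report which one answered and what failed first."""
    failures: list[tuple[str, str]] = []
    for name, handler in tiers:
        try:
            text = await handler(prompt)
        except Exception as exc:
            failures.append((name, str(exc)))
            continue
        return {
            "text": text,
            "served_by": name,
            "degraded": bool(failures),  # True if any earlier tier failed
            "failures": failures,
        }
    raise RuntimeError(f"all tiers failed: {failures}")
```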

Pattern 2: Partial Results with Caching

partial_results.py
import hashlib


class CachingAgent:
    def __init__(self, cache, llm):
        self.cache = cache  # Redis (async client)
        self.llm = llm

    async def process_batch(self, items: list[str]) -> dict:
        results = {}
        errors = []
        for item in items:
            # hash() is randomized per process for strings, so use a stable digest
            digest = hashlib.sha256(item.encode("utf-8")).hexdigest()
            cache_key = f"item:{digest}"
            # Check cache first
            cached = await self.cache.get(cache_key)
            if cached is not None:
                results[item] = cached
                continue
            try:
                result = await self.llm.generate(item)
                results[item] = result
                await self.cache.set(cache_key, result, ex=3600)  # Expire after 1 hour
            except Exception as e:
                errors.append((item, str(e)))
                # Continue processing other items
        return {
            "results": results,
            "errors": errors,
            "success_rate": len(results) / len(items) if items else 1.0,
        }


# Usage
output = await agent.process_batch(documents)
# Returns:
# {
#     "results": {"doc1": "...", "doc2": "..."},
#     "errors": [("doc3", "timeout")],
#     "success_rate": 0.67
# }

Pattern 3: Checkpoint Progress for Long Tasks

long_task_agent.py
from datetime import datetime, timezone


class LongTaskAgent:
    def __init__(self, checkpointer, llm):
        self.checkpointer = checkpointer
        self.llm = llm

    async def process_large_dataset(
        self,
        dataset_id: str,
        checkpoint_interval: int = 10,
    ):
        # Load checkpoint if exists (a missing checkpoint counts as empty)
        checkpoint = await self.checkpointer.load(dataset_id) or {}
        start_index = checkpoint.get("last_index", 0)
        results = checkpoint.get("results", [])
        items = await self.load_dataset(dataset_id)
        for i in range(start_index, len(items)):
            try:
                result = await self.llm.generate(items[i])
            except Exception as e:
                # Save checkpoint before re-raising; item i is retried on resume
                await self.checkpointer.save(dataset_id, {
                    "last_index": i,
                    "results": results,
                    "error": str(e),
                })
                raise
            results.append(result)
            # Checkpoint every N items
            if (i + 1) % checkpoint_interval == 0:
                await self.checkpointer.save(dataset_id, {
                    "last_index": i + 1,
                    "results": results,
                    # datetime.utcnow() is deprecated in Python 3.12
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                })
                logger.info(f"Checkpointed at item {i + 1}")
        # Clear checkpoint on success
        await self.checkpointer.clear(dataset_id)
        return results
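
The `checkpointer` in this pattern is just an object with `load`, `save`, and `clear` methods. For local runs or tests, a file-backed version is enough. The sketch below is an assumption of mine, not the storage I describe elsewhere in this post: the class name `JsonFileCheckpointer` is hypothetical, task IDs are assumed to be safe to use as file names, and the directory must live on storage that survives the container (a mounted volume, for instance), otherwise the checkpoint dies with it.

```python
import asyncio
import json
import os
import tempfile
from pathlib import Path


class JsonFileCheckpointer:
    """Stores one JSON checkpoint per task ID in a directory."""

    def __init__(self, directory: str):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def _path(self, task_id: str) -> Path:
        return self.directory / f"{task_id}.json"

    def _write(self, task_id: str, state: dict) -> None:
        # Write to a temp file and rename it into place, so a crash mid-write
        # cannot corrupt the previous checkpoint.
        fd, tmp = tempfile.mkstemp(dir=self.directory, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, self._path(task_id))

    async def load(self, task_id: str) -> dict:
        path = self._path(task_id)
        if not path.exists():
            return {}
        return json.loads(path.read_text(encoding="utf-8"))

    async def save(self, task_id: str, state: dict) -> None:
        # Run the blocking disk write in a thread to keep the event loop free.
        await asyncio.to_thread(self._write, task_id, state)

    async def clear(self, task_id: str) -> None:
        self._path(task_id).unlink(missing_ok=True)
```

Because the state goes through `json.dump`, everything in it has to be JSON-serializable, which is one reason to store timestamps as ISO strings.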

Infrastructure Considerations

Problem: Cloud Providers Kill Long-Running Containers

# AWS Lambda: 15 min max
# Google Cloud Functions (1st gen): 9 min max
# Azure Functions (Consumption plan): 10 min max (5 min default)
# Cloud Run: 60 min max per request (configurable, 5 min default)

My agent took 45 minutes. Cloud Run killed it when it hit the 30-minute request timeout.

Solution: Queue-Based Architecture

queue_architecture.py
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    id: str
    input: str
    checkpoint: Optional[dict] = None
    attempt: int = 0


class QueueBasedAgent:
    def __init__(self, task_queue, checkpointer, max_runtime_seconds=300):
        self.queue = task_queue
        self.checkpointer = checkpointer
        self.max_runtime = max_runtime_seconds

    async def worker_loop(self):
        """Process tasks from queue with time limit"""
        start_time = time.monotonic()
        # Stop picking up new tasks once the runtime budget is spent
        while time.monotonic() - start_time < self.max_runtime:
            task = await self.queue.poll()
            if not task:
                break
            try:
                result = await self.process_with_checkpoint(task)
                await self.queue.complete(task.id, result)
            except Exception as e:
                # Re-queue for another worker
                task.attempt += 1
                if task.attempt < 5:
                    await self.queue.requeue(task)
                else:
                    await self.queue.fail(task.id, str(e))

    async def process_with_checkpoint(self, task: Task) -> str:
        # Load from checkpoint
        state = task.checkpoint or {}
        # Process with state
        result = await self.do_work(task.input, state)
        return result

Health Checks and Graceful Shutdown

health_check.py
import asyncio
import signal
import time


class HealthyAgent:
    def __init__(self, task_queue, checkpointer):
        self.queue = task_queue
        self.checkpointer = checkpointer
        self.start_time = time.time()
        self.shutdown_requested = False
        self.current_task = None
        # Handle shutdown signals
        signal.signal(signal.SIGTERM, self._request_shutdown)
        signal.signal(signal.SIGINT, self._request_shutdown)

    def _request_shutdown(self, signum, frame):
        logger.info(f"Received signal {signum}, requesting graceful shutdown")
        self.shutdown_requested = True

    async def run_forever(self):
        try:
            while not self.shutdown_requested:
                task = await self.queue.poll()
                if not task:
                    await asyncio.sleep(1)  # Avoid a busy loop on an empty queue
                    continue
                self.current_task = task
                # process_with_checkpoint is the same method as in QueueBasedAgent
                await self.process_with_checkpoint(task)
                self.current_task = None  # Cleared only after the task finishes
        finally:
            # Graceful shutdown: save a checkpoint for any unfinished task.
            # Resetting current_task in a finally block would make this a no-op.
            if self.current_task:
                await self.checkpointer.save(self.current_task.id, self.current_task.checkpoint)
                logger.info("Saved checkpoint before shutdown")

    async def health_check(self) -> dict:
        return {
            "status": "healthy" if not self.shutdown_requested else "shutting_down",
            "current_task": self.current_task.id if self.current_task else None,
            "uptime_seconds": time.time() - self.start_time,
        }
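
A boolean flag works, but the worker only notices it between tasks or after a sleep. In an asyncio program, an `asyncio.Event` lets an idle worker wake up the moment the signal arrives. The helper below is a sketch under my own naming (`ShutdownSignal` is hypothetical); it relies on `loop.add_signal_handler`, which is only available on Unix event loops.

```python
import asyncio
import signal


class ShutdownSignal:
    """Shutdown flag that sleeping coroutines can wait on."""

    def __init__(self):
        self._event = asyncio.Event()

    def install(self) -> None:
        """Register SIGTERM/SIGINT handlers on the running loop (Unix only)."""
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._event.set)

    def request(self) -> None:
        """Request shutdown programmatically (useful in tests)."""
        self._event.set()

    @property
    def requested(self) -> bool:
        return self._event.is_set()

    async def sleep(self, seconds: float) -> bool:
        """Sleep up to `seconds`; return True if shutdown was requested meanwhile."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout=seconds)
            return True
        except asyncio.TimeoutError:
            return False
```

In the polling loop, `if await shutdown.sleep(1): break` would then replace a plain `asyncio.sleep(1)`, so an idle worker exits immediately instead of finishing its nap first.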

Complete Resilient Agent

Putting it all together:

complete_resilient_agent.py
import logging
import uuid

from langgraph.graph import StateGraph, END

logger = logging.getLogger(__name__)


class ResilientAgent:
    def __init__(
        self,
        primary_llm,
        fallback_llm,
        checkpointer,  # An open checkpointer, e.g. an AsyncPostgresSaver created by the caller
    ):
        # LLM client with retry, circuit breaker, and fallback (defined above)
        self.llm = ResilientLLMClient(primary_llm, fallback_llm)
        # Persistence
        self.checkpointer = checkpointer
        # Build workflow
        self.app = self._build_workflow()

    def _build_workflow(self):
        # AgentState here also needs `validated_input` and `result` keys;
        # validate_input and postprocess are omitted for brevity
        workflow = StateGraph(AgentState)
        workflow.add_node("validate", self.validate_input)
        workflow.add_node("process", self.process_with_retry)
        workflow.add_node("postprocess", self.postprocess)
        workflow.add_edge("validate", "process")
        workflow.add_edge("process", "postprocess")
        workflow.add_edge("postprocess", END)
        workflow.set_entry_point("validate")
        return workflow.compile(checkpointer=self.checkpointer)

    async def process_with_retry(self, state: AgentState) -> dict:
        """Process with automatic retry"""
        # Retries, circuit breaking, and fallback happen inside
        # ResilientLLMClient.generate. A @retry decorator on this method
        # would never fire, because the exception is caught below.
        try:
            result = await self.llm.generate(state["validated_input"])
            return {"result": result, "errors": []}
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            return {"errors": [str(e)]}

    async def run(self, input: str, thread_id: str | None = None) -> dict:
        """Run agent with full resilience"""
        thread_id = thread_id or str(uuid.uuid4())
        config = {"configurable": {"thread_id": thread_id}}
        try:
            result = await self.app.ainvoke(
                {"input": input},
                config=config,
            )
            return {"success": True, "result": result, "thread_id": thread_id}
        except Exception as e:
            # Can resume with thread_id
            logger.error(f"Agent failed, can resume with thread_id: {thread_id}")
            return {
                "success": False,
                "error": str(e),
                "thread_id": thread_id,
                "can_resume": True,
            }

    async def resume(self, thread_id: str) -> dict:
        """Resume from checkpoint"""
        config = {"configurable": {"thread_id": thread_id}}
        state = await self.app.aget_state(config)
        if not state.values:
            raise ValueError(f"No checkpoint found for thread {thread_id}")
        # Resume from where it stopped
        result = await self.app.ainvoke(None, config=config)
        return {"success": True, "result": result}

Summary

In this post, I showed how to design AI agents for failure. The key principles:

  1. Design the unhappy path first - Map every failure scenario before writing agent logic
  2. Use persistent state - LangGraph checkpointing with PostgreSQL saves progress across container restarts
  3. Implement retry with backoff - tenacity library with exponential backoff handles transient failures
  4. Add circuit breakers - Stop retrying when services are down for extended periods
  5. Build graceful degradation - Fallback models, partial results, and rule-based backups
  6. Handle infrastructure limits - Queue-based architecture for long tasks, health checks for graceful shutdown

My first agent took 3 weeks to build and 9 weeks to make production-ready. My current agents take 2 weeks total because I design for failure from day one.

The production failures will happen. The question is whether your agent can handle them.

Final Words + More Resources

My intention with this article was to help others by sharing my knowledge and experience. If you want to get in touch, you can reach me by email.
