How Should I Design AI Agents to Handle Failures Gracefully?
Problem
My AI agent crashed halfway through a 30-minute task. No error handling. No retry logic. No way to resume. I lost all progress.
    # What happened
    INFO: Agent started processing 1000 documents
    INFO: Processed 523 documents successfully
    ERROR: Connection timeout to OpenAI API
    INFO: Container killed by cloud provider (idle too long)

    # Result: All 523 processed documents - GONE
    # No checkpoint. No state saved. Start from zero.

I spent three weeks building the happy path (agent gets input, does the thing, succeeds). Then I spent nine weeks retrofitting error handling, retry logic, and fallback behavior. I did it backwards.
This post shows how to design AI agents for failure first, so when (not if) something goes wrong, your agent can recover, resume, or degrade gracefully.
Environment
- Python 3.12
- LangGraph for agent orchestration
- PostgreSQL for state persistence
- Redis for caching
- tenacity for retry logic
The Wrong Way: Happy Path First
Here's how I used to build agents:
    class NaiveAgent:
        def __init__(self, llm_client):
            self.llm = llm_client

        async def process(self, documents: list[str]) -> list[str]:
            results = []
            for doc in documents:
                # No error handling
                # No state saving
                # No retry logic
                response = await self.llm.generate(doc)
                results.append(response)
            return results

This works great in demos. In production, it failed:

    # Production failures I encountered
    ERROR: Rate limit exceeded (429)
    ERROR: Connection reset by peer
    ERROR: Context length exceeded
    ERROR: Model temporarily unavailable
    ERROR: Container OOM killed
    ERROR: Cloud provider killed container (idle timeout)

Each failure meant starting over. I lost hours of processing time.
The Right Way: Design the Unhappy Path First
Before writing any agent logic, I map out every failure scenario:
    ## Failure Scenarios

    ### Transient Failures (retry)
    - API rate limits (429)
    - Network timeouts
    - Model temporarily unavailable

    ### Permanent Failures (fallback)
    - Invalid input
    - Context length exceeded
    - Authentication errors

    ### Infrastructure Failures (checkpoint + resume)
    - Container killed
    - Memory exceeded
    - Cloud provider timeout

    ### Partial Failures (continue with degraded mode)
    - One tool unavailable
    - Primary model down
    - External API failed

Then I build handlers for each scenario before the happy path.
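The mapping from failure category to handling strategy can be written down as code before any agent logic exists. The following is a minimal sketch, not the author's implementation: the `Strategy` enum, the three custom exception classes, and `classify_failure` are hypothetical names introduced only for illustration, and a real agent would map the exception types raised by its own LLM client.

```python
from enum import Enum


class Strategy(Enum):
    RETRY = "retry"            # transient: try again with backoff
    FALLBACK = "fallback"      # permanent: switch to another path
    CHECKPOINT = "checkpoint"  # infrastructure: save state, resume later
    DEGRADE = "degrade"        # partial: continue with reduced functionality


# Hypothetical exception types, defined here only for the illustration.
class RateLimitError(Exception):
    pass


class ContextLengthError(Exception):
    pass


class ToolUnavailableError(Exception):
    pass


# Checked in order, so specific types should come before general ones.
STRATEGY_BY_EXCEPTION = [
    (RateLimitError, Strategy.RETRY),
    (TimeoutError, Strategy.RETRY),
    (ConnectionError, Strategy.RETRY),
    (ContextLengthError, Strategy.FALLBACK),
    (ValueError, Strategy.FALLBACK),        # invalid input
    (PermissionError, Strategy.FALLBACK),   # stand-in for auth errors
    (MemoryError, Strategy.CHECKPOINT),
    (ToolUnavailableError, Strategy.DEGRADE),
]


def classify_failure(exc: BaseException) -> Strategy:
    """Return the handling strategy for an exception."""
    for exc_type, strategy in STRATEGY_BY_EXCEPTION:
        if isinstance(exc, exc_type):
            return strategy
    # Unknown failures: checkpoint so that no progress is lost.
    return Strategy.CHECKPOINT
```

Defaulting unknown errors to the checkpoint strategy is a design choice of this sketch: when a failure cannot be classified, preserving progress is the least risky response.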
Persistent State with LangGraph Checkpointing
The biggest lesson: in-memory state dies with your container. I needed persistent state management.
Before: In-Memory State
    class InMemoryAgent:
        def __init__(self):
            self.state = {}  # Dies when container dies

        async def process(self, input: str):
            self.state["input"] = input
            self.state["step1_result"] = await self.step1(input)
            # If container dies here, step1_result is lost
            self.state["step2_result"] = await self.step2(self.state["step1_result"])
            return self.state["step2_result"]

After: LangGraph with PostgreSQL Checkpointing
    from langgraph.checkpoint.postgres import PostgresSaver
    from langgraph.graph import StateGraph, END
    from typing import TypedDict, Annotated
    import operator

    # `llm` and `connection_string` are assumed to be defined elsewhere

    class AgentState(TypedDict):
        input: str
        step1_result: str
        step2_result: str
        errors: Annotated[list[str], operator.add]

    # Define the graph
    workflow = StateGraph(AgentState)

    async def step1(state: AgentState) -> dict:
        """First processing step"""
        try:
            result = await llm.generate(state["input"])
            return {"step1_result": result}
        except Exception as e:
            return {"errors": [f"step1 failed: {e}"]}

    async def step2(state: AgentState) -> dict:
        """Second processing step"""
        if "step1_result" not in state:
            return {"errors": ["step1 result missing, skipping step2"]}

        try:
            result = await llm.generate(state["step1_result"])
            return {"step2_result": result}
        except Exception as e:
            return {"errors": [f"step2 failed: {e}"]}

    workflow.add_node("step1", step1)
    workflow.add_node("step2", step2)
    workflow.add_edge("step1", "step2")
    workflow.add_edge("step2", END)
    workflow.set_entry_point("step1")

    # Setup PostgreSQL checkpointing
    # Note: depending on the installed langgraph-checkpoint-postgres version, the
    # saver may have to be created with PostgresSaver.from_conn_string(...), and
    # async runs (ainvoke) may need AsyncPostgresSaver instead; check the docs
    checkpointer = PostgresSaver(connection_string)

    # Compile with persistence
    app = workflow.compile(checkpointer=checkpointer)

Now when I run the agent:
    import uuid

    # Generate unique thread ID for this run
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    # Run agent
    result = await app.ainvoke(
        {"input": "Process this document"},
        config=config
    )

    # If it crashes, I can resume from the checkpoint
    # Get current state
    state = await app.aget_state(config)
    print(f"Current state: {state.values}")
    print(f"Next step: {state.next}")

When the container crashes:
    # Container crashed at step2
    # Resume from last checkpoint

    thread_id = "previous-thread-id"  # Saved somewhere
    config = {"configurable": {"thread_id": thread_id}}

    # Get state - shows exactly where we stopped
    state = await app.aget_state(config)
    # Output: {'input': 'Process this document', 'step1_result': '...'}

    # Resume execution from where it stopped
    result = await app.ainvoke(None, config=config)

The checkpoint saves every state transition to PostgreSQL. When my cloud provider kills the container after 15 minutes, I just resume from the last checkpoint.
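To make the resume mechanism concrete without any framework, here is a small standard-library sketch of the same idea: write the state after every completed step under a thread ID, and skip already-completed steps on the next run. This is not how LangGraph stores checkpoints internally; the `checkpoints` table and the `open_store`, `load_state`, and `run_steps` helpers are assumptions made up for this illustration, and SQLite stands in for PostgreSQL.

```python
import json
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the checkpoint table."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "thread_id TEXT, step TEXT, state TEXT, "
        "PRIMARY KEY (thread_id, step))"
    )
    return conn


def load_state(conn: sqlite3.Connection, thread_id: str):
    """Return (completed step names, latest saved state) for a thread."""
    rows = conn.execute(
        "SELECT step, state FROM checkpoints WHERE thread_id = ? ORDER BY rowid",
        (thread_id,),
    ).fetchall()
    done, state = [], {}
    for step, blob in rows:
        done.append(step)
        state = json.loads(blob)  # each row holds the full state after that step
    return done, state


def run_steps(conn, thread_id, steps, initial_state):
    """Run (name, fn) steps in order, skipping any already checkpointed."""
    done, saved = load_state(conn, thread_id)
    state = saved if done else dict(initial_state)
    for name, fn in steps:
        if name in done:
            continue  # finished in an earlier run
        state = {**state, **fn(state)}
        conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, name, json.dumps(state)),
        )
        conn.commit()  # persist before moving on to the next step
    return state
```

Calling `run_steps` again with the same `thread_id` after a crash re-runs only the steps that never committed, which is the property the checkpointer provides in the graph above.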
Retry Logic with Exponential Backoff
Not all failures need checkpoints. Transient failures (rate limits, timeouts) need retries with backoff.
My First Attempt: Simple Retry
    import asyncio

    async def call_with_retry(prompt: str, max_retries: int = 3):
        for attempt in range(max_retries):
            try:
                return await llm.generate(prompt)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2)  # Always 2 seconds - BAD

This failed under rate limiting. I got more 429 errors because I didn't wait long enough.
Better: Exponential Backoff with tenacity
    from tenacity import (
        retry,
        stop_after_attempt,
        wait_exponential,
        retry_if_exception_type,
        before_sleep_log
    )
    import logging

    logger = logging.getLogger(__name__)

    # Define retryable exceptions
    RETRYABLE_EXCEPTIONS = (
        ConnectionError,
        TimeoutError,
        # Add your API-specific retryable errors
    )

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=2, max=60),
        retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
        before_sleep=before_sleep_log(logger, logging.WARNING),
        reraise=True
    )
    async def call_llm_with_retry(prompt: str) -> str:
        """Call LLM with automatic retry on transient failures"""
        return await llm.generate(prompt)

    # Usage
    try:
        result = await call_llm_with_retry("Analyze this document")
    except Exception as e:
        logger.error(f"All retries failed: {e}")
        # Handle permanent failure

When I test with simulated rate limits:

    # Retry behavior
    Attempt 1: FAILED (429 rate limit)
    Wait 2 seconds...
    Attempt 2: FAILED (429 rate limit)
    Wait 4 seconds...
    Attempt 3: FAILED (429 rate limit)
    Wait 8 seconds...
    Attempt 4: SUCCESS

    # Total wait: 14 seconds, but eventually succeeds

Even Better: Retry + Circuit Breaker
If the API is down for an extended period, retrying only wastes resources. I added a circuit breaker, which stops calling the primary API after repeated failures and switches to a fallback model.
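For readers who have not met the pattern before, the state machine behind a circuit breaker is small enough to sketch with the standard library. The class below is an illustration under stated assumptions, not the API of any particular package: its method names (`is_open`, `record_success`, `record_failure`) were chosen to match the calls used in this post, and the injectable `clock` parameter is an addition that makes the timeout testable.

```python
import time
from datetime import timedelta


class CircuitBreaker:
    """Closed -> open after N failures -> trial call after a timeout."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: timedelta = timedelta(seconds=60),
        clock=time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout.total_seconds()
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.recovery_timeout:
            # Half-open: allow a trial call. One more failure re-opens
            # the circuit; a success resets it completely.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

While the circuit is open, callers skip the primary API entirely, which is what lets a client like the one that follows route requests to a fallback instead of queuing up more doomed retries.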
    from circuit_breaker import CircuitBreaker
    from datetime import timedelta

    class ResilientLLMClient:
        def __init__(self, llm_client, fallback_model=None):
            self.llm = llm_client
            self.fallback = fallback_model
            self.circuit_breaker = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=timedelta(seconds=60),
                expected_exception=Exception
            )

        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=30),
            reraise=True
        )
        async def generate(self, prompt: str) -> str:
            # Check circuit breaker first
            if self.circuit_breaker.is_open():
                if self.fallback:
                    return await self.fallback.generate(prompt)
                raise Exception("Circuit breaker open, no fallback available")

            try:
                result = await self.llm.generate(prompt)
                self.circuit_breaker.record_success()
                return result
            except Exception:
                self.circuit_breaker.record_failure()
                raise

    # Usage
    client = ResilientLLMClient(
        llm_client=primary_llm,
        fallback_model=backup_llm  # Cheaper/faster fallback
    )

    # If primary fails 5 times, switch to fallback for 60 seconds
    result = await client.generate("Process this")

Graceful Degradation Patterns
Sometimes the best response is a degraded response, not an error.
Pattern 1: Fallback Models
    class DegradableAgent:
        def __init__(self):
            self.models = [
                ("claude-opus-4", True),    # Primary, expensive
                ("claude-sonnet-4", True),  # Secondary, cheaper
                ("claude-haiku-4", True),   # Tertiary, fastest
                ("rule-based", False),      # Fallback, no LLM
            ]

        async def process(self, input: str) -> str:
            for model_name, is_llm in self.models:
                try:
                    if is_llm:
                        return await self.call_llm(model_name, input)
                    else:
                        return self.rule_based_fallback(input)
                except Exception as e:
                    logger.warning(f"{model_name} failed: {e}")
                    continue

            # All models failed
            return self.error_response(input)

        def rule_based_fallback(self, input: str) -> str:
            """No LLM available, use simple rules"""
            if "summary" in input.lower():
                return "I cannot generate a full summary right now. Please try again later."
            if "translate" in input.lower():
                return "Translation service temporarily unavailable."
            return "I'm experiencing technical difficulties. Please try again in a few minutes."

Pattern 2: Partial Results with Caching
    import hashlib

    class CachingAgent:
        def __init__(self, cache, llm):
            self.cache = cache  # Redis
            self.llm = llm

        async def process_batch(self, items: list[str]) -> dict:
            results = {}
            errors = []

            for item in items:
                # Built-in hash() of a str changes between processes, so use a
                # stable digest to keep cache keys valid across restarts
                cache_key = f"item:{hashlib.sha256(item.encode()).hexdigest()}"

                # Check cache first
                cached = await self.cache.get(cache_key)
                if cached is not None:
                    results[item] = cached
                    continue

                try:
                    result = await self.llm.generate(item)
                    results[item] = result
                    # redis-py takes the expiry in seconds as `ex`
                    await self.cache.set(cache_key, result, ex=3600)
                except Exception as e:
                    errors.append((item, str(e)))
                    # Continue processing other items

            return {
                "results": results,
                "errors": errors,
                # Guard against an empty batch
                "success_rate": len(results) / len(items) if items else 0.0
            }

    # Usage
    output = await agent.process_batch(documents)
    # Returns:
    # {
    #     "results": {"doc1": "...", "doc2": "..."},
    #     "errors": [("doc3", "timeout")],
    #     "success_rate": 0.67
    # }

Pattern 3: Checkpoint Progress for Long Tasks
    from datetime import datetime, timezone

    class LongTaskAgent:
        def __init__(self, checkpointer, llm):
            self.checkpointer = checkpointer
            self.llm = llm

        async def process_large_dataset(
            self,
            dataset_id: str,
            checkpoint_interval: int = 10
        ):
            # Load checkpoint if exists (treat a missing checkpoint as empty)
            checkpoint = await self.checkpointer.load(dataset_id) or {}
            start_index = checkpoint.get("last_index", 0)
            results = checkpoint.get("results", [])

            items = await self.load_dataset(dataset_id)

            for i in range(start_index, len(items)):
                try:
                    result = await self.llm.generate(items[i])
                    results.append(result)

                    # Checkpoint every N items
                    if (i + 1) % checkpoint_interval == 0:
                        await self.checkpointer.save(dataset_id, {
                            "last_index": i + 1,
                            "results": results,
                            # datetime.utcnow() is deprecated in Python 3.12
                            "timestamp": datetime.now(timezone.utc).isoformat()
                        })
                        logger.info(f"Checkpointed at item {i + 1}")

                except Exception as e:
                    # Save checkpoint before crashing; item i is retried on resume
                    await self.checkpointer.save(dataset_id, {
                        "last_index": i,
                        "results": results,
                        "error": str(e)
                    })
                    raise

            # Clear checkpoint on success
            await self.checkpointer.clear(dataset_id)
            return results

Infrastructure Considerations
Problem: Cloud Providers Kill Long-Running Containers
    # AWS Lambda: 15 min max
    # Google Cloud Functions: 9 min max
    # Azure Functions: 10 min max (default)
    # Cloud Run: 60 min max (configurable)

My agent took 45 minutes. Cloud Run killed it when it hit the 30-minute timeout.
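One way to stay inside limits like these is to give each invocation an explicit time budget and hand back whatever it did not finish, so the remainder can be picked up by a later run. The sketch below is a standard-library illustration; `process_within_budget` and its `clock` parameter are hypothetical names, and the handler is synchronous only to keep the example short.

```python
import time
from typing import Callable, Sequence


def process_within_budget(
    items: Sequence[str],
    handle: Callable[[str], str],
    budget_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> tuple[list[str], list[str]]:
    """Process items until the budget is spent; return (results, remaining)."""
    deadline = clock() + budget_seconds
    results = []
    for index, item in enumerate(items):
        if clock() >= deadline:
            # Out of time: return the unprocessed tail to the caller,
            # who can re-queue it for another invocation.
            return results, list(items[index:])
        results.append(handle(item))
    return results, []
```

The budget should be set well below the platform limit, because the check happens between items and a single slow item can still overrun it.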
Solution: Queue-Based Architecture
    from dataclasses import dataclass
    from typing import Optional
    import time

    @dataclass
    class Task:
        id: str
        input: str
        checkpoint: Optional[dict] = None
        attempt: int = 0

    class QueueBasedAgent:
        def __init__(self, task_queue, checkpointer, max_runtime_seconds=300):
            self.queue = task_queue
            self.checkpointer = checkpointer
            self.max_runtime = max_runtime_seconds

        async def worker_loop(self):
            """Process tasks from queue with time limit"""
            start_time = time.time()

            while time.time() - start_time < self.max_runtime:
                task = await self.queue.poll()
                if not task:
                    break

                try:
                    result = await self.process_with_checkpoint(task)
                    await self.queue.complete(task.id, result)
                except Exception as e:
                    # Re-queue for another worker
                    task.attempt += 1
                    if task.attempt < 5:
                        await self.queue.requeue(task)
                    else:
                        await self.queue.fail(task.id, str(e))

        async def process_with_checkpoint(self, task: Task) -> str:
            # Load from checkpoint
            state = task.checkpoint or {}

            # Process with state
            result = await self.do_work(task.input, state)

            return result

Health Checks and Graceful Shutdown
    import signal
    import asyncio
    import time

    class HealthyAgent:
        def __init__(self, task_queue, checkpointer):
            self.queue = task_queue
            self.checkpointer = checkpointer
            self.shutdown_requested = False
            self.current_task = None
            self.start_time = time.time()  # Used by health_check

            # Handle shutdown signals
            signal.signal(signal.SIGTERM, self._request_shutdown)
            signal.signal(signal.SIGINT, self._request_shutdown)

        def _request_shutdown(self, signum, frame):
            logger.info(f"Received signal {signum}, requesting graceful shutdown")
            self.shutdown_requested = True

        async def run_forever(self):
            # process_with_checkpoint is the same method as in QueueBasedAgent
            try:
                while not self.shutdown_requested:
                    task = await self.queue.poll()
                    if task:
                        self.current_task = task
                        await self.process_with_checkpoint(task)
                        # Cleared only after the task finishes, so an
                        # interrupted task is still set in the finally block
                        self.current_task = None
            finally:
                # Graceful shutdown: save checkpoint for an unfinished task
                if self.current_task:
                    await self.checkpointer.save(
                        self.current_task.id,
                        self.current_task.checkpoint or {}
                    )
                    logger.info("Saved checkpoint before shutdown")

        async def health_check(self) -> dict:
            return {
                "status": "healthy" if not self.shutdown_requested else "shutting_down",
                "current_task": self.current_task.id if self.current_task else None,
                "uptime_seconds": time.time() - self.start_time
            }

Complete Resilient Agent
Putting it all together:
    from langgraph.checkpoint.postgres import PostgresSaver
    from langgraph.graph import StateGraph, END
    from tenacity import retry, stop_after_attempt, wait_exponential
    from circuit_breaker import CircuitBreaker
    import logging
    import uuid

    logger = logging.getLogger(__name__)

    # Assumes AgentState also declares `validated_input` and `result`, and that
    # validate_input and postprocess are defined as node methods on the class

    class ResilientAgent:
        def __init__(
            self,
            primary_llm,
            fallback_llm,
            db_connection_string: str
        ):
            # LLM clients with fallback
            self.llm = ResilientLLMClient(primary_llm, fallback_llm)

            # Persistence
            self.checkpointer = PostgresSaver(db_connection_string)

            # Build workflow
            self.app = self._build_workflow()

        def _build_workflow(self):
            """Build the graph and return it compiled with the checkpointer"""
            workflow = StateGraph(AgentState)

            workflow.add_node("validate", self.validate_input)
            workflow.add_node("process", self.process_with_retry)
            workflow.add_node("postprocess", self.postprocess)

            workflow.add_edge("validate", "process")
            workflow.add_edge("process", "postprocess")
            workflow.add_edge("postprocess", END)
            workflow.set_entry_point("validate")

            return workflow.compile(checkpointer=self.checkpointer)
        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=30),
            reraise=True
        )
        async def process_with_retry(self, state: AgentState) -> dict:
            """Process with automatic retry"""
            try:
                result = await self.llm.generate(state["validated_input"])
                return {"result": result, "errors": []}
            except Exception as e:
                logger.error(f"Processing failed: {e}")
                # Re-raise so tenacity can retry; swallowing the error here
                # would mean the @retry decorator never fires
                raise
        async def run(self, input: str, thread_id: str | None = None) -> dict:
            """Run agent with full resilience"""
            thread_id = thread_id or str(uuid.uuid4())
            config = {"configurable": {"thread_id": thread_id}}

            try:
                result = await self.app.ainvoke(
                    {"input": input},
                    config=config
                )
                return {"success": True, "result": result, "thread_id": thread_id}
            except Exception as e:
                # Can resume with thread_id
                logger.error(f"Agent failed, can resume with thread_id: {thread_id}")
                return {
                    "success": False,
                    "error": str(e),
                    "thread_id": thread_id,
                    "can_resume": True
                }

        async def resume(self, thread_id: str) -> dict:
            """Resume from checkpoint"""
            config = {"configurable": {"thread_id": thread_id}}
            state = await self.app.aget_state(config)

            if not state.values:
                raise ValueError(f"No checkpoint found for thread {thread_id}")

            # Resume from where it stopped
            result = await self.app.ainvoke(None, config=config)
            return {"success": True, "result": result}

Summary
In this post, I showed how to design AI agents for failure. The key principles:
- Design the unhappy path first - Map every failure scenario before writing agent logic
- Use persistent state - LangGraph checkpointing with PostgreSQL saves progress across container restarts
- Implement retry with backoff - tenacity library with exponential backoff handles transient failures
- Add circuit breakers - Stop retrying when services are down for extended periods
- Build graceful degradation - Fallback models, partial results, and rule-based backups
- Handle infrastructure limits - Queue-based architecture for long tasks, health checks for graceful shutdown
My first agent took 3 weeks to build and 9 weeks to make production-ready. My current agents take 2 weeks total because I design for failure from day one.
Production failures will happen. The question is whether your agent can handle them.
Final Words + More Resources
My intention with this article was to share my knowledge and experience to help others. If you want to get in touch, you can contact me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- 👨‍💻 Reddit Discussion on AI Agent Failures
- 👨‍💻 LangGraph Checkpointing
- 👨‍💻 Tenacity Retry Library
- 👨‍💻 Circuit Breaker Pattern
Oh, and if you found these resources useful, don't forget to support me by starring the repo on GitHub!