
How to Handle Agent Timeout and Failure Recovery in Multi-Agent Systems? (CrewAI vs LangGraph)

I deployed my first multi-agent system to production last month. Everything worked perfectly in development. But the moment real API calls started happening, my agents started hanging. One agent would time out, and the entire pipeline would freeze. No error messages, no recovery, just a frozen process eating up resources.

The error logs showed nothing useful:

production_logs.txt
[2026-04-10 14:32:15] Agent researcher_agent started
[2026-04-10 14:32:45] WARNING: API call taking longer than expected
[2026-04-10 14:33:15] ... (silence for 5 minutes)
[2026-04-10 14:38:22] Connection timeout (but the process never recovered)

After switching frameworks and implementing proper failure handling, I learned that timeout and failure recovery are not optional features—they’re survival requirements for production multi-agent systems.

Why Agents Fail in Production

Multi-agent systems face unique failure modes that single-agent systems don’t:

Cascading Failures: One agent’s timeout can block downstream agents. If Agent A never completes, Agent B waits forever. Agent C waits on Agent B. Your entire pipeline hangs.

Invisible Failures: Without proper instrumentation, you can’t tell which agent failed or why. You just know “something broke.”

Resource Exhaustion: Hung agents consume memory and connection pools. Over time, your system degrades until it crashes completely.
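
One way to reduce invisible failures is to wrap every agent call so that its name, duration, and exception end up in the log. The following is a minimal sketch, not part of CrewAI or LangGraph; `run_logged` and the agent name passed to it are hypothetical.

```python
import logging
import time
from typing import Any, Callable

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("agents")


def run_logged(agent_name: str, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run one agent step, logging its start, duration, and any exception."""
    start = time.monotonic()
    log.info("agent=%s status=started", agent_name)
    try:
        result = fn(*args, **kwargs)
    except Exception:
        elapsed = time.monotonic() - start
        # log.exception also records the traceback, so the failing agent is identifiable
        log.exception("agent=%s status=failed elapsed=%.2fs", agent_name, elapsed)
        raise
    elapsed = time.monotonic() - start
    log.info("agent=%s status=ok elapsed=%.2fs", agent_name, elapsed)
    return result
```

A wrapper like this only reports calls that return or raise. A call that hangs never reaches either log line, which is why it has to be combined with a timeout.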

Here’s what a typical failure cascade looks like:

cascade_diagram.txt
┌─────────────┐
│   Agent A   │ ← API timeout (60s)
│ (Research)  │
└──────┬──────┘
       │ BLOCKED
┌─────────────┐
│   Agent B   │ ← Waiting forever
│ (Analysis)  │
└──────┬──────┘
       │ BLOCKED
┌─────────────┐
│   Agent C   │ ← Never even starts
│  (Report)   │
└─────────────┘
Result: Entire pipeline hangs for 5-10 minutes
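
The cascade breaks as soon as each stage waits for its upstream stage only for a bounded time. For async agents, `asyncio.wait_for` does this and also cancels the stalled coroutine. This is a sketch under stated assumptions: `research`, `analyze`, and `pipeline` are hypothetical stand-ins, and the sleep imitates a hung API call.

```python
import asyncio


async def research(topic: str) -> str:
    """Stand-in for Agent A; the sleep imitates a hung API call."""
    await asyncio.sleep(0.5)
    return f"notes on {topic}"


async def analyze(notes: str) -> str:
    """Stand-in for Agent B; works with whatever input it receives."""
    return f"analysis of: {notes or 'topic only (no research)'}"


async def pipeline(topic: str, stage_timeout: float) -> dict:
    """Run A then B, giving A at most stage_timeout seconds."""
    errors = []
    try:
        # wait_for cancels the research coroutine once the timeout passes
        notes = await asyncio.wait_for(research(topic), timeout=stage_timeout)
    except asyncio.TimeoutError:
        notes = ""
        errors.append(f"research exceeded {stage_timeout}s")
    analysis = await analyze(notes)  # B still runs, with degraded input
    return {"analysis": analysis, "errors": errors}


result = asyncio.run(pipeline("timeouts", stage_timeout=0.1))
print(result)
```

With the short timeout, Agent B runs on the topic alone and the error list names the stage that was cut off, instead of the whole pipeline blocking.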

My Initial Mistake with CrewAI

I started with CrewAI because it promised a simple API. Define agents, give them tasks, run the crew. Here’s what my initial code looked like:

crew_initial.py
from crewai import Agent, Task, Crew

researcher = Agent(
    role="Researcher",
    goal="Find information about the topic",
    backstory="You are a helpful research assistant",
    llm="gpt-4"
)

writer = Agent(
    role="Writer",
    goal="Write a blog post",
    backstory="You are a skilled writer",
    llm="gpt-4"
)

research_task = Task(
    description="Research {topic}",
    expected_output="A summary of the key findings",  # Required by recent CrewAI releases
    agent=researcher
)

write_task = Task(
    description="Write a blog post based on research",
    expected_output="A complete blog post",
    agent=writer
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task]
)

# {topic} in the task description is filled in from inputs.
# This call can block indefinitely if the researcher's API call stalls.
result = crew.kickoff(inputs={"topic": "Agent timeout handling"})

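The problem? If the researcher agent’s API call hangs or times out, the entire crew hangs. Newer CrewAI releases document agent settings such as max_execution_time, so check the current docs, but the code above sets none of them, and with it I had no obvious way to: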

  • See which agent failed
  • Retry just that agent
  • Set agent-level timeouts
  • Implement circuit breakers

As one developer on Reddit noted: “CrewAI broke down the moment one agent timed out and the whole crew hung… I couldn’t see where it failed.”

Implementing Timeout Handling in CrewAI

To make CrewAI production-ready, you need to wrap it with your own timeout and retry logic:

crew_timeout_handler.py
import signal
from contextlib import contextmanager
from typing import Any, Optional

from crewai import Crew


class CrewTimeoutError(Exception):
    """Raised when the crew exceeds its time budget.

    Named CrewTimeoutError so it does not shadow Python's built-in TimeoutError.
    """


@contextmanager
def timeout_handler(seconds: int, error_message: str = "Operation timed out"):
    """Context manager for timeout handling.

    Relies on SIGALRM, so it only works on Unix and only in the main thread.
    """
    def signal_handler(signum, frame):
        raise CrewTimeoutError(error_message)

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # Cancel the pending alarm
        signal.signal(signal.SIGALRM, previous_handler)  # Restore the old handler


def run_crew_with_timeout(
    crew: Crew,
    timeout_seconds: int = 300,
    retries: int = 2,
    inputs: Optional[dict] = None
) -> Optional[Any]:
    """
    Run a CrewAI crew with timeout and retry logic.

    Args:
        crew: The CrewAI crew to run
        timeout_seconds: Maximum time per attempt
        retries: Number of retry attempts
        inputs: Values for placeholders such as {topic} in task descriptions

    Returns:
        Crew result or None if all attempts fail
    """
    last_error = None
    for attempt in range(retries + 1):
        try:
            with timeout_handler(timeout_seconds,
                                 f"Crew timed out after {timeout_seconds}s"):
                result = crew.kickoff(inputs=inputs)
            print(f"Crew completed successfully on attempt {attempt + 1}")
            return result
        except CrewTimeoutError as e:
            last_error = e
            print(f"Attempt {attempt + 1} timed out: {e}")
        except Exception as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed: {e}")
        if attempt < retries:
            print("Retrying...")
    print(f"All {retries + 1} attempts failed. Last error: {last_error}")
    return None


# Usage
result = run_crew_with_timeout(
    crew=crew,
    timeout_seconds=180,  # 3 minutes max per attempt
    retries=2,
    inputs={"topic": "Agent timeout handling"}
)
if result is None:
    print("Crew failed after all retries, implementing fallback...")
else:
    print(f"Result: {result}")

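This approach works, but it’s fragile. signal.alarm exists only on Unix, and Python only lets you install signal handlers from the main thread, so the wrapper breaks as soon as the crew runs inside a worker thread. Each retry also reruns the whole crew, and you still can’t see which agent failed, only that the crew failed.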
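
Where signals are not available, one common alternative is to run the blocking call in a worker thread and stop waiting after a deadline. The sketch below uses only the standard library; `call_with_timeout` and `fake_kickoff` are hypothetical names, and `fake_kickoff` merely stands in for a blocking call such as `crew.kickoff`.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable


def call_with_timeout(fn: Callable[[], Any], timeout_s: float) -> Any:
    """Run fn in a worker thread and stop waiting after timeout_s seconds.

    Raises the built-in TimeoutError on expiry. The worker thread is NOT
    killed; it keeps running until fn returns on its own.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"call exceeded {timeout_s}s") from None
    finally:
        pool.shutdown(wait=False)  # Return immediately, do not join the worker


def fake_kickoff() -> str:
    """Stand-in for a blocking call such as crew.kickoff()."""
    time.sleep(0.3)
    return "crew result"


print(call_with_timeout(fake_kickoff, timeout_s=2.0))
```

The trade-off is that the caller gets control back while the abandoned call continues in the background and still holds its connection. If the work has to be stopped for real, it needs to run in a separate process that can be terminated.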

A Better Approach: LangGraph with Node-Level Control

LangGraph was built with these production concerns in mind. Instead of a black-box crew execution, you define a graph of nodes (agents) with explicit state management and error handling at each step.

Here’s the same workflow in LangGraph:

langgraph_agents.py
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import operator


# Define state that persists across nodes
class AgentState(TypedDict):
    topic: str
    research_results: str
    blog_post: str
    # operator.add is a reducer: lists returned by nodes are appended, not replaced
    errors: Annotated[list, operator.add]
    retry_count: int


# Create the graph
workflow = StateGraph(AgentState)


# Define nodes with explicit error handling
def research_node(state: AgentState) -> dict:
    """Research agent with a per-request timeout."""
    try:
        llm = ChatOpenAI(model="gpt-4", request_timeout=60)
        response = llm.invoke([
            {"role": "system", "content": "You are a research assistant."},
            {"role": "user", "content": f"Research this topic: {state['topic']}"}
        ])
        return {
            "research_results": response.content,
            "errors": []  # Nothing to append; earlier errors stay in state
        }
    except Exception as e:
        error_msg = f"Research agent failed: {str(e)}"
        print(error_msg)
        return {
            "research_results": "",
            "errors": [error_msg]
        }


def write_node(state: AgentState) -> dict:
    """Writing agent that handles missing research."""
    if not state.get("research_results"):
        # Research failed, write with available info
        print("Warning: No research results, writing with topic only")
        context = state["topic"]
    else:
        context = state["research_results"]
    try:
        llm = ChatOpenAI(model="gpt-4", request_timeout=60)
        response = llm.invoke([
            {"role": "system", "content": "You are a skilled writer."},
            {"role": "user", "content": f"Write a blog post based on: {context}"}
        ])
        return {
            "blog_post": response.content,
            "errors": []
        }
    except Exception as e:
        error_msg = f"Write agent failed: {str(e)}"
        print(error_msg)
        return {
            "blog_post": "",
            "errors": [error_msg]
        }


def error_router(state: AgentState) -> str:
    """Decide whether to continue, retry, or give up after the research node."""
    # Route on research_results, which is overwritten on every attempt.
    # The errors list only grows, so it cannot tell whether the latest attempt succeeded.
    if state.get("research_results"):
        return "continue"
    if state.get("retry_count", 0) < 2:
        return "retry"
    return "fail"


# Add nodes to workflow
workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_node("increment_retry", lambda s: {"retry_count": s.get("retry_count", 0) + 1})

# Define edges with conditional routing
workflow.set_entry_point("research")
workflow.add_conditional_edges(
    "research",
    error_router,
    {
        "continue": "write",
        "retry": "increment_retry",
        "fail": END
    }
)
workflow.add_edge("increment_retry", "research")
workflow.add_edge("write", END)

# Compile and run
app = workflow.compile()
result = app.invoke({
    "topic": "Agent timeout handling",
    "research_results": "",
    "blog_post": "",
    "errors": [],
    "retry_count": 0
})
print(f"Final blog post: {result.get('blog_post', 'Failed to generate')}")
print(f"Errors encountered: {result.get('errors', [])}")

The key difference: I can see exactly which node failed and why. Each agent is isolated, errors are captured in state, and I can route around failures or retry specific nodes.
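
One detail of the state definition is easy to misread. Because `errors` is declared with `operator.add` as its reducer, a list returned by a node is appended to the existing list instead of replacing it, so returning an empty list does not clear earlier errors. The plain-Python sketch below imitates that merge rule. It is a simulation for illustration, not LangGraph's actual implementation, and `merge` and `REDUCERS` are hypothetical names.

```python
import operator
from typing import Callable, Dict

# Hypothetical stand-in for per-key reducers: "errors" is combined with operator.add
REDUCERS: Dict[str, Callable] = {"errors": operator.add}


def merge(state: dict, update: dict) -> dict:
    """Apply a node's update: reduced keys are combined, all others overwritten."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS:
            merged[key] = REDUCERS[key](state.get(key, []), value)
        else:
            merged[key] = value
    return merged


state = {"research_results": "", "errors": []}
# First attempt fails
state = merge(state, {"research_results": "", "errors": ["Research agent failed: timeout"]})
# Retry succeeds and returns an empty error list
state = merge(state, {"research_results": "notes", "errors": []})
print(state["errors"])  # The error from the first attempt is still recorded
```

The error history surviving a successful retry is useful for debugging. It also means a router should decide "did the last attempt succeed?" from a field that is overwritten on each attempt, such as `research_results`, and not from whether `errors` is empty.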

As the Reddit user reported: “Switched to LangGraph, at least I could see exactly which node failed.”

Adding a Circuit Breaker Pattern

For production systems, you want to fail fast when external APIs are having issues. A circuit breaker prevents your system from repeatedly trying an operation that’s likely to fail.

circuit_breaker.py
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

from langchain_openai import ChatOpenAI


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject all calls
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for agent API calls.
    Prevents cascade failures when external APIs are down.
    """
    failure_threshold: int = 3
    recovery_timeout: int = 60  # seconds
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: Optional[float] = None

    def can_execute(self) -> bool:
        """Check if operation can proceed."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed (monotonic clock, immune to clock changes)
            if (time.monotonic() - self.last_failure_time) > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        # HALF_OPEN: let test requests through until one succeeds or fails
        return True

    def record_success(self):
        """Record successful operation."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Record failed operation."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPEN after {self.failure_count} failures")

    def execute(self, operation: Callable, *args, **kwargs):
        """Execute operation with circuit breaker protection."""
        if not self.can_execute():
            raise CircuitOpenError("Circuit breaker is OPEN - failing fast")
        try:
            result = operation(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return result


# Usage in agent node
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)


def research_with_circuit_breaker(state: dict) -> dict:
    """Research agent with circuit breaker protection.

    state has the same shape as AgentState in langgraph_agents.py.
    """
    try:
        # This will fail fast if API is having issues
        result = breaker.execute(
            lambda: call_research_api(state["topic"])
        )
        return {"research_results": result, "errors": []}
    except CircuitOpenError as e:
        # Fail gracefully, use cached/fallback data.
        # Any other exception propagates to the caller.
        return {
            "research_results": "Using cached research data...",
            "errors": [f"Circuit breaker active: {e}"]
        }


def call_research_api(topic: str) -> str:
    """API call that might fail."""
    # Your actual API call here
    llm = ChatOpenAI(model="gpt-4", request_timeout=60)
    response = llm.invoke([{"role": "user", "content": f"Research: {topic}"}])
    return response.content
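
Waiting 60 real seconds to check that the breaker recovers is impractical, so it helps if the clock can be injected. The sketch below is a condensed breaker written only to show the state transitions deterministically; `MiniBreaker` and its `clock` parameter are hypothetical and not part of the code above.

```python
import time
from typing import Callable, Optional


class MiniBreaker:
    """Condensed breaker with an injectable clock for deterministic checks."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def allow(self) -> bool:
        """Return True if a call may go through right now."""
        if self.opened_at is None:
            return True
        # Once the recovery window has elapsed, let a probe through (half-open)
        return self.clock() - self.opened_at >= self.recovery_timeout

    def record(self, ok: bool) -> None:
        """Record the outcome of a call."""
        if ok:
            self.failures, self.opened_at = 0, None
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # Open (or re-open) and restart the window


now = [0.0]  # Fake clock, advanced by hand
breaker = MiniBreaker(failure_threshold=3, recovery_timeout=60, clock=lambda: now[0])
for _ in range(3):
    breaker.record(ok=False)
blocked = breaker.allow()        # Circuit is open, calls are rejected
now[0] = 61.0
probe_allowed = breaker.allow()  # Recovery window elapsed, a probe may run
breaker.record(ok=True)          # Probe succeeded, circuit closes again
```

Like the breaker above, this version lets every caller through during the half-open phase until an outcome is recorded. Under concurrency you would probably want a lock and a single in-flight probe.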

The Bounded Agent Pattern

The most reliable production pattern I’ve found is bounded agents—specialized agents with tight scopes and explicit constraints.

bounded_agent.py
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from dataclasses import dataclass, field
from typing import List, Optional

from langchain_openai import ChatOpenAI

# Assumes the CircuitBreaker class above is saved as circuit_breaker.py
from circuit_breaker import CircuitBreaker


@dataclass
class AgentConstraints:
    """Define boundaries for agent execution."""
    max_tokens: int = 4000
    timeout_seconds: int = 30
    max_retries: int = 2
    required_inputs: List[str] = field(default_factory=list)
    fallback_behavior: str = "skip"  # or "retry", "fail", "use_cache"


class BoundedAgent:
    """
    Agent with explicit boundaries and failure handling.
    Key principle: Do ONE thing well, fail gracefully.
    """

    def __init__(
        self,
        name: str,
        description: str,
        constraints: Optional[AgentConstraints],
        llm: ChatOpenAI
    ):
        self.name = name
        self.description = description
        self.constraints = constraints or AgentConstraints()
        self.llm = llm
        self.circuit_breaker = CircuitBreaker()

    def validate_inputs(self, state: dict) -> bool:
        """Verify required inputs exist."""
        if not self.constraints.required_inputs:
            return True
        for input_key in self.constraints.required_inputs:
            if input_key not in state or not state[input_key]:
                print(f"{self.name}: Missing required input '{input_key}'")
                return False
        return True

    def execute(self, state: dict) -> dict:
        """Execute agent with all constraints enforced."""
        # 1. Validate inputs
        if not self.validate_inputs(state):
            return self._fallback(state, "Invalid inputs")
        # 2. Check circuit breaker
        if not self.circuit_breaker.can_execute():
            return self._fallback(state, "Circuit breaker open")
        # 3. Execute with timeout and retries
        for attempt in range(self.constraints.max_retries + 1):
            try:
                result = self._execute_with_timeout(state)
                self.circuit_breaker.record_success()
                return result
            except TimeoutError:
                print(f"{self.name} timed out on attempt {attempt + 1}")
                self.circuit_breaker.record_failure()  # Timeouts count as failures too
                if attempt == self.constraints.max_retries:
                    return self._fallback(state, "Max timeouts reached")
            except Exception as e:
                print(f"{self.name} failed on attempt {attempt + 1}: {e}")
                self.circuit_breaker.record_failure()
                if attempt == self.constraints.max_retries:
                    return self._fallback(state, str(e))
        return self._fallback(state, "Unknown failure")

    def _execute_with_timeout(self, state: dict) -> dict:
        """Execute the LLM call, waiting at most timeout_seconds for it."""
        pool = ThreadPoolExecutor(max_workers=1)
        future = pool.submit(
            self.llm.invoke,
            self._build_prompt(state),
            max_tokens=self.constraints.max_tokens
        )
        try:
            response = future.result(timeout=self.constraints.timeout_seconds)
        except FutureTimeoutError:
            # The worker thread cannot be killed; we only stop waiting for it
            raise TimeoutError(
                f"{self.name} exceeded {self.constraints.timeout_seconds}s"
            ) from None
        finally:
            pool.shutdown(wait=False)
        return {f"{self.name}_output": response.content}

    def _build_prompt(self, state: dict) -> list:
        """Build prompt from state."""
        # Implement based on agent's specific task
        return [{"role": "user", "content": str(state)}]

    def _fallback(self, state: dict, reason: str) -> dict:
        """Handle failure based on configured behavior."""
        if self.constraints.fallback_behavior == "skip":
            return {
                f"{self.name}_output": None,
                f"{self.name}_error": reason,
                f"{self.name}_skipped": True
            }
        elif self.constraints.fallback_behavior == "use_cache":
            cached = state.get(f"{self.name}_cached", "")
            return {
                f"{self.name}_output": cached,
                f"{self.name}_error": f"Used cache: {reason}"
            }
        else:
            return {
                f"{self.name}_output": "",
                f"{self.name}_error": reason
            }


# Define specialized bounded agents
research_agent = BoundedAgent(
    name="researcher",
    description="Finds relevant information about a topic",
    constraints=AgentConstraints(
        max_tokens=2000,
        timeout_seconds=30,
        max_retries=2,
        required_inputs=["topic"],
        fallback_behavior="skip"
    ),
    llm=ChatOpenAI(model="gpt-4")
)

analysis_agent = BoundedAgent(
    name="analyst",
    description="Analyzes research results",
    constraints=AgentConstraints(
        max_tokens=3000,
        timeout_seconds=45,
        max_retries=1,
        required_inputs=["researcher_output"],
        fallback_behavior="use_cache"
    ),
    llm=ChatOpenAI(model="gpt-4")
)

# Execute with confidence that failures won't cascade
state = {"topic": "Agent timeout handling", "analyst_cached": "Previous analysis..."}
state.update(research_agent.execute(state))
state.update(analysis_agent.execute(state))
print(f"Research result: {state.get('researcher_output')}")
print(f"Analysis result: {state.get('analyst_output')}")
print(f"Any errors: {state.get('researcher_error'), state.get('analyst_error')}")

Key Principles for Production Agents

Based on my experience and the patterns above, here are the essential practices:

Always set explicit timeouts. Never rely on default timeouts—they’re often too long or non-existent. Set them at the API level, agent level, and workflow level.

Make failure visible. Use state management to track errors. Log which agent failed, when, and why. This is where LangGraph’s node-based approach shines over CrewAI’s crew abstraction.

Use circuit breakers. When an external service starts failing, stop hitting it repeatedly. Let it recover, then test with a single request.

Specialize your agents. Each agent should do one thing well. This makes timeout predictions more accurate and limits blast radius when things fail.

Plan for partial success. Your system should produce useful output even when some agents fail. Design your workflow to route around failures.
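
Setting timeouts at three levels works best when the levels know about each other. One way to do that is a shared deadline: every call gets its own limit, capped by whatever is left of the workflow budget. This is a minimal sketch; the `Deadline` class and its method names are hypothetical.

```python
import time
from typing import Callable


class Deadline:
    """Tracks one workflow-level time budget shared by all steps."""

    def __init__(self, total_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._expires_at = clock() + total_seconds

    def remaining(self) -> float:
        """Seconds left in the budget, never negative."""
        return max(0.0, self._expires_at - self._clock())

    def timeout_for(self, per_call_limit: float) -> float:
        """Timeout for the next call: its own limit, capped by what is left."""
        left = self.remaining()
        if left <= 0:
            raise TimeoutError("workflow deadline exceeded")
        return min(per_call_limit, left)


now = [0.0]  # Fake clock so the example is deterministic
deadline = Deadline(300, clock=lambda: now[0])
first = deadline.timeout_for(60)   # Plenty of budget left: the call keeps its 60s limit
now[0] = 270.0
second = deadline.timeout_for(60)  # Only 30s of budget left: the call is capped at 30s
```

The value returned by `timeout_for` would then be passed as the per-request timeout of the next API call, so a late step cannot push the pipeline past its overall limit.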

Quick Reference: Timeout Configurations

timeout_reference.py
# LLM API timeouts (set at client level)
llm = ChatOpenAI(
    model="gpt-4",
    request_timeout=60,  # Single API call timeout
    max_retries=2        # Retries at API client level
)

# Agent-level timeouts
agent_timeout = 120  # Total agent execution time

# Workflow-level timeouts
workflow_timeout = 300  # Entire pipeline timeout

# Circuit breaker thresholds
failure_threshold = 3  # Failures before circuit opens
recovery_timeout = 60  # Time before testing recovery
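
These numbers should be checked against each other. Assuming `max_retries=2` means two retries on top of the first attempt, a single client call can take up to 60 × 3 = 180 seconds, which is longer than the 120-second agent timeout. The sketch below does that arithmetic; both function names are hypothetical, and the count of two sequential agents is an assumption taken from the researcher/writer example.

```python
from typing import List


def worst_case_call_seconds(request_timeout: float, max_retries: int) -> float:
    """Upper bound for one client call: the first attempt plus every retry timing out.

    Ignores any backoff sleep between retries, so the real figure can be higher.
    """
    return request_timeout * (max_retries + 1)


def check_timeout_nesting(
    request_timeout: float,
    max_retries: int,
    agent_timeout: float,
    workflow_timeout: float,
    agents_in_sequence: int,
) -> List[str]:
    """Return a list of inconsistencies between the three timeout levels."""
    problems = []
    call = worst_case_call_seconds(request_timeout, max_retries)
    if call > agent_timeout:
        problems.append(
            f"one call can take {call}s but agent_timeout is {agent_timeout}s"
        )
    sequential = agent_timeout * agents_in_sequence
    if sequential > workflow_timeout:
        problems.append(
            f"{agents_in_sequence} agents can take {sequential}s "
            f"but workflow_timeout is {workflow_timeout}s"
        )
    return problems


problems = check_timeout_nesting(
    request_timeout=60, max_retries=2,
    agent_timeout=120, workflow_timeout=300,
    agents_in_sequence=2,
)
for problem in problems:
    print(problem)
```

For the reference values this reports the call-level mismatch only. Either lowering `request_timeout` to 40 seconds or less, reducing `max_retries`, or raising `agent_timeout` to at least 180 seconds would make the levels consistent.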

When to Use Which Framework

Choose CrewAI when:

  • You’re prototyping or learning
  • Your agents are simple and don’t have external dependencies
  • You’re okay wrapping it with your own error handling

Choose LangGraph when:

  • You need visibility into which agent failed
  • You’re building for production
  • You need fine-grained control over retries and error routing
  • You want state management across agent steps

The Reddit comparison sums it up: CrewAI is great for getting started quickly. LangGraph is better when you need production reliability.

Final Words + More Resources

My intention with this article was to help others by sharing my knowledge and experience. If you want to contact me, you can reach me by email.

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
