How to Handle Agent Timeout and Failure Recovery in Multi-Agent Systems? (CrewAI vs LangGraph)
I deployed my first multi-agent system to production last month. Everything worked perfectly in development. But the moment real API calls started happening, my agents started hanging. One agent would time out, and the entire pipeline would freeze. No error messages, no recovery, just a frozen process eating up resources.
The error logs showed nothing useful:
    [2026-04-10 14:32:15] Agent researcher_agent started
    [2026-04-10 14:32:45] WARNING: API call taking longer than expected
    [2026-04-10 14:33:15] ... (silence for 5 minutes)
    [2026-04-10 14:38:22] Connection timeout (but the process never recovered)

After switching frameworks and implementing proper failure handling, I learned that timeout and failure recovery are not optional features; they’re survival requirements for production multi-agent systems.
Why Agents Fail in Production
Multi-agent systems face unique failure modes that single-agent systems don’t:
Cascading Failures: One agent’s timeout can block downstream agents. If Agent A never completes, Agent B waits forever. Agent C waits on Agent B. Your entire pipeline hangs.
Invisible Failures: Without proper instrumentation, you can’t tell which agent failed or why. You just know “something broke.”
Resource Exhaustion: Hung agents consume memory and connection pools. Over time, your system degrades until it crashes completely.
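To make the “invisible failure” problem concrete, here is a minimal sketch of a sequential pipeline that records, for every stage, whether it succeeded and how long it took. It is framework-agnostic; `StageReport`, `run_pipeline`, and the two stage functions are hypothetical names made up for illustration, and the failure is simulated.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class StageReport:
    name: str
    ok: bool
    seconds: float
    error: Optional[str] = None


def run_pipeline(
    stages: List[Tuple[str, Callable[[Dict], Dict]]],
    state: Dict,
) -> List[StageReport]:
    """Run stages in order, recording outcome and duration for each one."""
    reports: List[StageReport] = []
    for name, stage in stages:
        started = time.monotonic()
        try:
            state.update(stage(state))
            reports.append(StageReport(name, True, time.monotonic() - started))
        except Exception as exc:
            reports.append(
                StageReport(name, False, time.monotonic() - started, repr(exc))
            )
            break  # Later stages depend on this one, so stop here
    return reports


def research(state: Dict) -> Dict:
    raise TimeoutError("API timeout")  # Simulated failure


def analysis(state: Dict) -> Dict:
    return {"analysis": "..."}


reports = run_pipeline(
    [("research", research), ("analysis", analysis)],
    {"topic": "x"},
)
for report in reports:
    print(report)
```

The point is only that the failing stage is named in the report. A stage that hangs without raising would still block this loop, which is why per-stage timeouts matter as well.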
Here’s what a typical failure cascade looks like:
    ┌─────────────┐
    │   Agent A   │ ← API timeout (60s)
    │ (Research)  │
    └──────┬──────┘
           │ BLOCKED
           ▼
    ┌─────────────┐
    │   Agent B   │ ← Waiting forever
    │ (Analysis)  │
    └──────┬──────┘
           │ BLOCKED
           ▼
    ┌─────────────┐
    │   Agent C   │ ← Never even starts
    │  (Report)   │
    └─────────────┘

    Result: Entire pipeline hangs for 5-10 minutes

My Initial Mistake with CrewAI
I started with CrewAI because it promised a simple API. Define agents, give them tasks, run the crew. Here’s what my initial code looked like:
    from crewai import Agent, Task, Crew

    researcher = Agent(
        role="Researcher",
        goal="Find information about the topic",
        backstory="You are a helpful research assistant",
        llm="gpt-4",
    )

    writer = Agent(
        role="Writer",
        goal="Write a blog post",
        backstory="You are a skilled writer",
        llm="gpt-4",
    )

    research_task = Task(
        description="Research {topic}",
        expected_output="Key findings about the topic",  # Required by recent CrewAI versions
        agent=researcher,
    )

    write_task = Task(
        description="Write a blog post based on research",
        expected_output="A complete blog post",  # Required by recent CrewAI versions
        agent=writer,
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, write_task],
    )

    # This will hang forever if researcher times out
    result = crew.kickoff(inputs={"topic": "Agent timeout handling"})

The problem? If the researcher agent’s API call hangs or times out, the entire crew hangs. With this setup I found no obvious way to:
- See which agent failed
- Retry just that agent
- Set agent-level timeouts
- Implement circuit breakers
As one developer on Reddit noted: “CrewAI broke down the moment one agent timed out and the whole crew hung… I couldn’t see where it failed.”
Implementing Timeout Handling in CrewAI
To make CrewAI production-ready, you need to wrap it with your own timeout and retry logic:
    import signal
    from contextlib import contextmanager
    from typing import Any, Optional

    from crewai import Crew


    class CrewTimeoutError(Exception):
        """Raised when a crew run exceeds its time budget."""


    @contextmanager
    def timeout_handler(seconds: int, error_message: str = "Operation timed out"):
        """Context manager for timeout handling (Unix only, main thread only)."""

        def signal_handler(signum, frame):
            raise CrewTimeoutError(error_message)

        # Install our handler and remember the previous one so it can be restored
        previous_handler = signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)  # Cancel any pending alarm
            signal.signal(signal.SIGALRM, previous_handler)


    def run_crew_with_timeout(
        crew: Crew,
        inputs: Optional[dict] = None,
        timeout_seconds: int = 300,
        retries: int = 2,
    ) -> Optional[Any]:
        """
        Run a CrewAI crew with timeout and retry logic.

        Args:
            crew: The CrewAI crew to run
            inputs: Values for placeholders such as {topic} in task descriptions
            timeout_seconds: Maximum time per attempt
            retries: Number of retry attempts

        Returns:
            Crew result or None if all attempts fail
        """
        last_error = None

        for attempt in range(retries + 1):
            try:
                with timeout_handler(timeout_seconds, f"Crew timed out after {timeout_seconds}s"):
                    result = crew.kickoff(inputs=inputs)
                print(f"Crew completed successfully on attempt {attempt + 1}")
                return result

            except CrewTimeoutError as e:
                last_error = e
                print(f"Attempt {attempt + 1} timed out: {e}")

            except Exception as e:
                last_error = e
                print(f"Attempt {attempt + 1} failed: {e}")

            if attempt < retries:
                print("Retrying...")

        print(f"All {retries + 1} attempts failed. Last error: {last_error}")
        return None


    # Usage
    result = run_crew_with_timeout(
        crew=crew,
        inputs={"topic": "Agent timeout handling"},
        timeout_seconds=180,  # 3 minutes max
        retries=2,
    )

    if result is None:
        print("Crew failed after all retries, implementing fallback...")
    else:
        print(f"Result: {result}")

This approach works, but it’s fragile. Signal-based timeouts are only available on Unix and only in the main thread, so they break down in threaded environments. You also still can’t see which agent failed, only that the crew failed.
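For threaded environments, one alternative is to run the call in a worker thread and stop waiting after a deadline. The following is a minimal sketch using only the standard library; `run_with_deadline` and `slow_call` are hypothetical names, and the sleep stands in for a hung API call.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable


def run_with_deadline(fn: Callable[[], Any], timeout_seconds: float) -> Any:
    """Run fn in a worker thread and stop waiting after timeout_seconds.

    Raises TimeoutError if the deadline passes. The worker thread is NOT
    killed; it keeps running in the background until fn returns.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeout:
        if future.done():
            raise  # fn itself raised a timeout error; pass it through
        raise TimeoutError(f"No result after {timeout_seconds}s") from None
    finally:
        # wait=False: return immediately instead of blocking on a hung worker
        pool.shutdown(wait=False)


def slow_call() -> str:
    time.sleep(0.5)  # Stands in for a hung API call
    return "done"


try:
    run_with_deadline(slow_call, timeout_seconds=0.05)
except TimeoutError as exc:
    print(f"Gave up waiting: {exc}")
```

The trade-off is that Python cannot forcibly stop a thread. The caller gets control back, but the abandoned call keeps its connection until it finishes, and the interpreter still waits for it at exit. A client-level request timeout is therefore still worth setting underneath.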
A Better Approach: LangGraph with Node-Level Control
LangGraph was built with these production concerns in mind. Instead of a black-box crew execution, you define a graph of nodes (agents) with explicit state management and error handling at each step.
Here’s the same workflow in LangGraph:
    from typing import TypedDict

    from langchain_openai import ChatOpenAI
    from langgraph.graph import StateGraph, END


    # Define state that persists across nodes
    class AgentState(TypedDict):
        topic: str
        research_results: str
        blog_post: str
        errors: list  # Overwritten by each node update, so [] really clears it
        retry_count: int


    # Create the graph
    workflow = StateGraph(AgentState)


    # Define nodes with explicit error handling
    def research_node(state: AgentState) -> dict:
        """Research agent with a per-request timeout."""
        try:
            llm = ChatOpenAI(model="gpt-4", request_timeout=60)

            response = llm.invoke([
                {"role": "system", "content": "You are a research assistant."},
                {"role": "user", "content": f"Research this topic: {state['topic']}"},
            ])

            return {
                "research_results": response.content,
                "errors": [],  # Clear any previous errors
            }

        except Exception as e:
            error_msg = f"Research agent failed: {str(e)}"
            print(error_msg)
            return {
                "research_results": "",
                "errors": [error_msg],
            }


    def write_node(state: AgentState) -> dict:
        """Writing agent that handles missing research."""
        if not state.get("research_results"):
            # Research failed, write with available info
            print("Warning: No research results, writing with topic only")
            context = state["topic"]
        else:
            context = state["research_results"]

        try:
            llm = ChatOpenAI(model="gpt-4", request_timeout=60)

            response = llm.invoke([
                {"role": "system", "content": "You are a skilled writer."},
                {"role": "user", "content": f"Write a blog post based on: {context}"},
            ])

            return {
                "blog_post": response.content,
                "errors": [],
            }

        except Exception as e:
            error_msg = f"Write agent failed: {str(e)}"
            print(error_msg)
            return {
                "blog_post": "",
                "errors": [error_msg],
            }


    def error_router(state: AgentState) -> str:
        """Decide whether to retry or end based on errors."""
        if state.get("errors") and state.get("retry_count", 0) < 2:
            return "retry"
        elif state.get("errors"):
            return "fail"
        else:
            return "continue"


    # Add nodes to workflow
    workflow.add_node("research", research_node)
    workflow.add_node("write", write_node)
    workflow.add_node(
        "increment_retry",
        lambda s: {"retry_count": s.get("retry_count", 0) + 1},
    )

    # Define edges with conditional routing
    workflow.set_entry_point("research")
    workflow.add_conditional_edges(
        "research",
        error_router,
        {
            "continue": "write",
            "retry": "increment_retry",
            "fail": END,
        },
    )
    workflow.add_edge("increment_retry", "research")
    workflow.add_edge("write", END)

    # Compile and run
    app = workflow.compile()

    result = app.invoke({
        "topic": "Agent timeout handling",
        "research_results": "",
        "blog_post": "",
        "errors": [],
        "retry_count": 0,
    })

    print(f"Final blog post: {result.get('blog_post', 'Failed to generate')}")
    print(f"Errors encountered: {result.get('errors', [])}")

The key difference: I can see exactly which node failed and why. Each agent is isolated, errors are captured in state, and I can route around failures or retry specific nodes.
As the Reddit user reported: “Switched to LangGraph, at least I could see exactly which node failed.”
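The retry edge in the graph above loops straight back to the research node. When the failure comes from a struggling API, spacing the retries out usually gives it a better chance to recover. Here is a minimal sketch of exponential backoff with a cap and optional jitter; `backoff_delays` and its defaults are hypothetical and not part of LangGraph.

```python
import random
from typing import List


def backoff_delays(
    attempts: int,
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    jitter: float = 0.0,
) -> List[float]:
    """Return the wait in seconds before each retry attempt.

    The delay grows as base * factor**n, is capped at `cap`, and gets up to
    `jitter` seconds of random noise added so parallel agents do not retry
    in lockstep.
    """
    delays = []
    for n in range(attempts):
        delay = min(cap, base * factor ** n)
        delays.append(delay + random.uniform(0.0, jitter))
    return delays


print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

One way to use this would be to sleep for `backoff_delays(...)[retry_count]` inside the `increment_retry` node before the graph re-enters `research`. That wiring is an assumption about how you might structure it, not something the framework prescribes.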
Adding a Circuit Breaker Pattern
For production systems, you want to fail fast when external APIs are having issues. A circuit breaker prevents your system from repeatedly trying an operation that’s likely to fail.
    import time
    from dataclasses import dataclass
    from enum import Enum
    from typing import Callable, Optional

    # AgentState and ChatOpenAI come from the LangGraph example above


    class CircuitState(Enum):
        CLOSED = "closed"        # Normal operation
        OPEN = "open"            # Failing, reject all calls
        HALF_OPEN = "half_open"  # Testing if recovered


    class CircuitOpenError(Exception):
        """Raised when a call is rejected because the circuit is open."""


    @dataclass
    class CircuitBreaker:
        """
        Circuit breaker for agent API calls.

        Prevents cascade failures when external APIs are down.
        """
        failure_threshold: int = 3
        recovery_timeout: int = 60  # seconds
        state: CircuitState = CircuitState.CLOSED
        failure_count: int = 0
        last_failure_time: Optional[float] = None

        def can_execute(self) -> bool:
            """Check if operation can proceed."""
            if self.state == CircuitState.CLOSED:
                return True

            if self.state == CircuitState.OPEN:
                # Check if recovery timeout has passed
                if (time.time() - self.last_failure_time) > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    return True
                return False

            # HALF_OPEN: let a test request through
            return True

        def record_success(self):
            """Record successful operation."""
            self.failure_count = 0
            self.state = CircuitState.CLOSED

        def record_failure(self):
            """Record failed operation."""
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker OPEN after {self.failure_count} failures")

        def execute(self, operation: Callable, *args, **kwargs):
            """Execute operation with circuit breaker protection."""
            if not self.can_execute():
                raise CircuitOpenError("Circuit breaker is OPEN - failing fast")

            try:
                result = operation(*args, **kwargs)
                self.record_success()
                return result

            except Exception:
                self.record_failure()
                raise


    # Usage in agent node
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)


    def research_with_circuit_breaker(state: AgentState) -> dict:
        """Research agent with circuit breaker protection."""
        try:
            # This will fail fast if API is having issues
            result = breaker.execute(call_research_api, state["topic"])
            return {"research_results": result, "errors": []}

        except CircuitOpenError as e:
            # Fail gracefully, use cached/fallback data
            return {
                "research_results": "Using cached research data...",
                "errors": [f"Circuit breaker active: {e}"],
            }
        # Any other exception propagates to the caller unchanged


    def call_research_api(topic: str) -> str:
        """Simulated API call that might fail."""
        # Your actual API call here
        llm = ChatOpenAI(model="gpt-4", request_timeout=60)
        response = llm.invoke([{"role": "user", "content": f"Research: {topic}"}])
        return response.content

The Bounded Agent Pattern
The most reliable production pattern I’ve found is bounded agents—specialized agents with tight scopes and explicit constraints.
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError as FutureTimeoutError
    from dataclasses import dataclass
    from typing import List, Optional

    from langchain_openai import ChatOpenAI

    # CircuitBreaker is the class defined in the previous section


    @dataclass
    class AgentConstraints:
        """Define boundaries for agent execution."""
        max_tokens: int = 4000
        timeout_seconds: int = 30
        max_retries: int = 2
        required_inputs: Optional[List[str]] = None
        fallback_behavior: str = "skip"  # or "retry", "fail", "use_cache"


    class BoundedAgent:
        """
        Agent with explicit boundaries and failure handling.

        Key principle: Do ONE thing well, fail gracefully.
        """

        def __init__(
            self,
            name: str,
            description: str,
            constraints: Optional[AgentConstraints],
            llm: ChatOpenAI,
        ):
            self.name = name
            self.description = description
            self.constraints = constraints or AgentConstraints()
            self.llm = llm
            self.circuit_breaker = CircuitBreaker()

        def validate_inputs(self, state: dict) -> bool:
            """Verify required inputs exist."""
            if not self.constraints.required_inputs:
                return True

            for input_key in self.constraints.required_inputs:
                if input_key not in state or not state[input_key]:
                    print(f"{self.name}: Missing required input '{input_key}'")
                    return False

            return True

        def execute(self, state: dict) -> dict:
            """Execute agent with all constraints enforced."""

            # 1. Validate inputs
            if not self.validate_inputs(state):
                return self._fallback(state, "Invalid inputs")

            # 2. Check circuit breaker
            if not self.circuit_breaker.can_execute():
                return self._fallback(state, "Circuit breaker open")

            # 3. Execute with timeout and retries
            for attempt in range(self.constraints.max_retries + 1):
                try:
                    result = self._execute_with_timeout(state)
                    self.circuit_breaker.record_success()
                    return result

                except TimeoutError:
                    print(f"{self.name} timed out on attempt {attempt + 1}")
                    self.circuit_breaker.record_failure()  # Timeouts count as failures too
                    if attempt == self.constraints.max_retries:
                        return self._fallback(state, "Max timeouts reached")

                except Exception as e:
                    print(f"{self.name} failed on attempt {attempt + 1}: {e}")
                    self.circuit_breaker.record_failure()
                    if attempt == self.constraints.max_retries:
                        return self._fallback(state, str(e))

            return self._fallback(state, "Unknown failure")

        def _execute_with_timeout(self, state: dict) -> dict:
            """Execute the LLM call, giving up after timeout_seconds."""
            # The worker thread cannot be killed; we only stop waiting for it
            pool = ThreadPoolExecutor(max_workers=1)
            future = pool.submit(
                self.llm.invoke,
                self._build_prompt(state),
                max_tokens=self.constraints.max_tokens,
            )
            try:
                response = future.result(timeout=self.constraints.timeout_seconds)
            except FutureTimeoutError:
                raise TimeoutError(
                    f"{self.name} exceeded {self.constraints.timeout_seconds}s"
                ) from None
            finally:
                pool.shutdown(wait=False)
            return {f"{self.name}_output": response.content}

        def _build_prompt(self, state: dict) -> list:
            """Build prompt from state."""
            # Implement based on agent's specific task
            return [{"role": "user", "content": str(state)}]

        def _fallback(self, state: dict, reason: str) -> dict:
            """Handle failure based on configured behavior."""
            if self.constraints.fallback_behavior == "skip":
                return {
                    f"{self.name}_output": None,
                    f"{self.name}_error": reason,
                    f"{self.name}_skipped": True,
                }
            elif self.constraints.fallback_behavior == "use_cache":
                cached = state.get(f"{self.name}_cached", "")
                return {
                    f"{self.name}_output": cached,
                    f"{self.name}_error": f"Used cache: {reason}",
                }
            else:
                return {
                    f"{self.name}_output": "",
                    f"{self.name}_error": reason,
                }


    # Define specialized bounded agents
    research_agent = BoundedAgent(
        name="researcher",
        description="Finds relevant information about a topic",
        constraints=AgentConstraints(
            max_tokens=2000,
            timeout_seconds=30,
            max_retries=2,
            required_inputs=["topic"],
            fallback_behavior="skip",
        ),
        llm=ChatOpenAI(model="gpt-4"),
    )

    analysis_agent = BoundedAgent(
        name="analyst",
        description="Analyzes research results",
        constraints=AgentConstraints(
            max_tokens=3000,
            timeout_seconds=45,
            max_retries=1,
            required_inputs=["researcher_output"],
            fallback_behavior="use_cache",
        ),
        llm=ChatOpenAI(model="gpt-4"),
    )

    # Execute with confidence that failures won't cascade
    state = {"topic": "Agent timeout handling", "analyst_cached": "Previous analysis..."}
    state.update(research_agent.execute(state))
    state.update(analysis_agent.execute(state))

    print(f"Research result: {state.get('researcher_output')}")
    print(f"Analysis result: {state.get('analyst_output')}")
    print(f"Any errors: {state.get('researcher_error'), state.get('analyst_error')}")

Key Principles for Production Agents
Based on my experience and the patterns above, here are the essential practices:
Always set explicit timeouts. Never rely on default timeouts—they’re often too long or non-existent. Set them at the API level, agent level, and workflow level.
Make failure visible. Use state management to track errors. Log which agent failed, when, and why. This is where LangGraph’s node-based approach shines over CrewAI’s crew abstraction.
Use circuit breakers. When an external service starts failing, stop hitting it repeatedly. Let it recover, then test with a single request.
Specialize your agents. Each agent should do one thing well. This makes timeout predictions more accurate and limits blast radius when things fail.
Plan for partial success. Your system should produce useful output even when some agents fail. Design your workflow to route around failures.
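Setting timeouts at several levels only helps if the levels nest sensibly. The sketch below is a rough sanity check under two assumptions: client retries are counted as extra attempts after the first, and agents run one after another. The function names and the numbers passed in are illustrative only.

```python
from typing import List


def worst_case_call_seconds(request_timeout: float, client_retries: int) -> float:
    """Upper bound on one LLM call if every attempt runs to its full timeout.

    Ignores any backoff sleep between attempts, so the real bound is higher.
    """
    return request_timeout * (client_retries + 1)


def check_budget(
    request_timeout: float,
    client_retries: int,
    agent_timeout: float,
    agents_in_sequence: int,
    workflow_timeout: float,
) -> List[str]:
    """Return human-readable problems; an empty list means the layers nest."""
    problems = []
    call_bound = worst_case_call_seconds(request_timeout, client_retries)
    if call_bound > agent_timeout:
        problems.append(
            f"API layer can take {call_bound:.0f}s "
            f"but the agent is cut off at {agent_timeout:.0f}s"
        )
    pipeline_bound = agent_timeout * agents_in_sequence
    if pipeline_bound > workflow_timeout:
        problems.append(
            f"Agents can take {pipeline_bound:.0f}s in total "
            f"but the workflow is cut off at {workflow_timeout:.0f}s"
        )
    return problems


issues = check_budget(
    request_timeout=60,
    client_retries=2,
    agent_timeout=120,
    agents_in_sequence=2,
    workflow_timeout=300,
)
for issue in issues:
    print(issue)
```

With these example numbers the check reports that the API layer alone can take 180s while the agent gives up at 120s, so the last client retry would never get a chance to finish. Whether that is a problem depends on your intent, but it is worth deciding deliberately.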
Quick Reference: Timeout Configurations
    # LLM API timeouts (set at client level)
    llm = ChatOpenAI(
        model="gpt-4",
        request_timeout=60,  # Single API call timeout
        max_retries=2,       # Retries at API client level
    )

    # Agent-level timeouts
    agent_timeout = 120  # Total agent execution time

    # Workflow-level timeouts
    workflow_timeout = 300  # Entire pipeline timeout

    # Circuit breaker thresholds
    failure_threshold = 3  # Failures before circuit opens
    recovery_timeout = 60  # Time before testing recovery

When to Use Which Framework
Choose CrewAI when:
- You’re prototyping or learning
- Your agents are simple and don’t have external dependencies
- You’re okay wrapping it with your own error handling
Choose LangGraph when:
- You need visibility into which agent failed
- You’re building for production
- You need fine-grained control over retries and error routing
- You want state management across agent steps
The Reddit comparison sums it up: CrewAI is great for getting started quickly. LangGraph is better when you need production reliability.
Final Words + More Resources
My intention with this article was to help others by sharing my knowledge and experience. If you want to contact me, you can reach me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- Reddit Discussion: CrewAI vs LangGraph comparison
- LangGraph How-Tos and Best Practices
- CrewAI Documentation
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!