Why Do AI Agents Break in Production When They Work Fine Locally?

Problem

My AI agent worked perfectly on my laptop. I deployed it to production and it crashed halfway through a task with no way to resume.

[2026-03-12 14:32:15] Agent starting task: analyze_sales_data
[2026-03-12 14:32:18] Agent calling tool: fetch_database
[2026-03-12 14:32:45] Agent reasoning: found anomaly in Q3 data
[2026-03-12 14:33:02] Agent calling tool: send_email
[2026-03-12 14:35:00] ERROR: Container killed (timeout exceeded)
[2026-03-12 14:35:00] Task state: LOST

The agent died at step 5 of 8. All progress gone. No resume mechanism. No idea why it made certain decisions.

Environment

  • Python 3.12
  • LangGraph for agent orchestration
  • AWS Lambda for deployment (15 min timeout)
  • PostgreSQL for state persistence
  • Redis for caching

What Happened?

I built a data analysis agent locally. It ran great:

local_agent.py
from langchain_openai import ChatOpenAI


class DataAnalysisAgent:
    def __init__(self):
        self.state = {}  # In-memory state
        self.llm = ChatOpenAI(model="gpt-4")

    async def analyze(self, task: str):
        # Step 1: Fetch data (fetch_data and send_email are helpers not shown here)
        self.state["data"] = await self.fetch_data(task)
        # Step 2: Analyze
        response = await self.llm.ainvoke(f"Analyze: {self.state['data']}")
        self.state["analysis"] = response.content
        # Step 3: Send report
        await self.send_email(self.state["analysis"])
        return self.state["analysis"]

On my laptop, this worked every time. When I deployed to AWS Lambda:

Terminal window
# Local test - works perfectly
python agent.py --task "analyze Q3 sales"
# Output: Analysis complete, email sent
# Production - crashes
aws lambda invoke --function-name data-analyzer response.json
# Error: Task timed out after 900 seconds
# State: GONE (in-memory state lost when container killed)

Why This Breaks in Production

I found four reasons my agent failed in production:

1. In-Memory State Evaporates

BAD_state.py
class Agent:
    def __init__(self):
        self.state = {}  # Dies with container

    async def run(self):
        self.state["step1"] = await self.do_step1()
        self.state["step2"] = await self.do_step2()
        # If container dies here, state is GONE
        self.state["step3"] = await self.do_step3()

When Lambda kills my container at 15 minutes, self.state disappears. There’s no way to recover.

2. Cloud Platforms Kill Containers

I didn’t realize how aggressive cloud platforms are:

Platform    | Kill Trigger                  | What Happens
------------|-------------------------------|-------------------------
AWS Lambda  | 15 min timeout                | Container destroyed
Cloud Run   | Idle (no requests for 15 min) | Container scaled to zero
ECS/Fargate | Memory limit exceeded         | Task killed

My agent needed 18 minutes. Lambda killed it at 15.
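One way to avoid the hard kill is to stop voluntarily before the deadline and hand back a resume point. The following is a minimal sketch, assuming the handler receives a Lambda-style context object. `get_remaining_time_in_millis()` is the method the Lambda Python context exposes; `SAFETY_MARGIN_MS`, `run_steps`, and `FakeContext` are hypothetical names used only for illustration.

```python
from typing import Callable, List, Tuple

SAFETY_MARGIN_MS = 60_000  # hypothetical: stop once less than 1 minute remains


def run_steps(
    steps: List[Callable[[], str]], context, start_at: int = 0
) -> Tuple[int, List[str]]:
    """Run steps until finished or until the time budget is nearly spent.

    Returns (index of the next step to run, results so far) so the caller can
    persist the index and continue in a later invocation.
    """
    results = []
    for i in range(start_at, len(steps)):
        if context.get_remaining_time_in_millis() < SAFETY_MARGIN_MS:
            return i, results  # out of budget: hand back the resume point
        results.append(steps[i]())
    return len(steps), results


class FakeContext:
    """Stand-in for the Lambda context; each check "uses up" 50 seconds."""

    def __init__(self, remaining_ms: int):
        self.remaining_ms = remaining_ms

    def get_remaining_time_in_millis(self) -> int:
        current = self.remaining_ms
        self.remaining_ms -= 50_000
        return current
```

With three steps and a fake budget of 150 seconds, `run_steps` completes two steps and returns index 2, which is the value a later invocation would pass as `start_at`.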

3. No Resume Mechanism

BAD_no_resume.py
async def run_agent(task):
    # No checkpoint system
    result1 = await step1(task)  # If crash here...
    result2 = await step2(result1)  # Can't skip to here
    result3 = await step3(result2)
    return result3

When the agent crashes at step 2, I have to start from step 1 again. No way to resume from where it stopped.

4. Zero Observability

BAD_no_logs.py
class Agent:
    async def run(self, task):
        # No logging
        # No tracing
        # No cost tracking
        result = await self.llm.generate(task)
        return result

When production failed, I couldn’t answer:

  • What step was the agent on?
  • What data did it fetch?
  • Why did it make that decision?
  • How much did it cost before crashing?
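Questions like these are easier to answer when every log line carries the step, tool, and cost as structured fields. Here is a rough sketch using only the standard library. The field names (`task_id`, `step`, `tool`, `tokens_used`, `cost_usd`) and the helper names are assumptions chosen for illustration, not part of any framework.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        for key in ("task_id", "step", "tool", "tokens_used", "cost_usd"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)


def make_logger(name: str = "agent") -> logging.Logger:
    """Return a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid adding a second handler on re-import
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


logger = make_logger()
logger.info(
    "calling tool",
    extra={"task_id": "task-123", "step": 2, "tool": "fetch_database"},
)
```

Because each line is a JSON object, a log search for one `task_id` shows the last step reached before a crash.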

How I Fixed It

I tried adding logging first. That didn’t solve the state problem.

attempt1_logging.py
import logging


class Agent:
    async def run(self, task):
        logging.info(f"Starting task: {task}")
        result = await self.llm.generate(task)
        logging.info(f"Result: {result}")
        return result

This gave me logs but didn’t help resume tasks. I needed a different approach.

Solution 1: Persistent State with Checkpoints

I added LangGraph’s checkpoint system:

GOOD_persistent_state.py
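from typing import Optional, TypedDict

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import END, START, StateGraph


# Define state schema
class AgentState(TypedDict):
    task: str
    data: Optional[dict]
    analysis: Optional[str]
    current_step: int


# Build the graph (the *_node functions are defined elsewhere)
graph = StateGraph(AgentState)
graph.add_node("fetch_data", fetch_data_node)
graph.add_node("analyze", analyze_node)
graph.add_node("send_email", send_email_node)
graph.add_edge(START, "fetch_data")
graph.add_edge("fetch_data", "analyze")
graph.add_edge("analyze", "send_email")
graph.add_edge("send_email", END)


async def main():
    # Create the checkpointer; connection_string is the Postgres URL
    async with AsyncPostgresSaver.from_conn_string(connection_string) as checkpointer:
        await checkpointer.setup()  # Creates the checkpoint tables on first run
        # Compile with checkpointer
        app = graph.compile(checkpointer=checkpointer)
        # Run with thread ID (enables resume)
        config = {"configurable": {"thread_id": "task-123"}}
        return await app.ainvoke({"task": "analyze Q3"}, config)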

Now when the agent crashes:

Terminal window
-- Check what state was saved
SELECT * FROM checkpoints WHERE thread_id = 'task-123';
-- Output (simplified: the table stores the state as a serialized checkpoint, shown here decoded)
thread_id | current_step | data         | analysis
----------|--------------|--------------|----------
task-123  | 2            | {fetched...} | NULL

I can resume from step 2 instead of starting over.
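To make the mechanism concrete, here is a minimal sketch of checkpoint-and-resume using only `sqlite3` and `json`. It is a stand-in to show the idea, not LangGraph's implementation: the table layout and the names `open_store` and `run_with_checkpoints` are assumptions made for this example.

```python
import json
import sqlite3
from typing import Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[Dict], Dict]]


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a small checkpoint store."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "thread_id TEXT PRIMARY KEY, next_step INTEGER, state TEXT)"
    )
    return conn


def run_with_checkpoints(
    conn: sqlite3.Connection, thread_id: str, steps: List[Step], initial: Dict
) -> Dict:
    """Run steps in order, saving state after each; resume if a checkpoint exists."""
    row = conn.execute(
        "SELECT next_step, state FROM checkpoints WHERE thread_id = ?", (thread_id,)
    ).fetchone()
    next_step, state = (row[0], json.loads(row[1])) if row else (0, dict(initial))

    for i in range(next_step, len(steps)):
        _name, fn = steps[i]
        state = {**state, **fn(state)}  # a step returns only the keys it updates
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, i + 1, json.dumps(state)),
        )
        conn.commit()  # make the checkpoint durable before the next step
    return state
```

If a step raises, the rows written so far remain, so calling `run_with_checkpoints` again with the same `thread_id` skips the completed steps and retries only the one that failed.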

Solution 2: Design for Interruption

I broke long tasks into smaller chunks:

GOOD_interruptible.py
class ChunkedAgent:
    async def run(self, task: str):
        # Break into 5-minute chunks
        chunks = await self.plan_chunks(task, max_duration=300)
        for i, chunk in enumerate(chunks):
            # Each chunk saves state before starting
            await self.save_checkpoint(i, chunk)
            # Execute chunk
            result = await self.execute_chunk(chunk)
            # Save result before next chunk
            await self.save_result(i, result)

Each chunk stays well under Lambda’s timeout. If a chunk fails, I resume from the last checkpoint instead of restarting the whole task.
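The planning step could look something like the sketch below. It assumes each step comes with a rough duration estimate in seconds and packs steps greedily into chunks that fit the budget. The function name mirrors `plan_chunks` above, but the signature and the estimates are hypothetical.

```python
from typing import List, Tuple


def plan_chunks(steps: List[Tuple[str, float]], max_duration: float) -> List[List[str]]:
    """Greedily pack (name, estimated_seconds) steps into chunks within the budget."""
    chunks: List[List[str]] = []
    current: List[str] = []
    used = 0.0
    for name, seconds in steps:
        if seconds > max_duration:
            # A single step this long can never fit; it has to be split upstream.
            raise ValueError(f"step {name!r} alone exceeds the {max_duration}s budget")
        if current and used + seconds > max_duration:
            chunks.append(current)  # close the current chunk and start a new one
            current, used = [], 0.0
        current.append(name)
        used += seconds
    if current:
        chunks.append(current)
    return chunks
```

Greedy packing keeps the steps in their original order, which matters when later steps depend on earlier results. The estimates only need to be conservative, not exact.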

Solution 3: Observability First

I added decision tracking before anything else:

GOOD_observability.py
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class AgentDecision:
    task_id: str
    timestamp: datetime
    step: int
    action: str
    reasoning: str
    tokens_used: int
    cost_usd: float


class ObservableAgent:
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.current_step = 0
        self.decisions: list[AgentDecision] = []
        # self.llm and self.db are clients configured elsewhere

    async def decide(self, context: dict) -> str:
        # The response is assumed to expose action, thought_process, tokens, and cost
        reasoning = await self.llm.generate(context)
        decision = AgentDecision(
            task_id=self.task_id,
            timestamp=datetime.now(timezone.utc),
            step=self.current_step,
            action=reasoning.action,
            reasoning=reasoning.thought_process,
            tokens_used=reasoning.tokens,
            cost_usd=reasoning.cost,
        )
        self.decisions.append(decision)
        await self.db.insert("agent_decisions", decision)
        return reasoning.action

Now I can query:

SELECT step, action, cost_usd
FROM agent_decisions
WHERE task_id = 'task-123'
ORDER BY timestamp;
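As a rough illustration of how such a table answers "how much did it cost before crashing?", here is a self-contained sketch that uses an in-memory SQLite database in place of PostgreSQL. The schema, the helper names, and the sample rows are assumptions made for this example.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")  # stand-in for the production database
conn.execute(
    """CREATE TABLE agent_decisions (
        task_id TEXT, timestamp TEXT, step INTEGER,
        action TEXT, reasoning TEXT, tokens_used INTEGER, cost_usd REAL)"""
)


def record_decision(task_id, step, action, reasoning, tokens_used, cost_usd):
    """Append one decision row, timestamped in UTC."""
    conn.execute(
        "INSERT INTO agent_decisions VALUES (?, ?, ?, ?, ?, ?, ?)",
        (task_id, datetime.now(timezone.utc).isoformat(), step,
         action, reasoning, tokens_used, cost_usd),
    )
    conn.commit()


def cost_before_crash(task_id):
    """Return (last recorded step, total cost in USD) for one task."""
    return conn.execute(
        "SELECT MAX(step), COALESCE(SUM(cost_usd), 0) "
        "FROM agent_decisions WHERE task_id = ?",
        (task_id,),
    ).fetchone()


# Illustrative rows only; the numbers are made up.
record_decision("task-123", 1, "fetch_database", "need Q3 rows", 850, 0.02)
record_decision("task-123", 2, "send_email", "anomaly found in Q3", 1200, 0.03)
```

Calling `cost_before_crash("task-123")` returns the last step that was recorded and the summed cost, which is enough to see where a run stopped and what it had spent by then.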

Solution 4: Failure-First Architecture

I stopped designing happy paths. I started with failure modes:

GOOD_failure_first.py
class ResilientAgent:
    async def run(self, task):
        # task is assumed to be an object with an `id` attribute
        # Check: Can we resume?
        existing = await self.load_checkpoint(task.id)
        if existing:
            return await self.resume(existing)
        # Check: Will this timeout?
        estimated = await self.estimate_duration(task)
        if estimated > self.timeout_limit:
            return await self.chunk_and_run(task)
        # Check: Is LLM available?
        if not await self.llm_health_check():
            return await self.use_fallback(task)
        # Now run with checkpoints
        return await self.run_with_checkpoints(task)

Now every run checks for a checkpoint to resume, a likely timeout, and LLM availability before doing any work.

The Complete Production-Ready Pattern

Here’s what I use now:

production_agent.py
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver


class ResumableError(Exception):
    """The run stopped early, but a checkpoint exists for this thread_id."""


class ProductionAgent:
    def __init__(self, checkpointer: AsyncPostgresSaver):
        # Persistent state (the caller opens it with
        # AsyncPostgresSaver.from_conn_string(DB_URL))
        self.checkpointer = checkpointer
        self.current_step = 0
        # Observability (StructuredLogger and MetricsClient are my own wrappers)
        self.logger = StructuredLogger()
        self.metrics = MetricsClient()
        # Failure handling
        self.fallback = RuleBasedFallback()

    async def run(self, task: str, thread_id: str):
        # 1. Check for existing state
        config = {"configurable": {"thread_id": thread_id}}
        existing = await self.checkpointer.aget_tuple(config)
        saved = existing.checkpoint["channel_values"] if existing else {}
        if saved.get("current_step", 0) > 0:
            self.logger.info("Resuming from checkpoint",
                             step=saved["current_step"])
            return await self.resume(existing)
        # 2. Estimate duration
        estimate = await self.estimate(task)
        if estimate.duration > MAX_DURATION:
            return await self.run_chunked(task, thread_id)
        # 3. Run with checkpoints at each step
        try:
            result = await self.run_with_checkpoints(task, thread_id)
            self.metrics.increment("agent.success")
            return result
        except TimeoutError:
            self.logger.error("Agent timed out",
                              thread_id=thread_id,
                              last_step=self.current_step)
            raise ResumableError(thread_id)
        except Exception as e:
            self.logger.error("Agent failed", error=str(e))
            return await self.fallback.execute(task)
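On the calling side, a resumable failure can be handled by invoking the agent again with the same thread ID. The sketch below is one possible shape for that caller, not the only one. `run_until_done` and `max_attempts` are hypothetical names, and `ResumableError` is redefined here only so the block is self-contained.

```python
import asyncio


class ResumableError(Exception):
    """Raised when a run stopped early but left a checkpoint behind."""

    def __init__(self, thread_id: str):
        super().__init__(f"run {thread_id} can be resumed")
        self.thread_id = thread_id


async def run_until_done(run, task: str, thread_id: str, max_attempts: int = 3):
    """Call `run(task, thread_id)` again with the same thread_id after a resumable stop."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await run(task, thread_id)
        except ResumableError:
            if attempt == max_attempts:
                raise  # give up and surface the error to the caller
            # Placeholder: a real caller might back off or re-queue the task here.
            await asyncio.sleep(0)
```

Reusing the thread ID is what lets the next attempt find the saved checkpoint. In a Lambda deployment the retry would more likely be a fresh invocation triggered by a queue or scheduler than a loop inside one process.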

When I test with simulated failures:

Terminal window
# Test 1: Kill agent mid-run
python test_agent.py --kill-at-step 3
# Result: State saved at step 2
# Resume from step 3 works
# Test 2: Simulate timeout
python test_agent.py --timeout 60
# Result: Chunked execution
# All chunks complete within limits
# Test 3: LLM unavailable
python test_agent.py --llm-down
# Result: Fallback executed
# Task completed (with simpler logic)

The Real Lesson

I spent weeks debugging production failures. The framework choice (LangGraph vs CrewAI vs AutoGen) didn’t matter. What mattered was:

  1. State persistence - Can I resume after a crash?
  2. Observability - Can I see what happened?
  3. Failure handling - What happens when things break?
  4. Time limits - Will this fit in my container’s lifetime?

Reddit user FragrantBox4293 said it well: “The stack matters way less than people think. Python vs TypeScript, CrewAI vs LangGraph, these debates are mostly noise. The hard part is the infrastructure around your agent.”

Summary

In this post, I showed why AI agents break in production when they work locally. The key point is that local environments mask infrastructure problems: in-memory state disappears on crash, cloud platforms kill containers mid-execution, and there’s no way to resume or debug.

I fixed this by adding persistent state with checkpoints, designing for interruption, adding observability first, and building failure-first architecture. These infrastructure patterns matter more than which framework you choose.

Final Words + More Resources

My intention with this article was to help others by sharing my knowledge and experience. If you want to contact me, you can reach me by email.

