Why Production Teams Migrate from LangGraph to Custom Python
Purpose
I started building AI agents with LangGraph about six months ago. The framework helped me prototype quickly. But when I moved to production, I hit walls. This post explains why production teams migrate from agent frameworks to custom Python, and what triggers the switch.
Problem
I built a multi-agent system using LangGraph for a content automation pipeline. It worked great in development. In production, I encountered issues that required fighting the framework instead of working with it.
ERROR: StateGraph memory serialization failed
- Custom memory backend not supported by checkpointer
- Retry logic inconsistent with business requirements
- Token usage optimization blocked by framework layers
The framework imposed opinions that didn’t match my production needs. I spent more time working around LangGraph than building features.
The Migration Pattern
Here’s what I learned from Reddit discussions and my own experience: this isn’t a framework failure. It’s an evolution.
Phase 1: Prototype with LangGraph (fast, learn quickly)
Phase 2: Hit production requirements (memory, retries, cost control)
Phase 3: Fight framework abstractions
Phase 4: Migrate to custom Python
A Reddit user put it well: “Most teams start with a framework, then slowly remove it. Frameworks are great for prototyping orchestration. But once agents hit real workloads, teams usually want tighter control.”
Pain Point 1: Memory Management
LangGraph imposes opinionated memory patterns through its checkpointer system. When I needed a custom memory backend, I had to implement framework-specific adapters.
from typing import Any, List, TypedDict

from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: List[Any]
    session_id: str  # needed by the memory lookup below
    memory: Any  # Framework memory doesn't match our needs

# The problem: framework expects checkpointer interface
def custom_memory_node(state: AgentState) -> AgentState:
    # Framework doesn't support our memory pattern
    # So we hack around it...
    from my_app.memory import RedisMemoryBackend

    external_memory = RedisMemoryBackend()
    state["memory"] = external_memory.load(state["session_id"])
    return state

# This works but feels wrong
# We're fighting the framework, not using it
The migration target was cleaner:
from dataclasses import dataclass
from typing import Protocol

class MemoryBackend(Protocol):
    def retrieve(self, session_id: str) -> dict: ...
    def store(self, session_id: str, data: dict) -> None: ...

@dataclass
class ProductionAgent:
    memory: MemoryBackend
    session_id: str

    def get_context(self) -> dict:
        return self.memory.retrieve(self.session_id)

    def save_context(self, data: dict) -> None:
        self.memory.store(self.session_id, data)
Now I can swap Redis for PostgreSQL, add caching layers, or implement TTL without framework constraints.
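To make the TTL point concrete, here is a minimal sketch of a backend that satisfies the `MemoryBackend` protocol above. `InMemoryTTLBackend`, `ttl_seconds`, and the injectable `clock` are hypothetical names I'm using for illustration, not part of the system described in this post, and a real deployment would more likely lean on Redis or PostgreSQL expiry.

```python
import time
from typing import Callable, Dict, Protocol, Tuple


class MemoryBackend(Protocol):
    def retrieve(self, session_id: str) -> dict: ...
    def store(self, session_id: str, data: dict) -> None: ...


class InMemoryTTLBackend:
    """Hypothetical backend: keeps sessions in a dict and expires them after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so tests can control time
        self._items: Dict[str, Tuple[float, dict]] = {}

    def retrieve(self, session_id: str) -> dict:
        entry = self._items.get(session_id)
        if entry is None:
            return {}
        stored_at, data = entry
        if self.clock() - stored_at > self.ttl_seconds:
            # Expired: drop the entry and behave as if nothing was stored
            del self._items[session_id]
            return {}
        return dict(data)  # copy so callers can't mutate stored state

    def store(self, session_id: str, data: dict) -> None:
        self._items[session_id] = (self.clock(), dict(data))
```

Because the class matches the protocol structurally, it could be passed as the `memory` field of `ProductionAgent` without inheriting from anything.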
Pain Point 2: Retry and Failure Handling
LangGraph’s retry logic didn’t match my business requirements. I needed custom backoff strategies and circuit breakers.
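For readers who haven't used the pattern, a circuit breaker stops calling a failing dependency for a while instead of retrying it forever. The sketch below is a deliberately simplified version (no half-open state) and is an assumption on my part, not the implementation from my codebase. The method names `is_open`, `record_success`, and `record_failure` mirror the ones the custom agent in this section calls; the `clock` parameter is a hypothetical addition to make the class testable.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Simplified sketch: opens after N consecutive failures, closes after a timeout."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at >= self.recovery_timeout:
            # Recovery window elapsed: close the circuit and allow calls again
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def record_success(self) -> None:
        # Any success resets the consecutive-failure count
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

Keeping the breaker as a plain object with no framework hooks is what makes it easy to unit test with a fake clock.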
import asyncio

from langgraph.pregel import RetryPolicy

# Framework retry options are limited
retry_policy = RetryPolicy(
    max_attempts=3,
    initial_interval=1.0,
    # Can't easily customize:
    # - Per-error-type backoff
    # - Circuit breaker integration
    # - Rate limit aware retries
)

# I ended up wrapping the framework call
# (RateLimitError comes from the LLM client library in use)
async def safe_graph_invoke(graph, input_data):
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            return await graph.ainvoke(input_data)
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # don't fall through and silently return None
            await asyncio.sleep(60 * (attempt + 1))
        except TimeoutError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)
With custom Python, retry logic became explicit and testable:
from tenacity import retry, stop_after_attempt, wait_exponential
from circuit_breaker import CircuitBreaker  # assumed to expose is_open(), record_success(), record_failure()

# Response, APIError, fallback() and _execute() are application-specific and not shown here
class ProductionAgent:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def run(self, query: str) -> Response:
        if self.circuit_breaker.is_open():
            return await self.fallback(query)

        try:
            response = await self._execute(query)
            self.circuit_breaker.record_success()
            return response
        except (TimeoutError, APIError):
            self.circuit_breaker.record_failure()
            raise  # re-raise so tenacity can schedule the next attempt
Pain Point 3: Tool Execution Control
Framework tool interfaces felt restrictive. I needed custom validation, sanitization, and parallel execution patterns.
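As a rough illustration of what I mean by validation and sanitization, here is a sketch of the two as plain functions over a tool's parameter dict. The function names, the `query` field, and the 500-character limit are hypothetical choices for this example. The validator takes the params and returns a bool, which is the shape the `register_validator` hook later in this section expects.

```python
from typing import Any, Dict


def validate_search_params(params: Dict[str, Any]) -> bool:
    """Hypothetical validator: require a non-empty string `query` of bounded length."""
    query = params.get("query")
    return isinstance(query, str) and 0 < len(query.strip()) <= 500


def sanitize_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sanitizer: drop non-printable characters and trim whitespace."""
    cleaned: Dict[str, Any] = {}
    for key, value in params.items():
        if isinstance(value, str):
            value = "".join(ch for ch in value if ch.isprintable()).strip()
        cleaned[key] = value  # non-string values pass through unchanged
    return cleaned
```

Since both are ordinary functions, they can be tested in isolation and reused across tools without touching any framework interface.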
from langchain.tools import Tool

# Framework expects specific interface
def my_tool(input: str) -> str:
    # Input validation? Sanitization? Logging?
    # Framework hooks are limited
    return process(input)

tool = Tool(
    name="my_tool",
    func=my_tool,
    description="Does something"
)

# Parallel tool execution with dependencies?
# The framework has opinions about execution order
Custom Python gave me explicit control:
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass
class ToolResult:
    success: bool
    output: Any
    error: Optional[str]

class ToolExecutor:
    def __init__(self, tools: dict[str, Callable]):
        self.tools = tools  # each tool is expected to be an async callable
        self.validators = {}

    def register_validator(self, tool_name: str, validator: Callable):
        self.validators[tool_name] = validator

    def _sanitize(self, params: dict) -> dict:
        # Placeholder (not in the original snippet): apply project-specific rules here
        return params

    async def execute(self, tool_name: str, **params) -> ToolResult:
        if tool_name not in self.tools:
            return ToolResult(False, None, "Unknown tool")

        # Pre-execution validation
        if tool_name in self.validators:
            if not self.validators[tool_name](params):
                return ToolResult(False, None, "Validation failed")

        # Sanitize inputs
        params = self._sanitize(params)

        # Execute with timeout
        try:
            result = await asyncio.wait_for(
                self.tools[tool_name](**params),
                timeout=30.0
            )
            return ToolResult(True, result, None)
        except asyncio.TimeoutError:
            return ToolResult(False, None, "Timeout")

    async def execute_parallel(self, calls: list[tuple[str, dict]]) -> list[ToolResult]:
        tasks = [self.execute(name, **params) for name, params in calls]
        return await asyncio.gather(*tasks)
Pain Point 4: Cost and Latency Optimization
Framework overhead adds latency. Token usage patterns are harder to optimize when you don’t control the execution flow.
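Overhead claims are only useful if you measure them in your own pipeline. A minimal sketch of how per-step latency could be collected with the standard library follows; `StepTimer` and its method names are hypothetical, and this is not the instrumentation I used in production.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class StepTimer:
    """Hypothetical helper: collects wall-clock durations (ms) per named step."""

    def __init__(self):
        self.samples_ms = defaultdict(list)

    @contextmanager
    def measure(self, step: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the step raised
            elapsed_ms = (time.perf_counter() - start) * 1000
            self.samples_ms[step].append(elapsed_ms)

    def average_ms(self, step: str) -> float:
        samples = self.samples_ms.get(step, [])
        return sum(samples) / len(samples) if samples else 0.0
```

Wrapping the same step once in the framework version and once in the custom version (`with timer.measure("plan"): ...`) gives a like-for-like comparison instead of a guess.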
Framework overhead: 50-150ms per graph step
Custom implementation: 5-20ms per step
# Framework: hard to intercept token usage
from langgraph.graph import StateGraph

# Custom: explicit token tracking
class TokenTracker:
    def __init__(self, max_tokens: int = 100000):
        self.max_tokens = max_tokens
        self.used = 0

    def check_budget(self, estimated: int) -> bool:
        return self.used + estimated <= self.max_tokens

    def record(self, actual: int):
        self.used += actual

# Response and _estimate_tokens() are application-specific and not shown here
class ProductionAgent:
    def __init__(self, llm, tracker: TokenTracker):
        self.llm = llm  # LLM client used by run()
        self.tracker = tracker

    async def run(self, query: str) -> Response:
        # Estimate tokens before calling LLM
        estimated = self._estimate_tokens(query)

        if not self.tracker.check_budget(estimated):
            return Response(error="Token budget exceeded")

        response = await self.llm.generate(query)
        self.tracker.record(response.usage.total_tokens)

        return response
When to Migrate
Not every project needs custom Python. Here’s my decision framework:
Stay with framework if:
- Prototyping or learning
- Simple single-agent workflows
- Standard memory and retry requirements
- Time-to-market is priority
Migrate to custom if:
- Multi-agent with complex coordination
- Custom memory backends (vector DBs, graph databases)
- Business-specific retry and failure handling
- Cost optimization at scale
- Need full control over execution flow
The Migration Process
I migrated incrementally, not all at once:
Week 1: Extract memory layer from framework
Week 2: Replace retry logic with custom implementation
Week 3: Migrate tool execution to explicit handlers
Week 4: Remove framework dependency entirely
The result: 40% less code, 3x faster execution, and full control over behavior.
Summary
Teams migrate from LangGraph to custom Python when they hit limitations in memory management, retry handling, tool execution control, and cost/latency optimization. Frameworks provide excellent prototyping speed but introduce abstraction layers that become obstacles for production-scale agents.
The migration is a natural evolution, not a framework failure. Start with frameworks for learning and prototyping. Plan for custom builds when production requirements crystallize. You’ll need to implement scaffolding yourself, but you’ll have a much better understanding of how your agents work.
Final Words + More Resources
My intention with this article was to help others by sharing my knowledge and experience. If you want to get in touch, you can reach me by email.