How to Wrap Claude Code CLI for Multi-Agent Orchestration
Purpose
This post shows how to wrap Claude Code CLI and Codex for multi-agent orchestration. The key insight: don’t rebuild what the CLI tools already provide.
Problem
I wanted to build a multi-agent system. I started with LangGraph, implementing everything from scratch:
class MultiAgentSystem: def __init__(self): self.llm_client = OpenAI() self.tools = self.build_tools() # Weeks of work self.memory = self.build_memory() # Complex self.reasoning = self.build_reasoning() # Very complexTwo weeks later, I was still debugging tool calling edge cases:
ERROR: Tool call parsing failed for response: {"content": "I'll help you with...", "tool_calls": null}ERROR: Context window exceeded for conversation: 128000 tokens used, max 100000ERROR: Retry failed after 3 attempts: Rate limit exceededI realized I was rebuilding capabilities that Claude Code CLI already has built-in.
Environment
- Python 3.12
- Claude Code CLI (latest)
- Codex CLI (optional, for dual-engine setup)
- PostgreSQL for task persistence
- Redis for caching
The Insight
A Reddit comment changed my approach:
“The AI engines handle all the hard stuff natively including tool use, file editing, memory, multi-step reasoning. The gateway just adds what they’re missing: routing, cron scheduling, messaging integration, and a multi-agent org system.”
Claude Code CLI already provides:
- Native tool calling
- File system operations
- Context management
- Multi-step reasoning
- Memory handling
Why rebuild all of that?
Architecture Overview
The wrapper architecture has three layers:
+-------------------+ +-------------------+| Claude Code CLI | | Codex CLI || (Native Tools) | | (Alternative) |+-------------------+ +-------------------+ | | +--------+---------------+ | v+-----------------------------------+| Gateway Daemon || (Routing, Scheduling, Coords) |+-----------------------------------+ | v+-----------------------------------+| Multi-Agent Organization || (Registry, Discovery, Comms) |+-----------------------------------+The gateway is thin. It only handles what CLI tools don’t provide.
What I Tried First (Wrong Approach)
I initially built a heavy gateway that tried to do too much:
class HeavyGateway: def process(self, request): prompt = self.optimize_prompt(request) # CLI does this context = self.manage_context(prompt) # CLI does this tools = self.select_tools(request) # CLI does this response = self.call_llm(prompt, context) # CLI does this result = self.post_process(response) # Unnecessary memory = self.update_memory(result) # CLI does this return resultThis was redundant. The CLI already handles prompt optimization, context management, tool selection, and memory.
The Right Approach: Thin Gateway
I simplified to a thin gateway that delegates to CLI:
class ThinGateway: def process(self, request): agent = self.router.select(request) return agent.execute(request) # CLI does the restThis works because the CLI handles all the complexity internally.
Basic Gateway Implementation
Here’s my working gateway daemon:
#!/usr/bin/env python3import asyncioimport subprocessimport jsonfrom typing import Dict, List, Optionalfrom dataclasses import dataclassfrom enum import Enum
class EngineType(Enum): CLAUDE_CODE = "claude-code" CODEX = "codex"
@dataclassclass AgentConfig: name: str engine: EngineType capabilities: List[str] max_concurrent_tasks: int = 1
class CLIWrapper: def __init__(self, engine: EngineType): self.engine = engine self.process = None
async def execute(self, prompt: str, tools: List[str] = None) -> str: """Execute a command via the CLI tool""" cmd = [self.engine.value]
if tools: cmd.extend(["--tools", ",".join(tools)])
try: proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE )
stdout, stderr = await proc.communicate( input=prompt.encode(), timeout=300 # 5 minute timeout )
if proc.returncode != 0: raise Exception(f"CLI error: {stderr.decode()}")
return stdout.decode()
except asyncio.TimeoutError: proc.kill() raise Exception("CLI execution timed out")
class AgentRouter: def __init__(self, agents: Dict[str, AgentConfig]): self.agents = agents self.wrappers = { name: CLIWrapper(config.engine) for name, config in agents.items() }
def select_agent(self, task_type: str) -> CLIWrapper: """Select the best agent for a task type""" for name, config in self.agents.items(): if task_type in config.capabilities: return self.wrappers[name]
# Default to Claude Code return self.wrappers.get("default")
class GatewayDaemon: def __init__(self, agents: Dict[str, AgentConfig]): self.router = AgentRouter(agents) self.scheduler = CronScheduler() self.messaging = MessagingClient()
async def process_task(self, task: dict): """Process a single task through the gateway""" agent = self.router.select_agent(task["type"]) return await agent.execute(task["prompt"])
async def run_scheduled_tasks(self): """Run tasks from the scheduler""" while True: tasks = self.scheduler.get_pending_tasks() for task in tasks: result = await self.process_task(task) await self.messaging.notify_result(task["id"], result) await asyncio.sleep(60) # Check every minuteWhen I run this with agent configs:
if __name__ == "__main__": agents = { "code_analyst": AgentConfig( name="code_analyst", engine=EngineType.CLAUDE_CODE, capabilities=["code_review", "analysis"] ), "code_generator": AgentConfig( name="code_generator", engine=EngineType.CODEX, capabilities=["code_generation", "refactoring"] ) }
gateway = GatewayDaemon(agents) asyncio.run(gateway.run_scheduled_tasks())Multi-Agent Workflow
For complex tasks requiring multiple agents:
from typing import List, Dict, Anyimport asyncio
class MultiAgentWorkflow: """Coordinate multiple agents for complex tasks"""
def __init__(self, gateway: GatewayDaemon): self.gateway = gateway self.shared_context = {}
async def execute_parallel( self, tasks: List[dict] ) -> List[dict]: """Execute multiple independent tasks in parallel""" results = await asyncio.gather(*[ self.gateway.process_task(task) for task in tasks ]) return results
async def execute_sequential( self, tasks: List[dict], pass_context: bool = True ) -> List[dict]: """Execute tasks in sequence, optionally passing context""" results = []
for i, task in enumerate(tasks): if pass_context and i > 0: task["context"] = self.shared_context
result = await self.gateway.process_task(task) results.append(result)
# Update shared context self.shared_context[f"task_{i}_result"] = result
return results
async def execute_with_coordination( self, initial_task: dict, coordination_prompt: str ) -> dict: """Execute a task, then use an agent to coordinate next steps""" # Step 1: Execute initial task initial_result = await self.gateway.process_task(initial_task)
# Step 2: Use coordination agent to decide next steps coordination_task = { "type": "analysis", "prompt": f"{coordination_prompt}\n\nResult: {initial_result}" }
coordination_result = await self.gateway.process_task(coordination_task)
return { "initial_result": initial_result, "coordination": coordination_result }Example workflow for code review:
async def run_code_review_workflow(gateway: GatewayDaemon): workflow = MultiAgentWorkflow(gateway)
# Sequential workflow: analyze -> generate -> review tasks = [ { "type": "analysis", "prompt": "Analyze the requirements for this feature: ..." }, { "type": "code_generation", "prompt": "Generate code based on the analysis" }, { "type": "code_review", "prompt": "Review the generated code for issues" } ]
results = await workflow.execute_sequential(tasks) return resultsCron Scheduling
The gateway adds scheduling that CLI tools don’t have:
from croniter import croniterfrom datetime import datetimeimport asyncio
class CronScheduler: """Schedule tasks using cron expressions"""
def __init__(self): self.scheduled_tasks = []
def add_task( self, cron_expression: str, task: dict ): """Add a task with a cron schedule""" self.scheduled_tasks.append({ "cron": cron_expression, "task": task, "last_run": None })
def get_pending_tasks(self) -> List[dict]: """Get tasks that are due to run""" pending = [] now = datetime.now()
for scheduled in self.scheduled_tasks: cron = croniter(scheduled["cron"], scheduled["last_run"])
if cron.get_next(datetime) <= now: pending.append(scheduled["task"]) scheduled["last_run"] = now
return pending
# Example usagescheduler = CronScheduler()
# Schedule code review every morning at 9 AMscheduler.add_task( "0 9 * * *", { "type": "code_review", "prompt": "Review all code changes from the last 24 hours" })
# Schedule weekly architecture reviewscheduler.add_task( "0 10 * * 1", # Monday at 10 AM { "type": "analysis", "prompt": "Analyze system architecture for improvements" })Common Mistakes I Made
Mistake 1: Re-implementing Tool Calling
I wasted time building a tool caller from scratch:
# Wrong approachclass ToolCaller: def __init__(self, llm_client): self.llm = llm_client
def call_tool(self, tool_name, params): # Weeks of work to get this right response = self.llm.generate(...) tool_call = self.parse_tool_call(response) result = self.execute_tool(tool_call) return self.format_result(result)The CLI already does this:
# Right approachresult = subprocess.run( ["claude-code", "--tool", tool_name, "--params", json.dumps(params)], capture_output=True)Mistake 2: Building My Own Context Manager
I tried to manage context manually:
# Wrong: Building your own context managerclass ContextManager: def __init__(self): self.history = [] self.max_tokens = 100000
def add_message(self, msg): self.history.append(msg) if self.count_tokens() > self.max_tokens: self.summarize_old_messages()The CLI handles context automatically:
# Right: CLI handles context automaticallyprocess = subprocess.Popen( ["claude-code", "--interactive"], stdin=PIPE, stdout=PIPE)# CLI maintains context across commandsMistake 3: Not Handling CLI Failures
I didn’t add error handling initially:
# Wrong: No error handlingresult = subprocess.run(["claude-code", "task"])I added robust error handling:
# Right: Robust error handlingtry: result = subprocess.run( ["claude-code", "task"], capture_output=True, timeout=300 # 5 minute timeout ) if result.returncode != 0: logger.error(f"CLI failed: {result.stderr}") # Fallback to alternative engine result = subprocess.run(["codex", "task"])except subprocess.TimeoutExpired: logger.error("CLI timed out") # Implement retry logic or escalationBenefits and Trade-offs
Benefits
- Immediate Functionality: Tool calling works day one
- Reduced Complexity: ~90% less code to maintain
- Flexibility: Swap AI engines easily (Claude Code vs Codex)
- Cost Efficiency: Faster development (weeks -> days)
Trade-offs
- Less Control: Bound by CLI tool capabilities
- Abstraction Leaks: CLI tool bugs affect your system
- Inter-process Communication: Need to manage CLI processes
When to Use This Approach
Use the wrapper approach when:
- You need multi-agent orchestration quickly
- You want to leverage existing CLI capabilities
- You need to swap between different AI engines
- You want to focus on orchestration, not infrastructure
Build from scratch when:
- You need fine-grained control over tool calling
- You have specific requirements CLI tools don’t support
- You’re building a platform for others to extend
Summary
In this post, I showed how to wrap Claude Code CLI for multi-agent orchestration. The key insight: don’t rebuild what CLI tools already provide. The gateway only handles routing, scheduling, and coordination - the CLI handles tool calling, context management, and reasoning.
Start with a simple single-agent wrapper, then gradually add coordination logic as your multi-agent needs evolve. Resist the temptation to rebuild what the CLI tools already provide.
Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
- 👨💻 Reddit: OpenClaw architecture discussion
- 👨💻 Claude Code CLI Documentation
- 👨💻 Circuit Breaker Pattern
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
Comments