Skip to content

How to Wrap Claude Code CLI for Multi-Agent Orchestration

Purpose

This post shows how to wrap Claude Code CLI and Codex for multi-agent orchestration. The key insight: don’t rebuild what the CLI tools already provide.

Problem

I wanted to build a multi-agent system. I started with LangGraph, implementing everything from scratch:

my-agent.py
class MultiAgentSystem:
def __init__(self):
self.llm_client = OpenAI()
self.tools = self.build_tools() # Weeks of work
self.memory = self.build_memory() # Complex
self.reasoning = self.build_reasoning() # Very complex

Two weeks later, I was still debugging tool calling edge cases:

error.log
ERROR: Tool call parsing failed for response: {"content": "I'll help you with...", "tool_calls": null}
ERROR: Context window exceeded for conversation: 128000 tokens used, max 100000
ERROR: Retry failed after 3 attempts: Rate limit exceeded

I realized I was rebuilding capabilities that Claude Code CLI already has built-in.

Environment

  • Python 3.12
  • Claude Code CLI (latest)
  • Codex CLI (optional, for dual-engine setup)
  • PostgreSQL for task persistence
  • Redis for caching

The Insight

A Reddit comment changed my approach:

“The AI engines handle all the hard stuff natively including tool use, file editing, memory, multi-step reasoning. The gateway just adds what they’re missing: routing, cron scheduling, messaging integration, and a multi-agent org system.”

Claude Code CLI already provides:

  • Native tool calling
  • File system operations
  • Context management
  • Multi-step reasoning
  • Memory handling

Why rebuild all of that?

Architecture Overview

The wrapper architecture has three layers:

architecture-diagram.txt
+-------------------+ +-------------------+
| Claude Code CLI | | Codex CLI |
| (Native Tools) | | (Alternative) |
+-------------------+ +-------------------+
| |
+--------+---------------+
|
v
+-----------------------------------+
| Gateway Daemon |
| (Routing, Scheduling, Coords) |
+-----------------------------------+
|
v
+-----------------------------------+
| Multi-Agent Organization |
| (Registry, Discovery, Comms) |
+-----------------------------------+

The gateway is thin. It only handles what CLI tools don’t provide.

What I Tried First (Wrong Approach)

I initially built a heavy gateway that tried to do too much:

heavy-gateway.py
class HeavyGateway:
def process(self, request):
prompt = self.optimize_prompt(request) # CLI does this
context = self.manage_context(prompt) # CLI does this
tools = self.select_tools(request) # CLI does this
response = self.call_llm(prompt, context) # CLI does this
result = self.post_process(response) # Unnecessary
memory = self.update_memory(result) # CLI does this
return result

This was redundant. The CLI already handles prompt optimization, context management, tool selection, and memory.

The Right Approach: Thin Gateway

I simplified to a thin gateway that delegates to CLI:

thin-gateway.py
class ThinGateway:
def process(self, request):
agent = self.router.select(request)
return agent.execute(request) # CLI does the rest

This works because the CLI handles all the complexity internally.

Basic Gateway Implementation

Here’s my working gateway daemon:

gateway_daemon.py
#!/usr/bin/env python3
import asyncio
import subprocess
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class EngineType(Enum):
CLAUDE_CODE = "claude-code"
CODEX = "codex"
@dataclass
class AgentConfig:
name: str
engine: EngineType
capabilities: List[str]
max_concurrent_tasks: int = 1
class CLIWrapper:
def __init__(self, engine: EngineType):
self.engine = engine
self.process = None
async def execute(self, prompt: str, tools: List[str] = None) -> str:
"""Execute a command via the CLI tool"""
cmd = [self.engine.value]
if tools:
cmd.extend(["--tools", ",".join(tools)])
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate(
input=prompt.encode(),
timeout=300 # 5 minute timeout
)
if proc.returncode != 0:
raise Exception(f"CLI error: {stderr.decode()}")
return stdout.decode()
except asyncio.TimeoutError:
proc.kill()
raise Exception("CLI execution timed out")
class AgentRouter:
def __init__(self, agents: Dict[str, AgentConfig]):
self.agents = agents
self.wrappers = {
name: CLIWrapper(config.engine)
for name, config in agents.items()
}
def select_agent(self, task_type: str) -> CLIWrapper:
"""Select the best agent for a task type"""
for name, config in self.agents.items():
if task_type in config.capabilities:
return self.wrappers[name]
# Default to Claude Code
return self.wrappers.get("default")
class GatewayDaemon:
def __init__(self, agents: Dict[str, AgentConfig]):
self.router = AgentRouter(agents)
self.scheduler = CronScheduler()
self.messaging = MessagingClient()
async def process_task(self, task: dict):
"""Process a single task through the gateway"""
agent = self.router.select_agent(task["type"])
return await agent.execute(task["prompt"])
async def run_scheduled_tasks(self):
"""Run tasks from the scheduler"""
while True:
tasks = self.scheduler.get_pending_tasks()
for task in tasks:
result = await self.process_task(task)
await self.messaging.notify_result(task["id"], result)
await asyncio.sleep(60) # Check every minute

When I run this with agent configs:

main.py
if __name__ == "__main__":
agents = {
"code_analyst": AgentConfig(
name="code_analyst",
engine=EngineType.CLAUDE_CODE,
capabilities=["code_review", "analysis"]
),
"code_generator": AgentConfig(
name="code_generator",
engine=EngineType.CODEX,
capabilities=["code_generation", "refactoring"]
)
}
gateway = GatewayDaemon(agents)
asyncio.run(gateway.run_scheduled_tasks())

Multi-Agent Workflow

For complex tasks requiring multiple agents:

workflow.py
from typing import List, Dict, Any
import asyncio
class MultiAgentWorkflow:
"""Coordinate multiple agents for complex tasks"""
def __init__(self, gateway: GatewayDaemon):
self.gateway = gateway
self.shared_context = {}
async def execute_parallel(
self,
tasks: List[dict]
) -> List[dict]:
"""Execute multiple independent tasks in parallel"""
results = await asyncio.gather(*[
self.gateway.process_task(task)
for task in tasks
])
return results
async def execute_sequential(
self,
tasks: List[dict],
pass_context: bool = True
) -> List[dict]:
"""Execute tasks in sequence, optionally passing context"""
results = []
for i, task in enumerate(tasks):
if pass_context and i > 0:
task["context"] = self.shared_context
result = await self.gateway.process_task(task)
results.append(result)
# Update shared context
self.shared_context[f"task_{i}_result"] = result
return results
async def execute_with_coordination(
self,
initial_task: dict,
coordination_prompt: str
) -> dict:
"""Execute a task, then use an agent to coordinate next steps"""
# Step 1: Execute initial task
initial_result = await self.gateway.process_task(initial_task)
# Step 2: Use coordination agent to decide next steps
coordination_task = {
"type": "analysis",
"prompt": f"{coordination_prompt}\n\nResult: {initial_result}"
}
coordination_result = await self.gateway.process_task(coordination_task)
return {
"initial_result": initial_result,
"coordination": coordination_result
}

Example workflow for code review:

example.py
async def run_code_review_workflow(gateway: GatewayDaemon):
workflow = MultiAgentWorkflow(gateway)
# Sequential workflow: analyze -> generate -> review
tasks = [
{
"type": "analysis",
"prompt": "Analyze the requirements for this feature: ..."
},
{
"type": "code_generation",
"prompt": "Generate code based on the analysis"
},
{
"type": "code_review",
"prompt": "Review the generated code for issues"
}
]
results = await workflow.execute_sequential(tasks)
return results

Cron Scheduling

The gateway adds scheduling that CLI tools don’t have:

scheduler.py
from croniter import croniter
from datetime import datetime
import asyncio
class CronScheduler:
"""Schedule tasks using cron expressions"""
def __init__(self):
self.scheduled_tasks = []
def add_task(
self,
cron_expression: str,
task: dict
):
"""Add a task with a cron schedule"""
self.scheduled_tasks.append({
"cron": cron_expression,
"task": task,
"last_run": None
})
def get_pending_tasks(self) -> List[dict]:
"""Get tasks that are due to run"""
pending = []
now = datetime.now()
for scheduled in self.scheduled_tasks:
cron = croniter(scheduled["cron"], scheduled["last_run"])
if cron.get_next(datetime) <= now:
pending.append(scheduled["task"])
scheduled["last_run"] = now
return pending
# Example usage
scheduler = CronScheduler()
# Schedule code review every morning at 9 AM
scheduler.add_task(
"0 9 * * *",
{
"type": "code_review",
"prompt": "Review all code changes from the last 24 hours"
}
)
# Schedule weekly architecture review
scheduler.add_task(
"0 10 * * 1", # Monday at 10 AM
{
"type": "analysis",
"prompt": "Analyze system architecture for improvements"
}
)

Common Mistakes I Made

Mistake 1: Re-implementing Tool Calling

I wasted time building a tool caller from scratch:

wrong-tool-caller.py
# Wrong approach
class ToolCaller:
def __init__(self, llm_client):
self.llm = llm_client
def call_tool(self, tool_name, params):
# Weeks of work to get this right
response = self.llm.generate(...)
tool_call = self.parse_tool_call(response)
result = self.execute_tool(tool_call)
return self.format_result(result)

The CLI already does this:

right-tool-caller.py
# Right approach
result = subprocess.run(
["claude-code", "--tool", tool_name, "--params", json.dumps(params)],
capture_output=True
)

Mistake 2: Building My Own Context Manager

I tried to manage context manually:

wrong-context.py
# Wrong: Building your own context manager
class ContextManager:
def __init__(self):
self.history = []
self.max_tokens = 100000
def add_message(self, msg):
self.history.append(msg)
if self.count_tokens() > self.max_tokens:
self.summarize_old_messages()

The CLI handles context automatically:

right-context.py
# Right: CLI handles context automatically
process = subprocess.Popen(
["claude-code", "--interactive"],
stdin=PIPE, stdout=PIPE
)
# CLI maintains context across commands

Mistake 3: Not Handling CLI Failures

I didn’t add error handling initially:

wrong-error.py
# Wrong: No error handling
result = subprocess.run(["claude-code", "task"])

I added robust error handling:

right-error.py
# Right: Robust error handling
try:
result = subprocess.run(
["claude-code", "task"],
capture_output=True,
timeout=300 # 5 minute timeout
)
if result.returncode != 0:
logger.error(f"CLI failed: {result.stderr}")
# Fallback to alternative engine
result = subprocess.run(["codex", "task"])
except subprocess.TimeoutExpired:
logger.error("CLI timed out")
# Implement retry logic or escalation

Benefits and Trade-offs

Benefits

  1. Immediate Functionality: Tool calling works day one
  2. Reduced Complexity: ~90% less code to maintain
  3. Flexibility: Swap AI engines easily (Claude Code vs Codex)
  4. Cost Efficiency: Faster development (weeks -> days)

Trade-offs

  1. Less Control: Bound by CLI tool capabilities
  2. Abstraction Leaks: CLI tool bugs affect your system
  3. Inter-process Communication: Need to manage CLI processes

When to Use This Approach

Use the wrapper approach when:

  • You need multi-agent orchestration quickly
  • You want to leverage existing CLI capabilities
  • You need to swap between different AI engines
  • You want to focus on orchestration, not infrastructure

Build from scratch when:

  • You need fine-grained control over tool calling
  • You have specific requirements CLI tools don’t support
  • You’re building a platform for others to extend

Summary

In this post, I showed how to wrap Claude Code CLI for multi-agent orchestration. The key insight: don’t rebuild what CLI tools already provide. The gateway only handles routing, scheduling, and coordination - the CLI handles tool calling, context management, and reasoning.

Start with a simple single-agent wrapper, then gradually add coordination logic as your multi-agent needs evolve. Resist the temptation to rebuild what the CLI tools already provide.

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments