How to Implement Multi-Agent Failover for AI Coding Assistants
Problem
I was in the middle of a deep coding session when Claude went down. The outage lasted two hours. My flow was completely broken.
I had to:
- Wait for service restoration
- Re-explain my entire context to a fresh session
- Reconstruct what I was working on from memory
- Lose the accumulated understanding we’d built
Sound familiar? Single-provider dependency is a productivity risk. When your AI coding assistant goes down, you’re stuck.
What I Needed
I wanted a system that would:
- Detect when Claude (my primary) is unavailable
- Automatically switch to a backup provider
- Preserve conversation context across the switch
- Keep me coding with zero interruption
So I built a multi-agent orchestrator called “Daniel” that wraps multiple AI CLIs with automatic failover.
Architecture Overview
The key insight: wrap multiple providers behind a single interface with shared state.
+-----------------+ +-------------------+| CLI / IDE | | Shared State || Interface | | (Knowledge Base) |+--------+--------+ +---------+---------+ | | v |+--------+--------+ || Orchestrator |<----------------+| - Router || - Health || - Context |+--------+--------+ | +----+----+----+ | | | | v v v v+------+ +------+ +------+|Claude| |Codex | |Gemini|+------+ +------+ +------+When Claude fails, the orchestrator routes to Codex or Gemini with full context injection.
Core Components
1. Unified Orchestrator
The orchestrator maintains provider priority and handles failover:
from dataclasses import dataclassfrom enum import Enumfrom typing import Optionalimport subprocessimport json
class Provider(Enum): CLAUDE = "claude" CODEX = "codex" GEMINI = "gemini"
@dataclassclass ProviderStatus: provider: Provider available: bool rate_limited: bool last_error: Optional[str] latency_ms: int
@dataclassclass ConversationContext: messages: list[dict] working_directory: str current_task: str files_modified: list[str]
class MultiAgentOrchestrator: """Wraps multiple AI CLIs with automatic failover."""
def __init__(self): self.providers = [Provider.CLAUDE, Provider.CODEX, Provider.GEMINI] self.status: dict[Provider, ProviderStatus] = {} self.context = ConversationContext( messages=[], working_directory="", current_task="", files_modified=[] ) self.knowledge_base = KnowledgeBase()
def execute(self, prompt: str, cwd: str = ".") -> str: """Execute prompt with automatic failover.""" self.context = self.knowledge_base.load_context()
for provider in self._get_available_providers(): try: result = self._execute_with_provider(provider, prompt, cwd) self._update_context(prompt, result) return result except ProviderError as e: self._mark_unavailable(provider, str(e)) continue
raise AllProvidersUnavailableError("No providers available")
def _get_available_providers(self) -> list[Provider]: """Get providers sorted by availability and performance.""" available = [ p for p in self.providers if self.status.get(p, ProviderStatus(p, True, False, None, 0)).available and not self.status.get(p).rate_limited ] return sorted(available, key=lambda p: self.status[p].latency_ms)2. Health Monitoring
I needed to know when providers go down before I try to use them:
import asynciofrom datetime import datetime, timedelta
class HealthMonitor: """Monitors provider health and rate limits."""
def __init__(self, check_interval_seconds: int = 30): self.check_interval = check_interval_seconds self.provider_status: dict[Provider, ProviderStatus] = {} self.rate_limit_windows: dict[Provider, list[datetime]] = {}
async def start_monitoring(self): """Background task to monitor all providers.""" while True: for provider in Provider: await self._check_provider(provider) await asyncio.sleep(self.check_interval)
async def _check_provider(self, provider: Provider): """Check single provider health.""" try: start = datetime.now() result = await self._ping_provider(provider) latency = (datetime.now() - start).total_seconds() * 1000
self.provider_status[provider] = ProviderStatus( provider=provider, available=result.success, rate_limited=self._is_rate_limited(provider), last_error=None if result.success else result.error, latency_ms=int(latency) ) except Exception as e: self.provider_status[provider] = ProviderStatus( provider=provider, available=False, rate_limited=False, last_error=str(e), latency_ms=0 )
def _is_rate_limited(self, provider: Provider) -> bool: """Check if provider is in rate limit cooldown.""" if provider not in self.rate_limit_windows: return False
cutoff = datetime.now() - timedelta(minutes=5) self.rate_limit_windows[provider] = [ t for t in self.rate_limit_windows[provider] if t > cutoff ] return len(self.rate_limit_windows[provider]) > 33. Context Preservation (The Critical Part)
This is where most failover systems fail. Different providers have different APIs and no shared memory.
My solution: a shared knowledge base that all providers read from:
import jsonfrom pathlib import Pathfrom typing import Optional
class KnowledgeBase: """Shared knowledge base across all agents."""
def __init__(self, storage_path: str = "~/.agent_orchestrator/kb"): self.storage_path = Path(storage_path).expanduser() self.storage_path.mkdir(parents=True, exist_ok=True)
def save_context(self, context: ConversationContext): """Persist context for cross-agent access.""" context_file = self.storage_path / "current_context.json" context.messages = context.messages[-100:] # Keep last 100
with open(context_file, 'w') as f: json.dump({ "messages": context.messages, "working_directory": context.working_directory, "current_task": context.current_task, "files_modified": context.files_modified, "updated_at": datetime.now().isoformat() }, f, indent=2)
def load_context(self) -> ConversationContext: """Load existing context.""" context_file = self.storage_path / "current_context.json"
if not context_file.exists(): return ConversationContext( messages=[], working_directory="", current_task="", files_modified=[] )
with open(context_file, 'r') as f: data = json.load(f)
return ConversationContext(**data)
def summarize_recent(self, max_messages: int = 20) -> str: """Create summary of recent context for cross-provider injection.""" context = self.load_context()
if not context.messages: return "No previous context available."
recent = context.messages[-max_messages:] summary_parts = []
for msg in recent: role = msg["role"].upper() content = msg["content"][:500] # Truncate long messages summary_parts.append(f"{role}: {content}")
return "\n\n".join(summary_parts)4. Context Injection on Failover
When switching providers, I inject the context into the new provider’s prompt:
def prepare_handoff(self, from_provider: Provider, to_provider: Provider) -> str: """Prepare context for cross-provider handoff.""" context = self.knowledge_base.load_context()
handoff_prompt = f"""[SESSION HANDOFF]Switching from {from_provider.value} to {to_provider.value}
[CONVERSATION SUMMARY]{self._summarize_context(context)}
[KEY DECISIONS MADE]{self._extract_decisions(context)}
[FILES MODIFIED]{chr(10).join(context.files_modified)}
[CURRENT TASK]{context.current_task}
Please continue from where we left off.""" return handoff_promptProvider Wrappers
Each provider needs a thin adapter:
class ClaudeProvider: """Claude Code CLI wrapper."""
def __init__(self, api_key: str): self.api_key = api_key self.cli_path = "claude"
def build_command(self, prompt: str, model: str = "claude-sonnet-4") -> list[str]: return [ self.cli_path, "--model", model, "--print", prompt ]
def parse_response(self, output: str) -> str: return output.strip()class CodexProvider: """Codex CLI wrapper (OpenAI)."""
def __init__(self, api_key: str): self.api_key = api_key self.cli_path = "codex"
def build_command(self, prompt: str) -> list[str]: return [ self.cli_path, "ask", "--json", prompt ]
def parse_response(self, output: str) -> str: try: data = json.loads(output) return data.get("response", output) except json.JSONDecodeError: return outputConfiguration
Here’s my failover cascade configuration:
providers: claude: enabled: true priority: 1 # Primary provider cli_path: "claude" default_model: "claude-sonnet-4" api_key_env: "ANTHROPIC_API_KEY" rate_limit: requests_per_minute: 60 tokens_per_minute: 100000 health_check: enabled: true interval_seconds: 30 timeout_seconds: 10
codex: enabled: true priority: 2 # Secondary (failover) cli_path: "codex" api_key_env: "OPENAI_API_KEY" rate_limit: requests_per_minute: 100 tokens_per_minute: 200000
gemini: enabled: true priority: 3 # Tertiary backup cli_path: "gemini" api_key_env: "GOOGLE_API_KEY"
failover: strategy: "priority" retry_attempts: 3 retry_delay_ms: 1000 backoff_multiplier: 2 circuit_breaker: failure_threshold: 5 recovery_timeout_seconds: 60
context: storage_path: "~/.agent_orchestrator/kb" max_messages: 100 sync_across_providers: true preserve_on_failover: trueWhat Happened During the Last Outage
When Claude went down during a recent outage, here’s what my orchestrator did:
- Health monitor detected Claude failure within 5 seconds
- Router automatically switched to Codex (priority 2)
- Context from last 20 messages was injected into Codex prompt
- Codex continued the session where Claude left off
- I kept coding from my phone on Termux, completely unaware Claude was down
Zero downtime. Full context preserved.
Mistakes I Made
Mistake 1: Not Persisting Context
My first version didn’t save context. Every failover meant starting fresh.
Fix: Added the KnowledgeBase layer that persists to disk.
Mistake 2: Equal Priority for All Providers
I set all providers to equal priority. The router would round-robin, which was confusing when providers have different capabilities.
Fix: Use priority-based routing. Claude is primary, Codex is failover, Gemini is backup.
Mistake 3: No Rate Limit Tracking
Providers would get rate-limited, and I’d keep hammering them.
Fix: Added rate limit windows and cooldown periods. If a provider gets rate-limited 3 times in 5 minutes, it goes into cooldown.
Mistake 4: Forgetting to Handle Different Output Formats
Each provider outputs differently. My parser assumed Claude’s format.
Fix: Provider-specific adapters with parse_response() methods.
Mistake 5: No Circuit Breaker
When a provider was down, I’d retry forever.
Fix: Implemented circuit breaker pattern. After 5 failures, the provider goes into a 60-second cooldown.
Failover Strategy Comparison
+----------------+----------+----------+----------+| Strategy | Simple | Flexible | Cost- || | | | Optimized|+----------------+----------+----------+----------+| Priority-based | Yes | Medium | No || Round-robin | Yes | Low | No || Least-latency | Medium | Medium | No || Capability- | Complex | High | No || based | | | || Cost-aware | Complex | Medium | Yes |+----------------+----------+----------+----------+Start with priority-based. It’s simple and predictable. Add capability-based routing once you understand your usage patterns.
Setup Checklist
Before you start:
- Install all provider CLIs (Claude Code, Codex CLI, Gemini CLI)
- Configure API keys for all providers
- Set up knowledge base storage directory
- Configure failover priorities
- Test each provider individually
Initial testing:
- Verify health checks work
- Test manual provider switching
- Verify context preservation
- Simulate an outage and test failover
Summary
Single-provider dependency is a risk. I learned this the hard way when Claude went down mid-session.
The solution: a multi-agent orchestrator that:
- Wraps multiple AI CLIs behind a unified interface
- Monitors provider health in the background
- Automatically fails over when providers go down
- Preserves conversation context across providers
- Routes based on priority, capability, or cost
The key insight is that context preservation is the hardest part. Different providers have no shared memory. My solution uses a shared knowledge base that injects recent context into each provider’s prompt format.
Now when Claude goes down, I don’t even notice. The orchestrator handles it.
Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
- 👨💻 Daniel Multi-Agent Orchestrator
- 👨💻 Reddit: Obsidian + Claude = no more copy paste
- 👨💻 Claude Code Documentation
- 👨💻 Circuit Breaker Pattern
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
Comments