How to Build a Multi-Agent AI Trading System with Self-Improving Prompts
I spent months building trading prompts that worked great—until they didn’t. Markets changed, strategies stopped working, and I was back at square one tweaking prompts manually. Then I stumbled on a Reddit post where someone ran 25 AI agents debating strategies across 378 trading days, with prompts that “live or die by Sharpe ratio.” Here’s how I built my own version.
The Problem with Static Trading Prompts
Every quant developer knows the drill: you craft a perfect prompt for your trading agent, backtest it, deploy it, and watch it make money. Then markets shift. Volatility spikes. Your prompt that worked beautifully in a trending market gets destroyed in a ranging market.
I had folders full of “prompt_v1.txt”, “prompt_v2_final.txt”, “prompt_v2_FINAL_final.txt”. No version control, no performance tracking, no way to know which prompt worked when. The Reddit post described something different: prompts as git commits, evolving based on actual trading performance.
Architecture: Four Layers of Analysis
The system I built has four analysis layers feeding into a portfolio manager:
    +------------------+
    |     Layer 4      |  <- Portfolio Manager (Synthesis)
    +------------------+
             |
    +------------------+
    |     Layer 3      |  <- Single Names (Individual Securities)
    +------------------+
             |
    +------------------+
    |     Layer 2      |  <- Commodities Analysis
    +------------------+
             |
    +------------------+
    |     Layer 1      |  <- Macro Analysis
    +------------------+

Each layer has specialized agents with distinct system prompts. The macro agent looks at Fed policy, global economics, and geopolitical events. Sector agents focus on industry rotation. Commodities agents track supply/demand fundamentals. Single-name agents pick individual stocks.
The key insight from the Reddit discussion: instead of 25 static agents, use OpenClaw instances with different system prompts and let them “argue through a shared memory or message queue.”
Setting Up the Debate Layer
I started with LangGraph to orchestrate the agent debate. Here’s the core structure:
    from langgraph.graph import StateGraph, END
    from typing import TypedDict, Annotated
    import operator
    from datetime import datetime


    class DebateState(TypedDict):
        macro_analysis: dict
        sector_analysis: dict
        commodities_analysis: dict
        single_name_analysis: dict
        # operator.add is a reducer: each node's log entries are appended, not overwritten
        debate_log: Annotated[list, operator.add]
        final_recommendation: dict
        timestamp: str


    AGENT_PROMPTS = {
        "macro": """
            You are a macro-economic analyst. Focus on:
            - Federal Reserve policy and interest rates
            - Global economic indicators (GDP, inflation, employment)
            - Geopolitical events and their market impact

            Output: Macro outlook score (-10 to +10) and key risk factors.
        """,

        "sector": """
            You are a sector rotation specialist. Focus on:
            - Industry-specific catalysts
            - Relative strength between sectors
            - Earnings trends and guidance

            Output: Sector recommendations with confidence scores.
        """,
        # ... commodities and single_name prompts
    }


    def macro_agent_node(state: DebateState) -> dict:
        """Macro analysis agent. Returns only the state keys it updates."""
        # run_openclaw_agent and get_market_context are helpers not shown here
        analysis = run_openclaw_agent(
            agent_type="macro",
            system_prompt=AGENT_PROMPTS["macro"],
            context=get_market_context(),
        )
        return {
            "macro_analysis": analysis,
            "debate_log": [{"agent": "macro", "output": analysis}],
        }


    # Build debate graph
    # (sector_agent_node and portfolio_manager_node are not shown here)
    debate_graph = StateGraph(DebateState)
    debate_graph.add_node("macro_agent", macro_agent_node)
    debate_graph.add_node("sector_agent", sector_agent_node)
    debate_graph.add_node("portfolio_manager", portfolio_manager_node)

    debate_graph.set_entry_point("macro_agent")
    debate_graph.add_edge("macro_agent", "sector_agent")
    debate_graph.add_edge("sector_agent", "portfolio_manager")
    debate_graph.add_edge("portfolio_manager", END)

    debate_app = debate_graph.compile()

The state flows through each layer, with downstream agents seeing upstream analysis. This creates a structured debate where each agent builds on prior context.
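To make the state flow concrete, here is a minimal plain-Python sketch of what the `Annotated[list, operator.add]` annotation on `debate_log` implies: keys with a reducer are combined with the existing value, and every other key is simply overwritten. It only mimics that behavior for illustration and is not LangGraph's actual implementation. `MiniDebateState` and `merge_update` are hypothetical names introduced for this example.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class MiniDebateState(TypedDict):
    macro_analysis: dict
    sector_analysis: dict
    debate_log: Annotated[list, operator.add]  # reducer: concatenate lists


def merge_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, honoring reducers."""
    hints = get_type_hints(MiniDebateState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducers = getattr(hints[key], "__metadata__", ())
        if reducers and key in merged:
            merged[key] = reducers[0](merged[key], value)  # e.g. list + list
        else:
            merged[key] = value  # keys without a reducer are overwritten
    return merged


# Simulate the macro node and then the sector node returning partial updates
state = {"debate_log": []}
state = merge_update(state, {
    "macro_analysis": {"score": 3},
    "debate_log": [{"agent": "macro", "output": {"score": 3}}],
})
state = merge_update(state, {
    "sector_analysis": {"overweight": "energy"},
    "debate_log": [{"agent": "sector", "output": {"overweight": "energy"}}],
})
print([entry["agent"] for entry in state["debate_log"]])
```

Because the log accumulates while the analysis keys are replaced, the portfolio manager at the end of the chain can read both the latest view from each layer and the full history of who said what.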
The Scoring Layer: Prompts Live or Die by Sharpe Ratio
This is where the system gets interesting. I created a PromptScoringLayer that evaluates each prompt version based on actual trading performance:
    import subprocess
    from dataclasses import dataclass
    from typing import Optional

    import numpy as np


    @dataclass
    class PromptPerformance:
        prompt_id: str
        sharpe_ratio: float
        win_rate: float
        max_drawdown: float
        trades_count: int
        is_active: bool
        prompt_content: str = ""  # optional; evaluate_prompt does not populate it


    class PromptScoringLayer:
        """
        Evaluates prompt performance.
        Prompts live or die by their Sharpe ratio.
        """

        def __init__(self, min_trades: int = 30, min_sharpe: float = 0.5):
            self.min_trades = min_trades
            self.min_sharpe = min_sharpe
            self.prompts_repo = "/path/to/prompts/repo"

        def evaluate_prompt(self, prompt_id: str) -> Optional[PromptPerformance]:
            """Calculate performance metrics for a prompt version."""
            trades = self._load_trades(prompt_id)

            if len(trades) < self.min_trades:
                return None  # Not enough data

            returns = [t["return"] for t in trades]
            sharpe = self._calculate_sharpe(returns)
            win_rate = len([r for r in returns if r > 0]) / len(returns)
            max_dd = self._calculate_max_drawdown(returns)

            return PromptPerformance(
                prompt_id=prompt_id,
                sharpe_ratio=sharpe,
                win_rate=win_rate,
                max_drawdown=max_dd,
                trades_count=len(trades),
                is_active=sharpe >= self.min_sharpe,
            )

        def _calculate_sharpe(self, returns: list, risk_free: float = 0.04) -> float:
            """Calculate annualized Sharpe ratio (assumes one return per trading day)."""
            returns_arr = np.array(returns)
            excess = returns_arr - (risk_free / 252)
            std = np.std(excess)
            if np.isclose(std, 0.0):
                return 0.0  # flat return series: avoid dividing by zero
            return float(np.sqrt(252) * np.mean(excess) / std)

        def commit_prompt(self, performance: PromptPerformance) -> bool:
            """Git commit prompts that survive performance thresholds."""
            if performance.sharpe_ratio < self.min_sharpe:
                self._deactivate_prompt(performance.prompt_id)
                return False

            commit_msg = (
                "[PROMOTED]\n"
                f"Sharpe: {performance.sharpe_ratio:.2f}\n"
                f"Win Rate: {performance.win_rate:.1%}\n"
                f"Max DD: {performance.max_drawdown:.1%}\n"
            )
            # Run git inside the prompts repo and fail loudly if a command errors
            subprocess.run(
                ["git", "add", f"prompts/{performance.prompt_id}.yaml"],
                cwd=self.prompts_repo, check=True,
            )
            subprocess.run(
                ["git", "commit", "-m", commit_msg],
                cwd=self.prompts_repo, check=True,
            )
            return True

I require at least 30 trades and a Sharpe ratio of 0.5. A prompt with fewer than 30 trades is not scored yet; a prompt that has enough trades but falls below the Sharpe threshold gets deactivated. Winners get committed to the repository.
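The scoring class calls `_calculate_max_drawdown` without showing it, and the Sharpe formula is easier to sanity-check in isolation. Below is a minimal standard-library sketch of both metrics, assuming `returns` is a list of simple per-period returns (0.01 means +1%) and that there is roughly one return per trading day, which is what annualizing with 252 implies. The function names `annualized_sharpe` and `max_drawdown` are mine, not part of the original class.

```python
import math
import statistics


def annualized_sharpe(returns, risk_free=0.04, periods=252):
    """Annualized Sharpe ratio from per-period returns (population std, like np.std)."""
    if not returns:
        return 0.0
    excess = [r - risk_free / periods for r in returns]
    std = statistics.pstdev(excess)
    if math.isclose(std, 0.0, abs_tol=1e-12):
        return 0.0  # flat return series: the ratio is undefined, report 0
    return math.sqrt(periods) * statistics.fmean(excess) / std


def max_drawdown(returns):
    """Largest peak-to-trough loss of the compounded equity curve, as a fraction."""
    equity, peak, worst = 1.0, 1.0, 0.0
    for r in returns:
        equity *= 1.0 + r
        peak = max(peak, equity)
        worst = max(worst, (peak - equity) / peak)
    return worst


sample = [0.10, -0.20, 0.05]
print(f"Sharpe: {annualized_sharpe(sample):.2f}")
print(f"Max drawdown: {max_drawdown(sample):.1%}")
```

If a prompt trades less often than daily, the `periods` argument should reflect the actual number of returns per year; otherwise the annualized figure is overstated.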
Daily Cycle Automation
The Reddit post mentioned: “Daily cycle runs on a cron job. OpenClaw agents debate. Atlas-gic scoring layer evaluates. Git commits happen automatically.”
Here’s my implementation:
    import time
    from datetime import datetime

    import schedule


    def run_daily_cycle():
        """Run the complete daily trading cycle."""
        if datetime.now().weekday() >= 5:
            return  # Saturday/Sunday: markets are closed

        print(f"[{datetime.now()}] Starting daily trading cycle...")

        # Step 1: Run debate
        initial_state = {
            "macro_analysis": {},
            "sector_analysis": {},
            "commodities_analysis": {},
            "single_name_analysis": {},
            "debate_log": [],
            "final_recommendation": {},
            "timestamp": datetime.now().isoformat(),
        }
        result = debate_app.invoke(initial_state)

        # Step 2: Execute trades
        execute_trades(result["final_recommendation"])

        # Step 3: Log results
        log_daily_results(result)

        # Step 4: Evaluate prompt performance (weekly)
        if datetime.now().weekday() == 4:  # Friday
            evaluate_all_prompts()

        print(f"[{datetime.now()}] Daily cycle complete.")


    # Schedule for market open (schedule uses the host's local time)
    schedule.every().day.at("09:30").do(run_daily_cycle)

    while True:
        schedule.run_pending()
        time.sleep(60)

For production, I wrapped this in a systemd service:
    [Unit]
    Description=Multi-Agent Trading System
    After=network.target

    [Service]
    Type=simple
    User=trading
    WorkingDirectory=/opt/trading-agents
    ExecStart=/opt/trading-agents/venv/bin/python daily_cycle.py
    Restart=on-failure
    RestartSec=10

    [Install]
    WantedBy=multi-user.target

Shared Memory for Agent Communication
Agents need to share analysis. I implemented this with Redis pub/sub:
    import asyncio
    import json
    from datetime import datetime


    class SharedMemory:
        """Shared memory for agent debate via Redis pub/sub."""

        def __init__(self, redis_client=None):
            # Expects an async client such as redis.asyncio.Redis, or None for the local fallback
            self.redis = redis_client
            self.local_cache = {}

        async def publish_analysis(self, agent_type: str, analysis: dict):
            """Agent publishes its analysis to shared memory."""
            channel = f"trading:analysis:{agent_type}"
            message = {
                "agent": agent_type,
                "analysis": analysis,
                "timestamp": datetime.now().isoformat(),
            }

            if self.redis:
                await self.redis.publish(channel, json.dumps(message))
            else:
                self.local_cache[channel] = message

        async def subscribe_to_layer(self, layer: str, callback):
            """Subscribe to updates from a specific analysis layer."""
            channel = f"trading:analysis:{layer}"

            if self.redis:
                pubsub = self.redis.pubsub()
                await pubsub.subscribe(channel)
                async for message in pubsub.listen():
                    if message["type"] == "message":
                        await callback(json.loads(message["data"]))

This lets the sector agent see macro analysis, the commodities agent see both macro and sector views, and so on down the chain.
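One property of pub/sub worth seeing in code: messages are delivered only to subscribers that are already listening when the message is published, so downstream agents need to subscribe before upstream agents publish. The sketch below shows this with a small in-memory bus built on `asyncio.Queue`, so it runs without a Redis server. `InMemoryBus` is a hypothetical stand-in for local testing, not part of the Redis client API.

```python
import asyncio
import json
from collections import defaultdict
from datetime import datetime, timezone


class InMemoryBus:
    """Hypothetical stand-in for Redis pub/sub, for local testing only."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # channel -> list of asyncio.Queue

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, payload: dict) -> int:
        """Deliver to current subscribers and return how many received it."""
        message = json.dumps(payload)
        for queue in self._subscribers[channel]:
            await queue.put(message)
        return len(self._subscribers[channel])


async def demo():
    bus = InMemoryBus()

    # The sector agent subscribes to the macro channel before macro publishes
    inbox = bus.subscribe("trading:analysis:macro")
    delivered = await bus.publish(
        "trading:analysis:macro",
        {
            "agent": "macro",
            "analysis": {"score": 4},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
    received = json.loads(await inbox.get())

    # Nobody listens on the sector channel yet, so this message is lost
    missed = await bus.publish("trading:analysis:sector", {"agent": "sector"})
    return delivered, received, missed


delivered, received, missed = asyncio.run(demo())
print(delivered, received["agent"], missed)
```

If late subscribers need to catch up on earlier analysis, storing the latest message per channel (as the `local_cache` fallback does) is one way to cover that gap.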
Prompt Evolution: The Key Innovation
The real power is in the prompt evolution system. When a prompt performs well (Sharpe > 1.5), I create variants. When it underperforms (Sharpe < 0.5), I kill it:
    import subprocess
    from datetime import datetime
    from pathlib import Path

    import yaml


    class PromptEvolution:
        """Manages prompt evolution through git-based versioning."""

        def __init__(self, prompts_dir: str):
            self.prompts_dir = Path(prompts_dir)
            self.active_prompts = self._load_active_prompts()

        def create_variant(self, base_prompt_id: str, mutation: str) -> str:
            """Create a new prompt variant from a successful prompt."""
            base = self.active_prompts[base_prompt_id]
            # Microsecond resolution keeps IDs unique when several variants
            # are created in the same minute
            new_id = f"{base_prompt_id}_v{datetime.now().strftime('%Y%m%d_%H%M%S%f')}"

            new_prompt = {
                "id": new_id,
                "parent": base_prompt_id,
                "content": f"{base['content']}\n\n# Mutation\n{mutation}",
                "created_at": datetime.now().isoformat(),
                "active": True,
                "performance": {
                    "sharpe_ratio": None,
                    "trades": [],
                },
            }

            # Save and commit
            prompt_path = self.prompts_dir / f"{new_id}.yaml"
            with open(prompt_path, "w") as f:
                yaml.dump(new_prompt, f)

            subprocess.run(["git", "add", str(prompt_path)])
            subprocess.run(["git", "commit", "-m", f"[NEW] {new_id} from {base_prompt_id}"])

            return new_id

        def evolve_prompts(self, performance_data: dict):
            """Run evolution cycle: kill underperformers, mutate winners."""
            for prompt_id, metrics in performance_data.items():
                if metrics["trades_count"] < 30:
                    continue

                if metrics["sharpe_ratio"] < 0.5:
                    self._deactivate_prompt(prompt_id)
                    # Stage the change first (assuming _deactivate_prompt edits the
                    # prompt's YAML file); a bare commit would have nothing to record
                    prompt_path = self.prompts_dir / f"{prompt_id}.yaml"
                    subprocess.run(["git", "add", str(prompt_path)])
                    subprocess.run(["git", "commit", "-m", f"[KILLED] {prompt_id}"])

                elif metrics["sharpe_ratio"] > 1.5:
                    mutations = self._generate_mutations(prompt_id)
                    for mutation in mutations:
                        self.create_variant(prompt_id, mutation)

I use an LLM to generate mutations:
    # Method of PromptEvolution; call_llm is a helper not shown here
    def _generate_mutations(self, prompt_id: str) -> list:
        """Use LLM to generate prompt mutations."""
        base_prompt = self.active_prompts[prompt_id]["content"]

        mutation_request = f"""
        Given this successful trading prompt with Sharpe > 1.5:
        ---
        {base_prompt}
        ---
        Generate 3 mutations:
        1. A more conservative variant
        2. A variant with different risk parameters
        3. A variant with additional market context

        Return as JSON array.
        """

        mutations = call_llm(mutation_request, response_format="json")
        return mutations

OpenClaw Integration
For the debate layer, I use OpenClaw instances with different system prompts:
    import json
    from dataclasses import dataclass
    from datetime import datetime


    @dataclass
    class OpenClawAgentConfig:
        agent_type: str
        system_prompt: str
        model: str = "claude-3-sonnet"
        temperature: float = 0.7


    class OpenClawOrchestrator:
        """Manages multiple OpenClaw instances for debate."""

        def __init__(self, base_url: str = "http://localhost:8000"):
            self.base_url = base_url
            self.instances = {}

        async def spawn_agent(self, config: OpenClawAgentConfig) -> str:
            """Spin up OpenClaw instance with specific system prompt."""
            instance_id = f"{config.agent_type}_{datetime.now().strftime('%H%M%S')}"

            # http_post is an async HTTP helper not shown here
            await http_post(
                f"{self.base_url}/spawn",
                json={
                    "instance_id": instance_id,
                    "model": config.model,
                    "system_prompt": config.system_prompt,
                },
            )

            self.instances[instance_id] = {"config": config, "status": "running"}
            return instance_id

        async def run_debate(self, topic: str, participants: list, rounds: int = 3) -> dict:
            """Run structured debate between agent instances.

            participants is a list of instance IDs returned by spawn_agent.
            """
            debate_state = {"topic": topic, "participants": participants, "rounds": []}

            for round_num in range(rounds):
                round_responses = {}

                for participant in participants:
                    # self.instances is keyed by instance ID, so the participant
                    # value is the ID itself; query_agent is not shown here
                    response = await self.query_agent(
                        instance_id=participant,
                        prompt=(
                            f"Debate topic: {topic}\n\n"
                            "Previous arguments:\n"
                            f"{json.dumps(debate_state['rounds'], indent=2)}\n\n"
                            "Present your analysis."
                        ),
                    )
                    round_responses[participant] = response

                debate_state["rounds"].append(round_responses)

            return debate_state

Risk Management Layer
Before any trade executes, the risk manager validates:
    class RiskManager:
        """Risk management layer for trading system."""

        def __init__(self, max_position: float = 0.05, max_sector: float = 0.25):
            self.max_position = max_position  # 5% max per position
            self.max_sector = max_sector      # 25% max per sector

        def validate_recommendation(self, recommendation: dict) -> dict:
            """Validate and adjust portfolio recommendations."""
            adjusted_positions = []

            for position in recommendation.get("positions", []):
                size = min(position["size"], self.max_position)

                # Check sector concentration
                sector_exposure = self._calculate_sector_exposure(
                    adjusted_positions, position["sector"]
                )

                if sector_exposure + size > self.max_sector:
                    size = max(0, self.max_sector - sector_exposure)

                adjusted_positions.append({
                    **position,
                    "size": size,
                    "risk_adjusted": True,
                })

            return {
                **recommendation,
                "positions": adjusted_positions,
                "total_exposure": sum(p["size"] for p in adjusted_positions),
            }

Monitoring and Observability
I generate daily reports to track system health:
    from dataclasses import dataclass
    from datetime import datetime


    @dataclass
    class SystemMetrics:
        total_trades: int
        winning_trades: int
        total_pnl: float
        sharpe_ratio: float
        max_drawdown: float
        active_prompts: int
        killed_prompts: int


    def generate_daily_report(metrics: SystemMetrics) -> str:
        """Generate daily performance report."""
        return f"""
    # Daily Trading Agent Report
    Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}

    ## Performance Metrics
    - Total PnL: ${metrics.total_pnl:,.2f}
    - Sharpe Ratio: {metrics.sharpe_ratio:.2f}
    - Win Rate: {metrics.winning_trades / max(metrics.total_trades, 1):.1%}
    - Max Drawdown: {metrics.max_drawdown:.1%}

    ## Prompt Evolution
    - Active Prompts: {metrics.active_prompts}
    - Killed Prompts: {metrics.killed_prompts}
    """

Common Pitfalls I Hit
Overfitting to recent performance: I initially evaluated prompts on just 10 trades. Big mistake. Market regimes change. Now I require 30 minimum trades and test across different volatility environments.
Agent echo chamber: My first version had agents just agreeing with each other. Now I assign adversarial roles—one agent must always argue the bull case, another the bear case.
Slow evolution: Initially I only evaluated prompts monthly. That’s too slow for fast markets. Now I evaluate weekly and can spin up new variants within hours.
Git noise: Every prompt commit was cluttering the repo. I added thresholds—only commit when Sharpe exceeds 1.0 or drops below 0.3.
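The commit thresholds from the git-noise fix can be expressed as a small gate in front of the commit step. This is a minimal sketch assuming strict comparisons ("exceeds" and "drops below"); the function name `commit_action` and the returned tags are hypothetical, chosen to match the `[PROMOTED]` and `[KILLED]` commit prefixes used earlier.

```python
from typing import Optional


def commit_action(sharpe: float,
                  promote_above: float = 1.0,
                  kill_below: float = 0.3) -> Optional[str]:
    """Return a commit tag for clear winners and losers, or None to skip the commit."""
    if sharpe > promote_above:
        return "PROMOTED"
    if sharpe < kill_below:
        return "KILLED"
    return None  # middle band: keep tracking, but do not touch the repo


for value in (1.4, 0.7, 0.1):
    print(value, commit_action(value))
```

Prompts in the middle band still get evaluated every week; they just do not generate a commit until they move decisively in one direction.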
Results After 100 Trading Days
After running this for 100 trading days:
- Active prompts: Started with 12, 5 survived, 8 killed, 3 new variants created
- Best prompt: Sharpe ratio of 1.8 over 47 trades
- Worst prompt: Sharpe of -0.3 over 35 trades (killed)
- System Sharpe: 1.2 overall (combining all active prompts)
The key insight: treating prompts as evolving artifacts rather than static instructions creates continuous improvement without manual intervention.
Implementation Roadmap
If you’re building this yourself:
- Start simple: Build one layer (macro) first
- Add debate: Get 2-3 agents arguing before adding complexity
- Implement scoring: Basic Sharpe ratio tracking
- Version control: Git-based prompt management
- Expand: Add remaining layers
- Risk layer: Position sizing and validation
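For the risk-layer step, position sizing and validation can start as two small functions. The sketch below is one possible way to fill in the sector-exposure helper that `RiskManager` relies on, assuming each position is a dict with `size` (fraction of the portfolio) and `sector` keys. The names `sector_exposure` and `clamp_positions` are hypothetical.

```python
def sector_exposure(positions: list, sector: str) -> float:
    """Total size already allocated to a sector."""
    return sum(p["size"] for p in positions if p["sector"] == sector)


def clamp_positions(positions: list,
                    max_position: float = 0.05,
                    max_sector: float = 0.25) -> list:
    """Cap each position, then shrink it further if its sector is nearly full."""
    adjusted = []
    for pos in positions:
        size = min(pos["size"], max_position)
        room = max(0.0, max_sector - sector_exposure(adjusted, pos["sector"]))
        adjusted.append({**pos, "size": min(size, room)})
    return adjusted


# Six oversized tech ideas and one small energy idea
requested = [{"ticker": f"TECH{i}", "sector": "tech", "size": 0.08} for i in range(6)]
requested.append({"ticker": "OIL1", "sector": "energy", "size": 0.03})

result = clamp_positions(requested)
for pos in result:
    print(pos["ticker"], round(pos["size"], 4))
```

Positions are processed in order, so earlier recommendations claim sector capacity first; sorting by conviction before clamping is a natural refinement.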
The Reddit post mentioned 378 trading days. I’m at 100 and still learning. But the system improves itself—that’s the breakthrough.
Final Words + More Resources
My intention with this article was to share my knowledge and experience in a way that helps others. If you want to get in touch, you can reach me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- 👨💻 LangGraph Documentation
- 👨💻 r/algotrading Discussion
- 👨💻 OpenClaw
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!