How Do You Stop AI Agents From Infinite Loops?
Problem
I deployed an autonomous AI agent to handle a batch file processing task. When I checked back two hours later, the agent was still running, stuck in an endless reasoning loop. My API costs had exploded to $400 for a task that should have taken $5.
Hereβs what I found in the logs:
$ grep "iteration" agent.log | wc -l2847
$ grep "token_usage" agent.log | tail -5token_usage: 127892 (iteration 2843)token_usage: 127892 (iteration 2844)token_usage: 127892 (iteration 2845)token_usage: 127892 (iteration 2846)token_usage: 127892 (iteration 2847)The agent had called the LLM API 2,847 times with a full 128k context window. Over and over. Same input. Same output. No progress.
What happened?
I built a simple autonomous agent using LangGraph to process a directory of files. The agent would:
- Read a file
- Process it with an LLM
- Write the result
- Move to the next file
But when the agent encountered an ambiguous file, it got stuck:
from langgraph.graph import StateGraph, END
def process_file(state): file = state["current_file"] content = read_file(file)
# This can loop forever if the LLM is unsure response = llm.invoke(f"Process this file: {content}")
if "DONE" in response: return {"status": "complete"} else: # Try again... and again... and again return {"status": "retry"}
workflow = StateGraph(AgentState)workflow.add_node("process", process_file)workflow.add_edge("process", "process") # No exit condition!workflow.add_edge("process", END)The agent had no hard termination condition. When the LLM couldnβt determine if the file was βdone,β it just kept retrying with the same input.
This is exactly what happened with OpenClaw at scale. A Reddit discussion revealed:
βWhen OpenClaw gets confused, it enters an endless reasoning loopβ¦ Takes its entire 128k context window and slams it into the API. Over. And over. And over.β
βMillions of ghost agents, running 24/7 on old computers sitting in closetsβ
The result? What amounts to a decentralized, global DDoS attack on LLM APIs.
Why does this happen?
Autonomous AI agents loop forever for four reasons:
1. Task Ambiguity
The agent cannot determine a clear completion condition. When I gave my agent the instruction βprocess this file,β I didnβt define what βdoneβ looks like.
2. Context Saturation
The agent fills its context window and resubmits it repeatedly. Each iteration costs the same but produces no progress.
3. Progress Blindness
No mechanism detects that reasoning is circling. The agent doesnβt know itβs stuck.
4. Retry Escalation
Errors trigger indefinite retry attempts without backoff or termination.
How I fixed it
I implemented multiple layers of termination conditions. Single guards fail; multi-layer defense works.
Layer 1: Hard Iteration Cap
The simplest and most important guard. Set a maximum number of reasoning steps per task. No exceptions.
from dataclasses import dataclassfrom typing import Optional
@dataclassclass AgentConfig: max_iterations: int = 50 # Hard cap, no exceptions iteration_warning_threshold: int = 30 # Warn before hitting cap
class CappedAgent: def __init__(self, config: AgentConfig): self.config = config self.iteration_count = 0
def process(self, task): self.iteration_count = 0
while self.iteration_count < self.config.max_iterations: self.iteration_count += 1
# Warning threshold if self.iteration_count == self.config.iteration_warning_threshold: self.logger.warning( f"Approaching iteration cap: {self.iteration_count}/{self.config.max_iterations}" )
result = self.step(task)
if result.is_complete: return result
# Hit the cap raise RuntimeError( f"Agent exceeded maximum iterations ({self.config.max_iterations}). " f"Task may be ambiguous or require human intervention." )When I tested this:
$ python capped_agent.pyWARNING: Approaching iteration cap: 30/50ERROR: Agent exceeded maximum iterations (50). Task may be ambiguous.The agent now terminates instead of running forever.
Layer 2: Token Budget Enforcement
Track cumulative token usage. Kill the agent when the budget is exhausted.
import tiktokenfrom typing import Optional
class TokenBudget: def __init__( self, max_tokens: int = 500_000, # $50 worth of tokens warn_at_percent: float = 0.8 ): self.max_tokens = max_tokens self.warn_threshold = max_tokens * warn_at_percent self.used_tokens = 0 self.encoder = tiktoken.encoding_for_model("gpt-4")
def count_tokens(self, text: str) -> int: return len(self.encoder.encode(text))
def track_usage(self, input_tokens: int, output_tokens: int) -> None: self.used_tokens += input_tokens + output_tokens
if self.used_tokens >= self.warn_threshold: percent_used = (self.used_tokens / self.max_tokens) * 100 self.logger.warning( f"Token budget at {percent_used:.1f}% ({self.used_tokens:,}/{self.max_tokens:,})" )
if self.used_tokens >= self.max_tokens: raise RuntimeError( f"Token budget exhausted: {self.used_tokens:,} tokens used. " f"Increase budget or simplify task." )
def remaining(self) -> int: return max(0, self.max_tokens - self.used_tokens)
class BudgetAwareAgent: def __init__(self, budget: TokenBudget): self.budget = budget
async def reason(self, prompt: str) -> str: # Check budget before call input_tokens = self.budget.count_tokens(prompt)
if input_tokens > self.budget.remaining(): raise RuntimeError("Insufficient token budget for this request")
# Make LLM call response = await self.llm.invoke(prompt)
# Track usage output_tokens = self.budget.count_tokens(response) self.budget.track_usage(input_tokens, output_tokens)
return responseNow I get cost control:
$ python budget_agent.pyWARNING: Token budget at 80.0% (400,000/500,000)ERROR: Token budget exhausted: 502,341 tokens used.Layer 3: Progress Detection
Compare consecutive reasoning outputs. If similarity exceeds threshold, the agent is stuck.
from difflib import SequenceMatcherfrom typing import List, Optionalfrom dataclasses import dataclass
@dataclassclass ProgressCheck: iteration: int output: str similarity_to_previous: float
class ProgressDetector: def __init__( self, similarity_threshold: float = 0.9, # 90% similarity = stuck history_size: int = 5 ): self.similarity_threshold = similarity_threshold self.history_size = history_size self.output_history: List[str] = []
def check_progress(self, current_output: str) -> ProgressCheck: """Check if the agent is making progress or spinning."""
similarity = 0.0 if self.output_history: # Compare to most recent output similarity = SequenceMatcher( None, self.output_history[-1], current_output ).ratio()
# Add to history self.output_history.append(current_output) if len(self.output_history) > self.history_size: self.output_history.pop(0)
return ProgressCheck( iteration=len(self.output_history), output=current_output, similarity_to_previous=similarity )
def is_stuck(self, current_output: str) -> bool: """Return True if agent appears stuck.""" check = self.check_progress(current_output)
if check.similarity_to_previous >= self.similarity_threshold: self.logger.warning( f"Output similarity {check.similarity_to_previous:.1%} exceeds threshold. " f"Agent may be stuck." ) return True
return False
class ProgressAwareAgent: def __init__(self): self.progress_detector = ProgressDetector()
async def step(self, prompt: str) -> str: response = await self.llm.invoke(prompt)
if self.progress_detector.is_stuck(response): raise RuntimeError( "Agent stuck: consecutive outputs too similar. " "Possible infinite loop detected." )
return responseTesting this:
$ python progress_agent.pyWARNING: Output similarity 94.2% exceeds threshold. Agent may be stuck.ERROR: Agent stuck: consecutive outputs too similar. Possible infinite loop detected.Layer 4: Semantic Completion Check
Before each iteration, ask the LLM: βIs the task done?β Require explicit confirmation.
from pydantic import BaseModelfrom typing import Optional
class CompletionCheck(BaseModel): is_complete: bool confidence: float # 0.0 to 1.0 reason: str
class CompletionValidator: def __init__(self, min_confidence: float = 0.8): self.min_confidence = min_confidence
async def check_completion( self, original_task: str, current_state: str ) -> CompletionCheck: """Ask LLM to evaluate if task is complete."""
prompt = f"""Given this task and current state, is the task complete?
Task: {original_task}
Current State: {current_state}
Respond with:- is_complete: true/false- confidence: 0.0-1.0- reason: brief explanation"""
response = await self.llm.invoke(prompt) check = CompletionCheck.parse_raw(response)
return check
class ValidatedAgent: def __init__(self): self.validator = CompletionValidator()
async def process(self, task: str): state = await self.initialize(task)
for iteration in range(self.max_iterations): # Do work state = await self.step(state)
# Check if done completion = await self.validator.check_completion(task, state)
if completion.is_complete and completion.confidence >= self.min_confidence: return state
self.logger.info( f"Iteration {iteration}: {completion.confidence:.0%} confident, " f"{completion.reason}" )
raise RuntimeError("Max iterations reached without completion")Layer 5: Time-Based Circuit Breaker
Absolute timeout regardless of progress. Prevents zombie agents running for days.
import asynciofrom datetime import datetime, timedeltafrom typing import Optional
class TimeCircuitBreaker: def __init__( self, max_runtime_minutes: int = 30, warn_at_minutes: int = 20 ): self.max_runtime = timedelta(minutes=max_runtime_minutes) self.warn_runtime = timedelta(minutes=warn_at_minutes) self.start_time: Optional[datetime] = None
def start(self) -> None: self.start_time = datetime.now()
def check(self) -> None: if not self.start_time: return
elapsed = datetime.now() - self.start_time
if elapsed >= self.max_runtime: raise RuntimeError( f"Agent exceeded maximum runtime of {self.max_runtime}. " f"Terminating to prevent zombie agent." )
if elapsed >= self.warn_runtime: remaining = self.max_runtime - elapsed self.logger.warning( f"Approaching runtime limit. {remaining.seconds // 60} minutes remaining." )
class TimeoutAgent: def __init__(self): self.breaker = TimeCircuitBreaker(max_runtime_minutes=30)
async def process(self, task: str): self.breaker.start()
while True: self.breaker.check()
result = await self.step(task)
if result.is_complete: return result
await asyncio.sleep(1) # Small delay between iterationsThe complete solution
I combined all five layers into a production-ready agent:
from dataclasses import dataclassfrom typing import Optional, Anyimport asynciofrom datetime import datetime
@dataclassclass TerminationGuards: max_iterations: int = 50 max_tokens: int = 500_000 max_runtime_minutes: int = 30 similarity_threshold: float = 0.9 completion_confidence: float = 0.8
class ProductionAgent: def __init__(self, guards: TerminationGuards): self.guards = guards
# Layer 1: Iteration cap self.iteration = 0
# Layer 2: Token budget self.tokens_used = 0
# Layer 3: Progress detector self.last_output: Optional[str] = None
# Layer 5: Time breaker self.start_time: Optional[datetime] = None
async def process(self, task: str) -> Any: """Process task with all termination guards active."""
# Initialize time breaker self.start_time = datetime.now()
while True: # Layer 1: Check iteration cap self.iteration += 1 if self.iteration > self.guards.max_iterations: raise RuntimeError( f"Iteration cap exceeded: {self.iteration}/{self.guards.max_iterations}" )
# Layer 2: Check token budget if self.tokens_used > self.guards.max_tokens: raise RuntimeError( f"Token budget exhausted: {self.tokens_used:,}/{self.guards.max_tokens:,}" )
# Layer 5: Check runtime elapsed = datetime.now() - self.start_time if elapsed.total_seconds() > self.guards.max_runtime_minutes * 60: raise RuntimeError( f"Runtime exceeded: {elapsed.total_seconds() / 60:.1f} minutes" )
# Execute step result = await self.step(task)
# Layer 3: Check progress (stuck detection) if self.last_output and self._similarity(result.output, self.last_output) > self.guards.similarity_threshold: raise RuntimeError("Agent stuck: consecutive outputs too similar")
self.last_output = result.output
# Layer 4: Check completion if result.is_complete and result.confidence >= self.guards.completion_confidence: return result
# Log progress self.logger.info( f"Iteration {self.iteration}: " f"tokens={self.tokens_used:,}, " f"runtime={elapsed.total_seconds():.0f}s, " f"confidence={result.confidence:.0%}" )
def _similarity(self, a: str, b: str) -> float: from difflib import SequenceMatcher return SequenceMatcher(None, a, b).ratio()
# Usageagent = ProductionAgent(TerminationGuards( max_iterations=50, max_tokens=500_000, max_runtime_minutes=30))
result = await agent.process("Process all files in /data/input")Common mistakes
I made these mistakes before I understood the problem:
1. Relying on single termination condition
One guard is not enough. Iteration caps fail when tasks genuinely need more steps. Token budgets fail when tasks are token-heavy but bounded. Progress detection fails when outputs legitimately vary. Always use multiple guards.
2. Setting caps too high
If your iteration cap is 100, your task design is probably wrong. Most well-defined tasks should complete in 10-30 iterations.
3. No progress metrics
You cannot improve what you do not measure. Track iteration counts, token usage, and runtime for every agent execution.
4. Ignoring error patterns
Repeated errors often signal a loop. When the same error occurs 3+ times, something is wrong.
5. Per-session limits only
Session limits prevent one agent from running forever. But millions of agents with 50-iteration caps can still create problems. Add daily and weekly limits.
Summary
In this post, I showed how autonomous AI agents can enter infinite reasoning loops that waste API costs and strain infrastructure. The OpenClaw incident demonstrated this at scale: millions of confused agents slamming 128k context windows into APIs continuously.
The solution is multi-layered termination conditions:
- Hard iteration cap - maximum reasoning steps per task
- Token budget enforcement - kill when budget exhausted
- Progress detection - detect when outputs stop changing
- Semantic completion check - require explicit βdoneβ confirmation
- Time-based circuit breaker - absolute timeout regardless of progress
Single guards fail. Multi-layer defense works. Your agents will get stuck eventually. The question is whether you have the guards in place to catch them.
Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
- π¨βπ» Reddit: OpenClaw Infrastructure Discussion
- π¨βπ» Circuit Breaker Pattern
- π¨βπ» LangGraph Documentation
Oh, and if you found these resources useful, donβt forget to support me by starring the repo on GitHub!
Comments