Skip to content

How Do You Stop AI Agents From Infinite Loops?

Problem

I deployed an autonomous AI agent to handle a batch file processing task. When I checked back two hours later, the agent was still running, stuck in an endless reasoning loop. My API costs had exploded to $400 for a task that should have taken $5.

Here’s what I found in the logs:

Terminal window
$ grep "iteration" agent.log | wc -l
2847
$ grep "token_usage" agent.log | tail -5
token_usage: 127892 (iteration 2843)
token_usage: 127892 (iteration 2844)
token_usage: 127892 (iteration 2845)
token_usage: 127892 (iteration 2846)
token_usage: 127892 (iteration 2847)

The agent had called the LLM API 2,847 times with a full 128k context window. Over and over. Same input. Same output. No progress.

What happened?

I built a simple autonomous agent using LangGraph to process a directory of files. The agent would:

  1. Read a file
  2. Process it with an LLM
  3. Write the result
  4. Move to the next file

But when the agent encountered an ambiguous file, it got stuck:

naive_agent.py
from langgraph.graph import StateGraph, END
def process_file(state):
file = state["current_file"]
content = read_file(file)
# This can loop forever if the LLM is unsure
response = llm.invoke(f"Process this file: {content}")
if "DONE" in response:
return {"status": "complete"}
else:
# Try again... and again... and again
return {"status": "retry"}
workflow = StateGraph(AgentState)
workflow.add_node("process", process_file)
workflow.add_edge("process", "process") # No exit condition!
workflow.add_edge("process", END)

The agent had no hard termination condition. When the LLM couldn’t determine if the file was β€œdone,” it just kept retrying with the same input.

This is exactly what happened with OpenClaw at scale. A Reddit discussion revealed:

β€œWhen OpenClaw gets confused, it enters an endless reasoning loop… Takes its entire 128k context window and slams it into the API. Over. And over. And over.”

β€œMillions of ghost agents, running 24/7 on old computers sitting in closets”

The result? What amounts to a decentralized, global DDoS attack on LLM APIs.

Why does this happen?

Autonomous AI agents loop forever for four reasons:

1. Task Ambiguity

The agent cannot determine a clear completion condition. When I gave my agent the instruction β€œprocess this file,” I didn’t define what β€œdone” looks like.

2. Context Saturation

The agent fills its context window and resubmits it repeatedly. Each iteration costs the same but produces no progress.

3. Progress Blindness

No mechanism detects that reasoning is circling. The agent doesn’t know it’s stuck.

4. Retry Escalation

Errors trigger indefinite retry attempts without backoff or termination.

How I fixed it

I implemented multiple layers of termination conditions. Single guards fail; multi-layer defense works.

Layer 1: Hard Iteration Cap

The simplest and most important guard. Set a maximum number of reasoning steps per task. No exceptions.

iteration_cap.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class AgentConfig:
max_iterations: int = 50 # Hard cap, no exceptions
iteration_warning_threshold: int = 30 # Warn before hitting cap
class CappedAgent:
def __init__(self, config: AgentConfig):
self.config = config
self.iteration_count = 0
def process(self, task):
self.iteration_count = 0
while self.iteration_count < self.config.max_iterations:
self.iteration_count += 1
# Warning threshold
if self.iteration_count == self.config.iteration_warning_threshold:
self.logger.warning(
f"Approaching iteration cap: {self.iteration_count}/{self.config.max_iterations}"
)
result = self.step(task)
if result.is_complete:
return result
# Hit the cap
raise RuntimeError(
f"Agent exceeded maximum iterations ({self.config.max_iterations}). "
f"Task may be ambiguous or require human intervention."
)

When I tested this:

Terminal window
$ python capped_agent.py
WARNING: Approaching iteration cap: 30/50
ERROR: Agent exceeded maximum iterations (50). Task may be ambiguous.

The agent now terminates instead of running forever.

Layer 2: Token Budget Enforcement

Track cumulative token usage. Kill the agent when the budget is exhausted.

token_budget.py
import tiktoken
from typing import Optional
class TokenBudget:
def __init__(
self,
max_tokens: int = 500_000, # $50 worth of tokens
warn_at_percent: float = 0.8
):
self.max_tokens = max_tokens
self.warn_threshold = max_tokens * warn_at_percent
self.used_tokens = 0
self.encoder = tiktoken.encoding_for_model("gpt-4")
def count_tokens(self, text: str) -> int:
return len(self.encoder.encode(text))
def track_usage(self, input_tokens: int, output_tokens: int) -> None:
self.used_tokens += input_tokens + output_tokens
if self.used_tokens >= self.warn_threshold:
percent_used = (self.used_tokens / self.max_tokens) * 100
self.logger.warning(
f"Token budget at {percent_used:.1f}% ({self.used_tokens:,}/{self.max_tokens:,})"
)
if self.used_tokens >= self.max_tokens:
raise RuntimeError(
f"Token budget exhausted: {self.used_tokens:,} tokens used. "
f"Increase budget or simplify task."
)
def remaining(self) -> int:
return max(0, self.max_tokens - self.used_tokens)
class BudgetAwareAgent:
def __init__(self, budget: TokenBudget):
self.budget = budget
async def reason(self, prompt: str) -> str:
# Check budget before call
input_tokens = self.budget.count_tokens(prompt)
if input_tokens > self.budget.remaining():
raise RuntimeError("Insufficient token budget for this request")
# Make LLM call
response = await self.llm.invoke(prompt)
# Track usage
output_tokens = self.budget.count_tokens(response)
self.budget.track_usage(input_tokens, output_tokens)
return response

Now I get cost control:

Terminal window
$ python budget_agent.py
WARNING: Token budget at 80.0% (400,000/500,000)
ERROR: Token budget exhausted: 502,341 tokens used.

Layer 3: Progress Detection

Compare consecutive reasoning outputs. If similarity exceeds threshold, the agent is stuck.

progress_detection.py
from difflib import SequenceMatcher
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class ProgressCheck:
iteration: int
output: str
similarity_to_previous: float
class ProgressDetector:
def __init__(
self,
similarity_threshold: float = 0.9, # 90% similarity = stuck
history_size: int = 5
):
self.similarity_threshold = similarity_threshold
self.history_size = history_size
self.output_history: List[str] = []
def check_progress(self, current_output: str) -> ProgressCheck:
"""Check if the agent is making progress or spinning."""
similarity = 0.0
if self.output_history:
# Compare to most recent output
similarity = SequenceMatcher(
None,
self.output_history[-1],
current_output
).ratio()
# Add to history
self.output_history.append(current_output)
if len(self.output_history) > self.history_size:
self.output_history.pop(0)
return ProgressCheck(
iteration=len(self.output_history),
output=current_output,
similarity_to_previous=similarity
)
def is_stuck(self, current_output: str) -> bool:
"""Return True if agent appears stuck."""
check = self.check_progress(current_output)
if check.similarity_to_previous >= self.similarity_threshold:
self.logger.warning(
f"Output similarity {check.similarity_to_previous:.1%} exceeds threshold. "
f"Agent may be stuck."
)
return True
return False
class ProgressAwareAgent:
def __init__(self):
self.progress_detector = ProgressDetector()
async def step(self, prompt: str) -> str:
response = await self.llm.invoke(prompt)
if self.progress_detector.is_stuck(response):
raise RuntimeError(
"Agent stuck: consecutive outputs too similar. "
"Possible infinite loop detected."
)
return response

Testing this:

Terminal window
$ python progress_agent.py
WARNING: Output similarity 94.2% exceeds threshold. Agent may be stuck.
ERROR: Agent stuck: consecutive outputs too similar. Possible infinite loop detected.

Layer 4: Semantic Completion Check

Before each iteration, ask the LLM: β€œIs the task done?” Require explicit confirmation.

completion_check.py
from pydantic import BaseModel
from typing import Optional
class CompletionCheck(BaseModel):
is_complete: bool
confidence: float # 0.0 to 1.0
reason: str
class CompletionValidator:
def __init__(self, min_confidence: float = 0.8):
self.min_confidence = min_confidence
async def check_completion(
self,
original_task: str,
current_state: str
) -> CompletionCheck:
"""Ask LLM to evaluate if task is complete."""
prompt = f"""Given this task and current state, is the task complete?
Task: {original_task}
Current State: {current_state}
Respond with:
- is_complete: true/false
- confidence: 0.0-1.0
- reason: brief explanation
"""
response = await self.llm.invoke(prompt)
check = CompletionCheck.parse_raw(response)
return check
class ValidatedAgent:
def __init__(self):
self.validator = CompletionValidator()
async def process(self, task: str):
state = await self.initialize(task)
for iteration in range(self.max_iterations):
# Do work
state = await self.step(state)
# Check if done
completion = await self.validator.check_completion(task, state)
if completion.is_complete and completion.confidence >= self.min_confidence:
return state
self.logger.info(
f"Iteration {iteration}: {completion.confidence:.0%} confident, "
f"{completion.reason}"
)
raise RuntimeError("Max iterations reached without completion")

Layer 5: Time-Based Circuit Breaker

Absolute timeout regardless of progress. Prevents zombie agents running for days.

time_breaker.py
import asyncio
from datetime import datetime, timedelta
from typing import Optional
class TimeCircuitBreaker:
def __init__(
self,
max_runtime_minutes: int = 30,
warn_at_minutes: int = 20
):
self.max_runtime = timedelta(minutes=max_runtime_minutes)
self.warn_runtime = timedelta(minutes=warn_at_minutes)
self.start_time: Optional[datetime] = None
def start(self) -> None:
self.start_time = datetime.now()
def check(self) -> None:
if not self.start_time:
return
elapsed = datetime.now() - self.start_time
if elapsed >= self.max_runtime:
raise RuntimeError(
f"Agent exceeded maximum runtime of {self.max_runtime}. "
f"Terminating to prevent zombie agent."
)
if elapsed >= self.warn_runtime:
remaining = self.max_runtime - elapsed
self.logger.warning(
f"Approaching runtime limit. {remaining.seconds // 60} minutes remaining."
)
class TimeoutAgent:
def __init__(self):
self.breaker = TimeCircuitBreaker(max_runtime_minutes=30)
async def process(self, task: str):
self.breaker.start()
while True:
self.breaker.check()
result = await self.step(task)
if result.is_complete:
return result
await asyncio.sleep(1) # Small delay between iterations

The complete solution

I combined all five layers into a production-ready agent:

production_agent.py
from dataclasses import dataclass
from typing import Optional, Any
import asyncio
from datetime import datetime
@dataclass
class TerminationGuards:
max_iterations: int = 50
max_tokens: int = 500_000
max_runtime_minutes: int = 30
similarity_threshold: float = 0.9
completion_confidence: float = 0.8
class ProductionAgent:
def __init__(self, guards: TerminationGuards):
self.guards = guards
# Layer 1: Iteration cap
self.iteration = 0
# Layer 2: Token budget
self.tokens_used = 0
# Layer 3: Progress detector
self.last_output: Optional[str] = None
# Layer 5: Time breaker
self.start_time: Optional[datetime] = None
async def process(self, task: str) -> Any:
"""Process task with all termination guards active."""
# Initialize time breaker
self.start_time = datetime.now()
while True:
# Layer 1: Check iteration cap
self.iteration += 1
if self.iteration > self.guards.max_iterations:
raise RuntimeError(
f"Iteration cap exceeded: {self.iteration}/{self.guards.max_iterations}"
)
# Layer 2: Check token budget
if self.tokens_used > self.guards.max_tokens:
raise RuntimeError(
f"Token budget exhausted: {self.tokens_used:,}/{self.guards.max_tokens:,}"
)
# Layer 5: Check runtime
elapsed = datetime.now() - self.start_time
if elapsed.total_seconds() > self.guards.max_runtime_minutes * 60:
raise RuntimeError(
f"Runtime exceeded: {elapsed.total_seconds() / 60:.1f} minutes"
)
# Execute step
result = await self.step(task)
# Layer 3: Check progress (stuck detection)
if self.last_output and self._similarity(result.output, self.last_output) > self.guards.similarity_threshold:
raise RuntimeError("Agent stuck: consecutive outputs too similar")
self.last_output = result.output
# Layer 4: Check completion
if result.is_complete and result.confidence >= self.guards.completion_confidence:
return result
# Log progress
self.logger.info(
f"Iteration {self.iteration}: "
f"tokens={self.tokens_used:,}, "
f"runtime={elapsed.total_seconds():.0f}s, "
f"confidence={result.confidence:.0%}"
)
def _similarity(self, a: str, b: str) -> float:
from difflib import SequenceMatcher
return SequenceMatcher(None, a, b).ratio()
# Usage
agent = ProductionAgent(TerminationGuards(
max_iterations=50,
max_tokens=500_000,
max_runtime_minutes=30
))
result = await agent.process("Process all files in /data/input")

Common mistakes

I made these mistakes before I understood the problem:

1. Relying on single termination condition

One guard is not enough. Iteration caps fail when tasks genuinely need more steps. Token budgets fail when tasks are token-heavy but bounded. Progress detection fails when outputs legitimately vary. Always use multiple guards.

2. Setting caps too high

If your iteration cap is 100, your task design is probably wrong. Most well-defined tasks should complete in 10-30 iterations.

3. No progress metrics

You cannot improve what you do not measure. Track iteration counts, token usage, and runtime for every agent execution.

4. Ignoring error patterns

Repeated errors often signal a loop. When the same error occurs 3+ times, something is wrong.

5. Per-session limits only

Session limits prevent one agent from running forever. But millions of agents with 50-iteration caps can still create problems. Add daily and weekly limits.

Summary

In this post, I showed how autonomous AI agents can enter infinite reasoning loops that waste API costs and strain infrastructure. The OpenClaw incident demonstrated this at scale: millions of confused agents slamming 128k context windows into APIs continuously.

The solution is multi-layered termination conditions:

  1. Hard iteration cap - maximum reasoning steps per task
  2. Token budget enforcement - kill when budget exhausted
  3. Progress detection - detect when outputs stop changing
  4. Semantic completion check - require explicit β€œdone” confirmation
  5. Time-based circuit breaker - absolute timeout regardless of progress

Single guards fail. Multi-layer defense works. Your agents will get stuck eventually. The question is whether you have the guards in place to catch them.

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments