AI Agent Error Recovery: Retry Logic, Circuit Breakers, Fallback Models
Production AI agents fail in specific, predictable ways. Rate limit errors from the provider. Timeout errors on slow tool calls. Occasional API outages. Model responses that don't parse correctly. Cascading failures when one service dependency goes down.
Most early-stage agent code handles none of this. It calls the API, gets a response, assumes success. In development, that works. In production with real users, you need actual error recovery patterns.
This guide covers the specific patterns that matter most: retry with exponential backoff, circuit breakers, fallback model routing, and graceful degradation. All with real code.
The failure taxonomy
Before patterns, it helps to categorize the failures you'll actually encounter:
Transient failures. Temporary rate limit errors (429), occasional server errors (503), brief network timeouts. These resolve on their own and retry is appropriate.
Persistent failures. Provider outage, quota exhausted for the month, invalid API key. Retrying won't help. You need fallback logic or user-facing communication.
Bad input failures. Content policy violations (400), malformed requests, context window exceeded. Retrying with the same input will just fail again. You need to modify the request or route it differently.
Partial failures. The API returned a response but it wasn't in the expected format, or the tool call syntax was wrong. These are often recoverable with a correction prompt.
Timeout failures. The request timed out before the model responded. Could be transient (retry) or could indicate a request that's too complex for the model (need to simplify).
Each failure category needs different handling. A single except Exception: retry block is worse than no error handling at all because it retries everything, including failures where retrying wastes time and money.
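One way to make that distinction explicit is to classify an error before deciding whether to retry. The sketch below is illustrative only: FailureKind, classify_failure, and the status-code groupings are assumptions rather than any provider's official mapping (a 429 can also mean an exhausted monthly quota, for example). Partial failures are not covered because they surface when parsing a response, not as exceptions.

```python
import asyncio
from enum import Enum
from typing import Optional


class FailureKind(Enum):
    TRANSIENT = "transient"    # retry with backoff
    PERSISTENT = "persistent"  # fall back or tell the user
    BAD_INPUT = "bad_input"    # change the request before trying again
    TIMEOUT = "timeout"        # retry, or simplify the request


# Hypothetical groupings; check your provider's error documentation.
TRANSIENT_STATUS = {429, 500, 502, 503, 504}
PERSISTENT_STATUS = {401, 403}
BAD_INPUT_STATUS = {400, 413, 422}


def classify_failure(exc: BaseException) -> FailureKind:
    """Map an exception to a failure category using its status_code, if any."""
    if isinstance(exc, (TimeoutError, asyncio.TimeoutError)):
        return FailureKind.TIMEOUT
    status: Optional[int] = getattr(exc, "status_code", None)
    if status in TRANSIENT_STATUS:
        return FailureKind.TRANSIENT
    if status in BAD_INPUT_STATUS:
        return FailureKind.BAD_INPUT
    if status in PERSISTENT_STATUS:
        return FailureKind.PERSISTENT
    # Unknown errors are treated as persistent so they are not retried blindly.
    return FailureKind.PERSISTENT
```

A retry wrapper can then retry only when the result is TRANSIENT (or TIMEOUT, within a small budget) and hand everything else to fallback or correction logic.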
Retry with exponential backoff
For transient failures, exponential backoff with jitter is the standard approach. It prevents the "thundering herd" problem where many clients retry simultaneously after a rate limit, which just creates another rate limit.
import asyncio
import random
import logging
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')
logger = logging.getLogger(__name__)

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_status_codes: set[int] = {429, 500, 502, 503, 504},
) -> T:
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as exc:
            last_exception = exc
            # Errors that carry a status_code are retried only if it is in the
            # retryable set. Errors without one (timeouts, connection errors,
            # and anything else) fall through and are retried.
            status_code = getattr(exc, 'status_code', None)
            if status_code and status_code not in retryable_status_codes:
                logger.warning(
                    f"Non-retryable error (status {status_code}): {exc}"
                )
                raise
            if attempt == max_retries:
                break
            # Exponential backoff plus up to one second of random jitter
            delay = min(
                base_delay * (2 ** attempt) + random.uniform(0, 1),
                max_delay
            )
            logger.info(
                f"Attempt {attempt + 1}/{max_retries + 1} failed: {exc}. "
                f"Retrying in {delay:.2f}s"
            )
            await asyncio.sleep(delay)
    raise last_exception
Usage:
response = await retry_with_backoff(
    lambda: anthropic_client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=1000,
        messages=messages
    )
)
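The jitter term (random.uniform(0, 1)) is important. Without it, all clients that hit a rate limit at the same time will retry at the same intervals, recreating the spike. The jitter spreads retries out over time. Note that this is a small additive jitter of up to one second, which is not the same as what is usually called full jitter.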
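A common variant, usually called full jitter, draws the entire delay uniformly between zero and the capped exponential value instead of adding a small random term on top. A minimal sketch, assuming a hypothetical helper named full_jitter_delay that could stand in for the delay calculation in retry_with_backoff:

```python
import random


def full_jitter_delay(
    attempt: int, base_delay: float = 1.0, max_delay: float = 60.0
) -> float:
    """Return a delay drawn uniformly from [0, min(max_delay, base_delay * 2**attempt)]."""
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    return random.uniform(0, ceiling)


# The ceiling doubles per attempt until it reaches the cap:
# 1, 2, 4, 8, 16, 32, 60, 60
ceilings = [min(60.0, 1.0 * 2 ** attempt) for attempt in range(8)]
```

The trade-off is that an individual wait can be very short, but retries from many clients are spread across the whole window rather than clustered near the exponential value.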
Handling rate limit headers
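Anthropic and most other providers return headers that tell you when you can retry. Using these is much better than guessing. (The official Python SDKs also retry some failures on their own by default, controlled by the client's max_retries option; set it to 0 if you want your own loop to be the only retry layer.) A version that honors the header: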
import asyncio
import random
import anthropic

async def call_with_rate_limit_respect(
    client: anthropic.AsyncAnthropic,
    **kwargs
) -> anthropic.types.Message:
    for attempt in range(4):
        try:
            return await client.messages.create(**kwargs)
        except anthropic.RateLimitError as e:
            if attempt == 3:
                raise
            # The SDK exposes the HTTP response on the exception; the
            # retry-after header, when present, gives the wait in seconds.
            retry_after = e.response.headers.get("retry-after")
            try:
                wait_seconds = float(retry_after) + random.uniform(0, 0.5)
                logger.info(f"Rate limited. Waiting {wait_seconds:.1f}s per header")
            except (TypeError, ValueError):
                # Fallback: exponential backoff if the header is missing or not numeric
                wait_seconds = 2 ** attempt + random.uniform(0, 1)
            await asyncio.sleep(wait_seconds)
OpenAI's API returns x-ratelimit-reset-requests and x-ratelimit-reset-tokens response headers, which report how long until each limit resets. If you're managing your own HTTP client (not using the SDK), parse these directly.
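If you do read those headers yourself, note that the reset values tend to be duration strings rather than plain seconds. The sketch below assumes values shaped like 1s, 6m0s, or 250ms; that format is an assumption based on commonly observed responses, and parse_reset_duration is a hypothetical helper, so check it against the headers you actually receive.

```python
import re
from typing import Optional

_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}
# "ms" comes first in the alternation so it is matched whole, not as "m" + "s".
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")


def parse_reset_duration(value: Optional[str]) -> Optional[float]:
    """Convert a duration string such as '6m0s' to seconds, or None if unparseable."""
    if not value:
        return None
    text = value.strip()
    parts = _PART.findall(text)
    # Reject strings that contain anything besides the recognised parts.
    if not parts or "".join(num + unit for num, unit in parts) != text:
        return None
    return sum(float(num) * _UNIT_SECONDS[unit] for num, unit in parts)
```

Returning None for anything unrecognised lets the caller fall back to exponential backoff instead of sleeping for a wrong duration.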
Circuit breaker pattern
When a service is consistently failing, retrying every request wastes time and adds load. The circuit breaker pattern rejects requests immediately once the failure count reaches a threshold, then periodically lets requests through to test whether the service has recovered.
import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing: reject requests immediately
    HALF_OPEN = "half_open"  # Testing: allow a probe request through

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self._lock = Lock()

    def can_proceed(self) -> bool:
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            if self.state == CircuitState.OPEN:
                # Check if timeout has elapsed
                if time.time() - self.last_failure_time >= self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    return True
                return False
            # HALF_OPEN: allow through
            return True

    def record_success(self) -> None:
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self) -> None:
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if (self.state == CircuitState.CLOSED and
                    self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
                logger.warning(
                    f"Circuit breaker opened after {self.failure_count} failures"
                )
            elif self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN

class CircuitOpenError(Exception):
    pass

async def call_with_circuit_breaker(
    circuit: CircuitBreaker,
    func: Callable[[], Awaitable[T]]
) -> T:
    if not circuit.can_proceed():
        raise CircuitOpenError("Circuit breaker is open. Skipping request.")
    try:
        result = await func()
        circuit.record_success()
        return result
    except Exception:
        circuit.record_failure()
        raise
Keep one circuit breaker instance per external service, not per request:
# Module-level circuit breakers
anthropic_circuit = CircuitBreaker(failure_threshold=5, timeout=30.0)
openai_circuit = CircuitBreaker(failure_threshold=5, timeout=30.0)
Fallback model routing
When your primary model fails or the circuit is open, routing to a fallback model keeps your agent working. This pattern is especially useful when you're calling expensive frontier models and can fall back to a cheaper model for less critical tasks.
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ModelConfig:
    model: str
    client: Any
    circuit: CircuitBreaker
    max_tokens: int
    cost_per_mtok_input: float  # Cost per million input tokens
    cost_per_mtok_output: float

class FallbackRouter:
    def __init__(self, models: list[ModelConfig]):
        # Ordered by preference (first = primary)
        self.models = models

    async def call(
        self,
        messages: list[dict],
        max_tokens: int = 1000,
        system: Optional[str] = None
    ) -> tuple[Any, ModelConfig]:
        errors = []
        for model_config in self.models:
            if not model_config.circuit.can_proceed():
                logger.info(
                    f"Skipping {model_config.model}: circuit open"
                )
                continue
            try:
                kwargs = {
                    "model": model_config.model,
                    "max_tokens": min(max_tokens, model_config.max_tokens),
                    "messages": messages,
                }
                if system:
                    kwargs["system"] = system
                response = await retry_with_backoff(
                    lambda: model_config.client.messages.create(**kwargs),
                    max_retries=2
                )
                model_config.circuit.record_success()
                return response, model_config
            except CircuitOpenError:
                continue
            except Exception as exc:
                model_config.circuit.record_failure()
                errors.append(f"{model_config.model}: {exc}")
                logger.warning(
                    f"Model {model_config.model} failed: {exc}. Trying fallback."
                )
        raise Exception(
            f"All models failed. Errors: {'; '.join(errors)}"
        )
A real router configuration:
import anthropic
import openai

router = FallbackRouter([
    ModelConfig(
        model="claude-3-7-sonnet-20250219",
        client=anthropic.AsyncAnthropic(),
        circuit=CircuitBreaker(failure_threshold=5, timeout=30.0),
        max_tokens=8096,
        cost_per_mtok_input=3.0,
        cost_per_mtok_output=15.0,
    ),
    # NOTE: the router calls client.messages.create(...), which the OpenAI
    # client does not provide (it uses chat.completions.create). This entry
    # needs a thin adapter that exposes the Anthropic-style interface.
    ModelConfig(
        model="gpt-4o",
        client=openai.AsyncOpenAI(),
        circuit=CircuitBreaker(failure_threshold=5, timeout=30.0),
        max_tokens=4096,
        cost_per_mtok_input=2.5,
        cost_per_mtok_output=10.0,
    ),
    # Last resort: cheaper, faster, less capable
    ModelConfig(
        model="claude-3-5-haiku-20241022",
        client=anthropic.AsyncAnthropic(),
        circuit=CircuitBreaker(failure_threshold=10, timeout=15.0),
        max_tokens=4096,
        cost_per_mtok_input=0.80,
        cost_per_mtok_output=4.0,
    ),
])
When Claude 3.7 Sonnet is rate limited or down, the router transparently fails over to GPT-4o (provided the OpenAI client is wrapped to expose the same messages.create interface the router calls). If that fails too, Claude Haiku handles it. The user gets a response, just possibly from a less capable model.
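Because FallbackRouter calls client.messages.create(...) and downstream code reads response.content[0].text, a non-Anthropic client has to be adapted to that shape. The sketch below shows one possible adapter; OpenAIMessagesAdapter, AdaptedResponse, and TextBlock are hypothetical names introduced here, and only plain text replies are handled (no tool calls or streaming).

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TextBlock:
    text: str


@dataclass
class AdaptedResponse:
    content: list[TextBlock]  # mirrors response.content[0].text
    model: str


class _Messages:
    def __init__(self, openai_client: Any):
        self._client = openai_client

    async def create(
        self,
        *,
        model: str,
        max_tokens: int,
        messages: list[dict],
        system: Optional[str] = None,
    ) -> AdaptedResponse:
        chat_messages = list(messages)
        if system:
            # Chat completions take the system prompt as the first message.
            chat_messages = [{"role": "system", "content": system}] + chat_messages
        completion = await self._client.chat.completions.create(
            model=model, max_tokens=max_tokens, messages=chat_messages
        )
        text = completion.choices[0].message.content or ""
        return AdaptedResponse(content=[TextBlock(text)], model=model)


class OpenAIMessagesAdapter:
    """Wraps an OpenAI async client so it exposes .messages.create(...)."""

    def __init__(self, openai_client: Any):
        self.messages = _Messages(openai_client)
```

With this in place, the GPT-4o entry would pass client=OpenAIMessagesAdapter(openai.AsyncOpenAI()) instead of the raw client.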
Handling partial response failures
Sometimes the API returns a response but it's not what you expected: malformed JSON in a structured output request, a tool call with wrong parameters, a response that's cut off. These need different handling than API-level failures.
import json
from pydantic import BaseModel, ValidationError

class StructuredResponse(BaseModel):
    action: str
    parameters: dict
    reasoning: str

async def call_with_structured_retry(
    client,
    messages: list[dict],
    schema: type[BaseModel],
    max_correction_attempts: int = 2
) -> BaseModel:
    for attempt in range(max_correction_attempts + 1):
        response = await client.messages.create(
            model="claude-3-7-sonnet-20250219",
            max_tokens=1000,
            messages=messages
        )
        content = response.content[0].text
        try:
            # Try to parse the response. model_validate raises ValidationError
            # for non-dict JSON (e.g. a list) instead of a TypeError.
            parsed = json.loads(content)
            return schema.model_validate(parsed)
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_correction_attempts:
                raise ValueError(
                    f"Failed to get valid structured response after "
                    f"{max_correction_attempts + 1} attempts: {e}"
                )
            # Add a correction message and retry
            messages = messages + [
                {"role": "assistant", "content": content},
                {
                    "role": "user",
                    "content": (
                        f"Your response wasn't valid JSON matching the expected schema. "
                        f"Error: {e}\n\n"
                        f"Please respond again with only valid JSON matching this schema:\n"
                        f"{schema.model_json_schema()}"
                    )
                }
            ]
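Many parse failures are cosmetic: the JSON is valid but wrapped in a code fence or surrounded by a sentence of prose. Trying a tolerant extraction before spending another model call on a correction prompt can save a round trip. The helper below is a sketch under that assumption; extract_json_object is a hypothetical name, and it only looks for the first JSON object in the text.

```python
import json
import re
from typing import Any

# Matches a fenced block (optionally labelled json) and captures its body.
_FENCE = re.compile(r"`{3}(?:json)?\s*(.*?)\s*`{3}", re.DOTALL)


def extract_json_object(text: str) -> Any:
    """Parse the first JSON object in text, tolerating fences and surrounding prose."""
    match = _FENCE.search(text)
    candidate = match.group(1) if match else text
    start = candidate.find("{")
    if start == -1:
        raise json.JSONDecodeError("No JSON object found", candidate, 0)
    # raw_decode parses one value and ignores anything after it.
    obj, _end = json.JSONDecoder().raw_decode(candidate[start:])
    return obj
```

Since failures still raise json.JSONDecodeError, this could replace the json.loads(content) call without changing the surrounding except clause.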
Graceful degradation
When everything fails, you need a last resort that keeps the user experience from completely breaking. Graceful degradation means having a response prepared for total failure:
async def agent_response_with_fallback(
    router: FallbackRouter,
    messages: list[dict],
    task_context: str
) -> dict:
    try:
        response, model_used = await router.call(messages)
        return {
            "success": True,
            "content": response.content[0].text,
            "model": model_used.model
        }
    except Exception as exc:
        logger.error(f"All models failed for task: {task_context}. Error: {exc}")
        # Return a meaningful degraded response instead of crashing.
        # This function does not enqueue anything itself: the caller must
        # actually queue the request for the message below to be true.
        return {
            "success": False,
            "content": (
                "I'm having trouble processing your request right now due to "
                "service issues. Your request has been saved and will be "
                "processed as soon as service is restored."
            ),
            "model": None,
            "queued_for_retry": True
        }
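Queue the failed request for later processing rather than losing it, and do so before returning the message above, since the function as written reports queued_for_retry without enqueueing anything. A simple Redis-backed queue works fine: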
async def queue_failed_request(
    redis_client,
    session_id: str,
    messages: list[dict],
    task_context: str
) -> None:
    payload = json.dumps({
        "session_id": session_id,
        "messages": messages,
        "task_context": task_context,
        "queued_at": time.time()
    })
    await redis_client.rpush("agent_retry_queue", payload)
A background worker processes this queue when service is restored.
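A minimal sketch of such a worker, assuming an async Redis client with lpop and lpush (as in redis.asyncio) and a hypothetical handler coroutine that re-runs one queued request. drain_retry_queue is an illustrative name, not part of any library.

```python
import json
from typing import Awaitable, Callable


async def drain_retry_queue(
    redis_client,
    handler: Callable[[dict], Awaitable[None]],
    queue_name: str = "agent_retry_queue",
    max_items: int = 100,
) -> int:
    """Process queued requests in order; stop at the first failure.

    Returns the number of requests processed successfully.
    """
    processed = 0
    while processed < max_items:
        raw = await redis_client.lpop(queue_name)
        if raw is None:  # queue is empty
            break
        try:
            await handler(json.loads(raw))
        except Exception:
            # Service is probably still down: put the item back at the
            # front of the queue and stop until the next run.
            await redis_client.lpush(queue_name, raw)
            break
        processed += 1
    return processed
```

This version can lose an item if the worker crashes between lpop and lpush; a production worker would likely move items to a separate processing list first and remove them only after the handler succeeds.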
Observability for error recovery
Track these metrics to understand how your error recovery is performing:
- Retry rate per model. How often are you retrying? High retry rate indicates a systemic issue.
- Circuit breaker open events. How often do circuits open and for how long?
- Fallback activation rate. What percentage of requests fall back to secondary or tertiary models?
- Total failure rate. After all retries and fallbacks, what percentage of requests still fail?
- p99 latency with retries. Retries add latency. Make sure the tail latency is acceptable.
A Prometheus metrics setup (a counter for outcomes, a histogram for latency):
from prometheus_client import Counter, Histogram

api_calls_total = Counter(
    'ai_agent_api_calls_total',
    'Total API calls',
    ['model', 'status']  # status: success, retry, fallback, failure
)
api_latency = Histogram(
    'ai_agent_api_latency_seconds',
    'API call latency',
    ['model']
)
For the rate limiting strategies that complement this error handling (token bucketing, multi-key rotation), the rate limiting guide covers the prevention side. And for state recovery when an agent crashes mid-task, the state management guide covers checkpointing and resumable sessions.