AI Agent Error Handling Best Practices: Production-Ready Resilience Patterns
When building AI agents for production, robust error handling isn’t optional—it’s the difference between a system that runs reliably at scale versus one that fails spectacularly in front of users. Through building systems like ClawdHub (our 13K+ line AI agent orchestration platform) and AgentAgent (multi-agent coordination system), we’ve learned that AI agent error handling best practices require a fundamentally different approach than traditional software error handling.
AI agents face unique challenges: unpredictable LLM responses, rate limits, network instability, and the need to maintain conversation context even when things go wrong. Traditional try-catch blocks aren’t enough when your agent needs to recover gracefully from a malformed API response while preserving a multi-turn conversation state.
Why AI Agents Need Specialized Error Handling
AI agents operate in an inherently uncertain environment. Unlike traditional applications where inputs are predictable, AI agents deal with:
Dynamic Response Formats: LLMs don’t always return data in expected formats, even with structured prompts. Your agent might receive JSON when expecting plain text, or vice versa.
Rate Limiting and Quotas: API providers impose limits that can trigger mid-conversation. Your agent needs to handle these gracefully without losing context.
Partial Failures: An agent might successfully process part of a request but fail on a specific step. Complete rollback isn’t always appropriate.
Context Dependencies: Error recovery often requires understanding what the agent was trying to accomplish, not just what function failed.
In our AgentAgent system, we learned this the hard way. Early versions would crash entire agent sessions when a single API call failed, losing hours of accumulated context. The solution required rethinking error boundaries at the agent level, not just the function level.
Error Classification and Response Strategies
The first step in implementing effective error handling is categorizing errors by their recovery patterns. We use a three-tier classification system:
Transient Errors (Retry)
These errors are temporary and likely to resolve on retry:
- Network timeouts
- Rate limiting (429 responses)
- Temporary API unavailability (502, 503, 504)
- LLM provider overload
import asyncio
import aiohttp
from typing import Optional, Callable, Any
import logging
class TransientErrorHandler:
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.logger = logging.getLogger(__name__)
async def execute_with_retry(
self,
func: Callable,
*args,
**kwargs
) -> Any:
"""Execute function with exponential backoff retry logic"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except (
aiohttp.ClientTimeout,
aiohttp.ClientResponseError,
ConnectionError
) as e:
last_exception = e
if attempt == self.max_retries:
self.logger.error(f"Max retries exceeded: {e}")
break
# Exponential backoff with jitter
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
await asyncio.sleep(delay)
raise last_exception
Recoverable Errors (Fallback)
These errors require alternative approaches but don’t necessarily stop the agent:
- Malformed LLM responses
- Missing required fields in responses
- Invalid tool parameters
- Context window exceeded
class RecoverableErrorHandler:
def __init__(self, fallback_strategies: dict):
self.fallback_strategies = fallback_strategies
self.logger = logging.getLogger(__name__)
async def handle_malformed_response(
self,
response: str,
expected_format: str,
context: dict
) -> dict:
"""Handle malformed LLM responses with structured fallback"""
try:
# Attempt to parse as expected
if expected_format == "json":
return json.loads(response)
except json.JSONDecodeError:
self.logger.warning("JSON parsing failed, attempting repair")
# Try to extract JSON from mixed content
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
# Fallback to structured re-prompting
return await self._request_structured_retry(response, context)
async def _request_structured_retry(self, failed_response: str, context: dict) -> dict:
"""Request a properly formatted response from the LLM"""
retry_prompt = f"""
The previous response was not in the expected JSON format:
{failed_response}
Please provide the response as valid JSON only, with this structure:
{{
"action": "string",
"parameters": {{}},
"reasoning": "string"
}}
"""
# Re-query with explicit format requirements
# Implementation would use your LLM client here
pass
Fatal Errors (Graceful Degradation)
These errors require immediate escalation or system shutdown:
- Authentication failures
- Quota exhausted
- Critical system dependencies unavailable
- Security violations
Circuit Breaker Pattern for AI Services
The circuit breaker pattern is crucial for AI agents because LLM provider issues can cascade quickly. When Claude API starts returning errors, you don’t want to hammer it with retries—you want to fail fast and potentially switch to a fallback provider.
import time
from enum import Enum
from typing import Optional, Callable
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
timeout: int = 60,
expected_exception: Exception = Exception
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs):
"""Execute function through circuit breaker"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to try resetting"""
return (
time.time() - self.last_failure_time >= self.timeout
if self.last_failure_time else False
)
def _on_success(self):
"""Reset circuit breaker on successful call"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failure and potentially open circuit"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage in agent system
claude_circuit = CircuitBreaker(
failure_threshold=3,
timeout=30,
expected_exception=aiohttp.ClientError
)
async def query_llm_with_circuit_breaker(prompt: str) -> str:
"""Query LLM with circuit breaker protection"""
try:
return await claude_circuit.call(claude_api.query, prompt)
except Exception:
# Fallback to alternative provider or cached response
return await fallback_llm_provider.query(prompt)
Context Preservation During Failures
One of the most critical aspects of AI agent error handling is preserving conversation context when errors occur. Users shouldn’t lose their progress because of a temporary API failure.
import json
from typing import Dict, List, Any
from dataclasses import dataclass, asdict
import asyncio
@dataclass
class ConversationState:
messages: List[Dict[str, str]]
context_variables: Dict[str, Any]
current_task: Optional[str]
completed_steps: List[str]
def to_dict(self) -> Dict:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict) -> 'ConversationState':
return cls(**data)
class ContextPreservingAgent:
def __init__(self, state_store: 'StateStore'):
self.state_store = state_store
self.conversation_state = ConversationState(
messages=[],
context_variables={},
current_task=None,
completed_steps=[]
)
async def execute_task(self, task: str, session_id: str):
"""Execute task with automatic state preservation"""
# Save initial state
await self._checkpoint_state(session_id)
try:
self.conversation_state.current_task = task
# Execute task steps
for step in self._get_task_steps(task):
await self._execute_step(step)
self.conversation_state.completed_steps.append(step)
# Checkpoint after each successful step
await self._checkpoint_state(session_id)
# Task completed successfully
self.conversation_state.current_task = None
await self._checkpoint_state(session_id)
except Exception as e:
self.logger.error(f"Task failed: {e}")
# State is preserved at last checkpoint
await self._handle_task_failure(task, e, session_id)
async def _checkpoint_state(self, session_id: str):
"""Save current conversation state"""
await self.state_store.save_state(
session_id,
self.conversation_state.to_dict()
)
async def restore_from_checkpoint(self, session_id: str):
"""Restore agent state from last checkpoint"""
state_data = await self.state_store.load_state(session_id)
if state_data:
self.conversation_state = ConversationState.from_dict(state_data)
# Resume interrupted task if any
if self.conversation_state.current_task:
await self._resume_interrupted_task()
async def _resume_interrupted_task(self):
"""Resume task from last completed step"""
remaining_steps = [
step for step in self._get_task_steps(self.conversation_state.current_task)
if step not in self.conversation_state.completed_steps
]
for step in remaining_steps:
try:
await self._execute_step(step)
self.conversation_state.completed_steps.append(step)
except Exception as e:
# Handle step failure with context
await self._handle_step_failure(step, e)
Implementing Graceful Degradation
When AI services fail, your agent should degrade gracefully rather than crash completely. This means providing alternative functionality or cached responses when possible.
class GracefulDegradationAgent:
def __init__(self):
self.capabilities = {
'llm_query': True,
'tool_execution': True,
'web_search': True,
'file_operations': True
}
self.fallback_cache = {}
async def query_with_degradation(self, prompt: str, context: dict) -> str:
"""Query with multiple fallback levels"""
# Level 1: Primary LLM service
if self.capabilities['llm_query']:
try:
return await self.primary_llm.query(prompt)
except Exception as e:
self.logger.warning(f"Primary LLM failed: {e}")
self.capabilities['llm_query'] = False
# Level 2: Secondary LLM provider
try:
return await self.secondary_llm.query(prompt)
except Exception as e:
self.logger.warning(f"Secondary LLM failed: {e}")
# Level 3: Pattern-based responses
pattern_response = self._try_pattern_matching(prompt)
if pattern_response:
return f"[Degraded Mode] {pattern_response}"
# Level 4: Cached similar responses
cached_response = self._find_cached_response(prompt)
if cached_response:
return f"[Cached] {cached_response}"
# Level 5: Honest failure with helpful message
return self._generate_helpful_failure_message(prompt, context)
def _try_pattern_matching(self, prompt: str) -> Optional[str]:
"""Simple pattern matching for common queries"""
patterns = {
r'what.*time': "I cannot access current time information in degraded mode.",
r'weather.*': "Weather information requires external API access, currently unavailable.",
r'calculate.*': "Complex calculations require full AI capabilities, currently unavailable."
}
for pattern, response in patterns.items():
if re.search(pattern, prompt.lower()):
return response
return None
def _generate_helpful_failure_message(self, prompt: str, context: dict) -> str:
"""Generate contextual failure message"""
return f"""I'm experiencing technical difficulties and cannot process your request fully.
Here's what I can tell you:
- Your request: {prompt[:100]}...
- Available capabilities: {[k for k, v in self.capabilities.items() if v]}
- You may want to try: Simplifying your request or trying again in a few minutes
I'll automatically retry full functionality shortly."""
Monitoring and Observability
Effective error handling requires comprehensive monitoring to understand failure patterns and optimize recovery strategies. In our ClawdHub system, we implemented detailed error tracking that helped us identify and fix subtle failure modes.
import structlog
from dataclasses import dataclass
from typing import Optional, Dict, Any
import time
@dataclass
class ErrorMetrics:
error_type: str
error_count: int
last_occurrence: float
recovery_success_rate: float
average_recovery_time: float
class AgentErrorMonitor:
def __init__(self):
self.logger = structlog.get_logger()
self.error_metrics: Dict[str, ErrorMetrics] = {}
self.recovery_times: Dict[str, List[float]] = {}
async def track_error(
self,
error: Exception,
context: Dict[str, Any],
recovery_attempted: bool = False
):
"""Track error occurrence and context"""
error_type = type(error).__name__
# Log structured error data
self.logger.error(
"agent_error_occurred",
error_type=error_type,
error_message=str(error),
context=context,
recovery_attempted=recovery_attempted,
timestamp=time.time()
)
# Update metrics
if error_type not in self.error_metrics:
self.error_metrics[error_type] = ErrorMetrics(
error_type=error_type,
error_count=0,
last_ More from the blog
Need help with AI?
We build production AI systems — from strategy and architecture to deployment and evaluation.
Get our AI implementation playbook
A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.
Check your inbox.
Something went wrong. Please try again.