Python Best Practices for Production AI Agents in 2026
Python is the dominant language for AI agent development, which means there's an enormous amount of code out there ranging from production-grade to "it works on my laptop but will fall apart in staging." The gap between code that runs locally and code that runs reliably in production is mostly discipline: error handling, async patterns, retry logic, and observability.
These aren't glamorous topics, but they're what separates a demo from a deployed service. Here's what Python AI agent code looks like when it's done right.
Async from the start
Every AI agent that makes model API calls should be async. This isn't just a performance optimization: in a service that handles many requests, a blocking call stalls everything else that shares its thread. Synchronous code blocks the thread while waiting for API responses. An agent that makes 5 tool calls sequentially, each taking 500ms, ties up a thread for 2.5 seconds. In an async model, those same tool calls can run concurrently if they're independent, finishing in roughly the time of the slowest one.
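To make the arithmetic concrete, here is a minimal sketch that simulates independent tool calls with asyncio.sleep. The fake_tool_call helper and the shortened 50 ms delay are assumptions for illustration, not part of any SDK.

```python
import asyncio
import time


async def fake_tool_call(delay_s: float) -> float:
    """Hypothetical stand-in for an I/O-bound tool call."""
    await asyncio.sleep(delay_s)
    return delay_s


async def run_sequentially(n: int, delay_s: float) -> float:
    """Await each call in turn; total time is roughly n * delay_s."""
    start = time.monotonic()
    for _ in range(n):
        await fake_tool_call(delay_s)
    return time.monotonic() - start


async def run_concurrently(n: int, delay_s: float) -> float:
    """Start all calls at once; total time is roughly one delay_s."""
    start = time.monotonic()
    await asyncio.gather(*(fake_tool_call(delay_s) for _ in range(n)))
    return time.monotonic() - start


async def compare(n: int = 5, delay_s: float = 0.05) -> tuple[float, float]:
    return await run_sequentially(n, delay_s), await run_concurrently(n, delay_s)


sequential_s, concurrent_s = asyncio.run(compare())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

The exact timings vary by machine, but the sequential total should scale with the number of calls while the concurrent total stays close to a single delay.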
All the major Python AI SDKs support async: the Anthropic SDK, OpenAI SDK, and Pydantic AI all provide async APIs as of 2026.
import asyncio
import anthropic

async def run_agent(query: str) -> str:
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": query}]
    response = await client.messages.create(
        model="claude-4-sonnet-20260518",
        max_tokens=4096,
        messages=messages
    )
    return response.content[0].text

# Running from sync context
result = asyncio.run(run_agent("What is 2 + 2?"))
The AsyncAnthropic client mirrors the sync Anthropic client's interface, except that its methods are coroutines you must await. Use it everywhere.
For independent tool calls, asyncio.gather runs them concurrently on the event loop:
async def run_tools_in_parallel(tool_calls: list[ToolCall]) -> list[ToolResult]:
    tasks = [execute_tool(tc) for tc in tool_calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Handle results, including exceptions from individual tools
    processed = []
    for tool_call, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            processed.append(ToolResult(
                tool_use_id=tool_call.id,
                is_error=True,
                content=str(result)
            ))
        else:
            processed.append(result)
    return processed
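return_exceptions=True makes gather return each tool's exception as a value in the results list instead of raising the first one to the caller. Without it, the first failure propagates immediately and you lose the other tools' results (the other calls are not canceled; they keep running, but nothing collects their output). With it, each tool runs to completion (or failure) and you handle the results individually.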
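A runnable version of the same pattern can be sketched with stand-in types. ToolCall, ToolResult, and execute_tool below are hypothetical definitions invented for this illustration; a real agent would use its own.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToolCall:  # hypothetical shape, for illustration only
    id: str
    name: str


@dataclass
class ToolResult:  # hypothetical shape, for illustration only
    tool_use_id: str
    content: str
    is_error: bool = False


async def execute_tool(tool_call: ToolCall) -> ToolResult:
    """Hypothetical executor: the 'broken' tool always raises."""
    await asyncio.sleep(0.01)
    if tool_call.name == "broken":
        raise RuntimeError("tool backend unavailable")
    return ToolResult(tool_use_id=tool_call.id, content=f"{tool_call.name} ok")


async def run_all(tool_calls: list[ToolCall]) -> list[ToolResult]:
    raw = await asyncio.gather(
        *(execute_tool(tc) for tc in tool_calls), return_exceptions=True
    )
    # gather preserves input order, so zip pairs each call with its outcome
    return [
        ToolResult(tool_use_id=tc.id, content=str(r), is_error=True)
        if isinstance(r, Exception)
        else r
        for tc, r in zip(tool_calls, raw)
    ]


calls = [ToolCall("1", "search"), ToolCall("2", "broken"), ToolCall("3", "fetch")]
results = asyncio.run(run_all(calls))
```

Only the second result is flagged as an error; the first and third tools still report their output.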
Pydantic models for everything structured
The single best practice for Python agent code is using Pydantic v2 models for all structured data. Not just the API outputs, but tool parameters, intermediate state, and anything that crosses a boundary.
from pydantic import BaseModel, Field, field_validator
from typing import Optional
import re

class SearchParams(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    max_results: int = Field(default=5, ge=1, le=50)
    safe_search: bool = True

    @field_validator('query')
    @classmethod
    def clean_query(cls, v: str) -> str:
        # Remove potential injection characters
        return re.sub(r'[<>"\']', '', v).strip()

class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str
    relevance_score: Optional[float] = None

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        if not v.startswith(('http://', 'https://')):
            raise ValueError(f'Invalid URL: {v}')
        return v

class AgentResponse(BaseModel):
    answer: str
    sources: list[SearchResult] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    reasoning: Optional[str] = None
Pydantic validators run at parse time, not at use time. When you call SearchParams(query=user_input), the query gets cleaned and validated immediately. If it fails, you get a clear ValidationError with the field name and what went wrong.
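For parsing model output into structured types, decode the JSON and then validate it against the model. Keeping the two steps separate lets you tell "not JSON at all" apart from "JSON with the wrong shape" (Pydantic v2's model_validate_json does both in one call if you don't need that distinction):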
import json
from pydantic import ValidationError

async def parse_model_output(raw_output: str) -> AgentResponse:
    try:
        data = json.loads(raw_output)
        return AgentResponse.model_validate(data)
    except json.JSONDecodeError as e:
        raise ValueError(f"Model output was not valid JSON: {e}") from e
    except ValidationError as e:
        raise ValueError(f"Model output did not match expected schema: {e}") from e
Retry logic with tenacity
API calls fail. Network timeouts, rate limits, transient server errors. Production agents need retry logic that handles these gracefully.
The tenacity library is the standard for Python retry logic:
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
import anthropic
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((
        anthropic.RateLimitError,
        anthropic.APIConnectionError,
        anthropic.InternalServerError
    )),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
async def create_message_with_retry(
    client: anthropic.AsyncAnthropic,
    **kwargs
) -> anthropic.Message:
    return await client.messages.create(**kwargs)
Key decisions in this retry configuration:
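- 3 attempts total (not 3 retries: the first attempt counts). The Anthropic SDK also retries some errors on its own by default, so pass max_retries=0 to the client if tenacity should be the only retry layer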
- Exponential backoff starting at 2 seconds, capped at 30 seconds
- Only retry on specific exceptions (rate limits, connection errors, server errors)
- Log before each retry so you can see the retry pattern in production
What not to retry: anthropic.BadRequestError (your request is malformed, retrying won't help), anthropic.AuthenticationError (credentials are wrong), or ValidationError from Pydantic (your data is bad). These are deterministic failures that retrying will not fix.
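The same decisions can be sketched without any library, which makes the control flow explicit. This is a minimal illustration rather than a replacement for tenacity: call_with_retry and FlakyCall are hypothetical names, the built-in ConnectionError and TimeoutError stand in for transient API errors, and the delays are shortened so the example runs quickly.

```python
import asyncio

RETRYABLE = (ConnectionError, TimeoutError)  # stand-ins for transient API errors


async def call_with_retry(func, *, attempts: int = 3, base_delay_s: float = 0.01,
                          max_delay_s: float = 0.05):
    """Call func() up to `attempts` times, backing off between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except RETRYABLE:
            if attempt == attempts:
                raise  # out of attempts: surface the last transient error
            # Capped exponential backoff: base, 2*base, 4*base, ... up to max
            await asyncio.sleep(min(base_delay_s * 2 ** (attempt - 1), max_delay_s))
        # Any other exception type propagates immediately, without retrying


class FlakyCall:
    """Hypothetical operation that fails `failures` times, then succeeds."""

    def __init__(self, failures: int, error: Exception):
        self.failures, self.error, self.calls = failures, error, 0

    async def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise self.error
        return "ok"


transient = FlakyCall(failures=2, error=ConnectionError("reset"))
result = asyncio.run(call_with_retry(transient))  # succeeds on the third attempt

malformed = FlakyCall(failures=1, error=ValueError("bad request"))
try:
    asyncio.run(call_with_retry(malformed))
except ValueError:
    pass  # deterministic failure: raised after a single call
```

The transient failure is attempted three times in total, while the deterministic one is attempted once and surfaces immediately.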
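For rate limit handling specifically, the Anthropic and OpenAI APIs can include a retry-after header in rate-limit responses, and both SDKs expose the HTTP response on the raised exception. A smarter retry waits for the duration that header requests instead of guessing: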
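import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

_backoff = wait_exponential(multiplier=1, min=1, max=60)

def wait_for_retry_after(retry_state) -> float:
    """Wait as long as Retry-After asks; otherwise use exponential backoff."""
    exc = retry_state.outcome.exception()
    response = getattr(exc, "response", None)
    header = response.headers.get("retry-after") if response is not None else None
    try:
        return float(header)
    except (TypeError, ValueError):
        # Header missing, or not a number of seconds (it may be an HTTP date)
        return _backoff(retry_state)

# Passing the header value as tenacity's wait avoids sleeping twice
# (once for the header, then again for the backoff).
@retry(
    stop=stop_after_attempt(5),
    wait=wait_for_retry_after,
    retry=retry_if_exception_type(anthropic.RateLimitError)
)
async def rate_limit_aware_create(client, **kwargs):
    return await client.messages.create(**kwargs)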
Structured error handling
Python's exception hierarchy is flexible. Use it to model the specific failure types your agent can encounter:
class AgentError(Exception):
    """Base class for all agent errors."""
    pass

class ModelError(AgentError):
    """Error from the model API."""
    def __init__(self, message: str, model: str, original_error: Exception):
        super().__init__(message)
        self.model = model
        self.original_error = original_error

class ToolError(AgentError):
    """Error executing a tool."""
    def __init__(self, tool_name: str, tool_args: dict, original_error: Exception):
        super().__init__(f"Tool '{tool_name}' failed: {original_error}")
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.original_error = original_error

class OutputValidationError(AgentError):
    """Model output didn't match expected schema."""
    def __init__(self, expected_schema: type, raw_output: str, validation_error: Exception):
        super().__init__(f"Output validation failed: {validation_error}")
        self.expected_schema = expected_schema
        self.raw_output = raw_output
        self.validation_error = validation_error

class MaxIterationsError(AgentError):
    """Agent exceeded maximum allowed iterations."""
    def __init__(self, max_iterations: int, last_state: dict):
        super().__init__(f"Agent exceeded {max_iterations} iterations")
        self.max_iterations = max_iterations
        self.last_state = last_state
With typed errors, your calling code can make intelligent decisions:
async def safe_run_agent(query: str) -> str | None:
    try:
        return await run_agent(query)
    except ToolError as e:
        # Note: "args" is a reserved LogRecord attribute, so using it as a key
        # in `extra` raises KeyError with the stdlib logger; use "tool_args".
        logger.error("Tool failure", extra={
            "tool": e.tool_name,
            "tool_args": e.tool_args,
            "error": str(e.original_error)
        })
        # Tool failures are often non-retryable; return None and handle upstream
        return None
    except OutputValidationError as e:
        logger.warning("Output validation failed", extra={
            "schema": e.expected_schema.__name__,
            "raw_output": e.raw_output[:500]  # truncate for logging
        })
        # Could retry with different instructions; for now, return None
        return None
    except MaxIterationsError as e:
        logger.error("Agent loop exceeded max iterations", extra={
            "max": e.max_iterations,
            "last_state": e.last_state
        })
        raise  # This is a logic problem, not a transient failure
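Typed errors only help if low-level failures are converted at the boundary where they occur. One way to do that, sketched below, is to wrap the tool call and re-raise with exception chaining. The divide tool, the TOOLS registry, and call_tool are hypothetical; ToolError has the same shape as the class defined above and is repeated so the example runs on its own.

```python
class AgentError(Exception):
    """Base class for all agent errors."""


class ToolError(AgentError):
    """Error executing a tool (same shape as the class defined above)."""

    def __init__(self, tool_name: str, tool_args: dict, original_error: Exception):
        super().__init__(f"Tool '{tool_name}' failed: {original_error}")
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.original_error = original_error


def divide(a: float, b: float) -> float:  # hypothetical tool
    return a / b


TOOLS = {"divide": divide}  # hypothetical tool registry


def call_tool(tool_name: str, tool_args: dict):
    """Run a tool, converting any failure into a ToolError at the boundary."""
    try:
        return TOOLS[tool_name](**tool_args)
    except Exception as exc:
        # 'from exc' keeps the original traceback reachable via __cause__
        raise ToolError(tool_name, tool_args, exc) from exc


try:
    call_tool("divide", {"a": 1, "b": 0})
except ToolError as err:
    caught = err  # carries the tool name, arguments, and original exception
```

Callers then catch ToolError without needing to know which exceptions each individual tool can raise.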
Observability with structured logging
For production agents, structured logging is the minimum observability you need. structlog is the standard Python library for this:
import structlog
import time
from contextlib import asynccontextmanager
from typing import AsyncGenerator

logger = structlog.get_logger()

@asynccontextmanager
async def agent_span(
    agent_id: str,
    task: str
) -> AsyncGenerator[structlog.BoundLogger, None]:
    """Context manager that logs agent execution with timing."""
    start_time = time.monotonic()
    span_logger = logger.bind(
        agent_id=agent_id,
        task=task[:200]  # truncate long tasks for log size
    )
    span_logger.info("agent_started")
    try:
        yield span_logger
        duration_ms = int((time.monotonic() - start_time) * 1000)
        span_logger.info("agent_completed", duration_ms=duration_ms)
    except Exception as e:
        duration_ms = int((time.monotonic() - start_time) * 1000)
        span_logger.error(
            "agent_failed",
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise

async def run_agent_with_observability(query: str) -> str:
    async with agent_span(agent_id="research_agent", task=query) as span:
        result = await run_agent(query)
        span.info("agent_output_ready", output_length=len(result))
        return result
This logs an agent_started event, followed by either an agent_completed event with timing or an agent_failed event with the timing, error type, and message. Every agent execution produces at least two log lines with consistent structure.
The structured format (key-value pairs rather than formatted strings) means you can filter and aggregate logs effectively. "Show me all agent_failed events in the last hour" is easy in any log aggregation platform when the event type is a discrete field.
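To show why a discrete event field makes filtering easy, here is a standard-library sketch of the same idea. It is not how structlog works internally; JsonFormatter and the "fields" key passed through extra are conventions invented for this example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname}
        # Keys passed via `extra=` are set as attributes on the record
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


stream = io.StringIO()  # stands in for stdout or a log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("agent_demo")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False  # keep the demo output out of the root logger

log.info("agent_started", extra={"fields": {"agent_id": "research_agent"}})
log.error("agent_failed", extra={"fields": {"agent_id": "research_agent",
                                            "error_type": "TimeoutError"}})

# Because the event name is a discrete field, filtering is a simple comparison
events = [json.loads(line) for line in stream.getvalue().splitlines()]
failures = [e for e in events if e["event"] == "agent_failed"]
```

A log aggregation platform performs the equivalent of that last list comprehension, just at scale and with an index behind it.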
Managing agent state in long-running loops
Agent loops need explicit state management. A dictionary is fine for simple cases but becomes hard to reason about as agents grow:
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentState:
    task: str
    iteration: int = 0
    max_iterations: int = 20
    messages: list[dict] = field(default_factory=list)
    tool_results: list[dict] = field(default_factory=list)
    is_complete: bool = False
    final_answer: Optional[str] = None

    def check_iteration_limit(self) -> None:
        if self.iteration >= self.max_iterations:
            raise MaxIterationsError(
                self.max_iterations,
                {"iteration": self.iteration, "message_count": len(self.messages)}
            )

    def add_message(self, role: str, content) -> None:
        self.messages.append({"role": role, "content": content})

    def complete(self, answer: str) -> None:
        self.is_complete = True
        self.final_answer = answer
The dataclass approach gives you type annotations (helpful for IDE support), a __repr__ for free, and the ability to pass state as a single object rather than multiple parameters.
The check_iteration_limit method centralizes the guard against infinite loops. Call it at the start of each loop iteration. Agents that loop without a bound will run until they hit an API cost limit or a connection timeout, which is the wrong place to stop.
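A loop built around that guard might look like the following sketch. LoopState is a trimmed-down, hypothetical version of AgentState, the exception is a simplified stand-in for the MaxIterationsError defined earlier, and the step functions replace the real model and tool calls.

```python
from dataclasses import dataclass


class MaxIterationsError(Exception):
    """Simplified stand-in for the MaxIterationsError defined earlier."""


@dataclass
class LoopState:  # trimmed-down, hypothetical version of AgentState
    iteration: int = 0
    max_iterations: int = 20
    is_complete: bool = False

    def check_iteration_limit(self) -> None:
        if self.iteration >= self.max_iterations:
            raise MaxIterationsError(f"exceeded {self.max_iterations} iterations")


def run_loop(state: LoopState, step) -> LoopState:
    """Drive `step` until it marks the state complete or the limit trips."""
    while not state.is_complete:
        state.check_iteration_limit()  # guard first, before doing any work
        state.iteration += 1
        step(state)  # hypothetical: one model call plus any tool calls
    return state


def finishes_on_third(state: LoopState) -> None:
    state.is_complete = state.iteration == 3


def never_finishes(state: LoopState) -> None:
    pass


done = run_loop(LoopState(), finishes_on_third)

stuck = LoopState(max_iterations=5)
try:
    run_loop(stuck, never_finishes)
except MaxIterationsError:
    pass  # stopped after exactly five steps
```

Because the check runs before each step, a stuck agent performs exactly max_iterations steps and never starts another.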
Prompt management
Prompts are code. Treat them like code: version them, test them, and don't duplicate them.
A minimal prompt manager:
from pathlib import Path
import string

class PromptTemplate:
    def __init__(self, template_path: Path):
        self.template_path = template_path
        self._template: string.Template | None = None

    def _load(self) -> string.Template:
        if self._template is None:
            text = self.template_path.read_text(encoding='utf-8')
            self._template = string.Template(text)
        return self._template

    def render(self, **kwargs) -> str:
        return self._load().substitute(**kwargs)

# Usage
PROMPTS_DIR = Path(__file__).parent / "prompts"
system_prompt = PromptTemplate(PROMPTS_DIR / "research_agent.txt")
rendered = system_prompt.render(
    user_context="Premium subscriber",
    current_date="2026-05-12"
)
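Store prompt templates as .txt files alongside your code. Placeholders use string.Template's $name syntax, and substitute raises KeyError when a variable is missing, so a forgotten argument fails loudly instead of shipping a half-filled prompt. git diff on a text file is much cleaner than git diff on a long string embedded in Python. You can also use CLAUDE.md-style formatting in your prompt files and switch prompts by changing a file path.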
For production systems with multiple prompt versions, a prompt registry pattern (prompts stored in a database with version numbers) gives you the ability to roll back prompt changes without deploying new code. This is valuable when a prompt change degrades agent performance in production.
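The registry idea can be sketched in memory to show the moving parts. PromptRegistry and its methods are hypothetical names for illustration, and a real registry would persist versions in a database rather than a dict.

```python
from dataclasses import dataclass, field
from string import Template


@dataclass
class PromptRegistry:
    """Hypothetical in-memory registry; a real one would sit on a database."""

    _versions: dict[str, list[str]] = field(default_factory=dict)
    _active: dict[str, int] = field(default_factory=dict)

    def publish(self, name: str, text: str) -> int:
        """Store a new version, make it active, and return its 1-based number."""
        versions = self._versions.setdefault(name, [])
        versions.append(text)
        self._active[name] = len(versions)
        return len(versions)

    def rollback(self, name: str, version: int) -> None:
        """Point the active version at an earlier one without deleting anything."""
        if not 1 <= version <= len(self._versions.get(name, [])):
            raise KeyError(f"{name} has no version {version}")
        self._active[name] = version

    def render(self, name: str, **variables: str) -> str:
        """Fill the active version's $placeholders with the given variables."""
        text = self._versions[name][self._active[name] - 1]
        return Template(text).substitute(**variables)


registry = PromptRegistry()
registry.publish("research_agent", "You are a research assistant. Today is $current_date.")
registry.publish("research_agent", "Answer tersely. Today is $current_date.")
registry.rollback("research_agent", 1)  # v2 regressed, so reactivate v1
rendered = registry.render("research_agent", current_date="2026-05-12")
```

Rolling back changes only which version is active, so the regressed prompt stays available for later comparison.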
Token budget management
LLM API costs are proportional to token usage. In production, you want to monitor and control this:
from dataclasses import dataclass

import anthropic

class BudgetExceededError(Exception):
    """Raised when a run is about to exceed its token budget."""

@dataclass
class TokenBudget:
    max_input_tokens: int
    max_output_tokens: int
    used_input_tokens: int = 0
    used_output_tokens: int = 0

    @property
    def remaining_input(self) -> int:
        return max(0, self.max_input_tokens - self.used_input_tokens)

    @property
    def remaining_output(self) -> int:
        return max(0, self.max_output_tokens - self.used_output_tokens)

    def record_usage(self, response: anthropic.Message) -> None:
        self.used_input_tokens += response.usage.input_tokens
        self.used_output_tokens += response.usage.output_tokens

    def check_budget(self) -> None:
        # Stop while there is still a 1,000-token safety margin on the input side
        if self.remaining_input < 1000:
            raise BudgetExceededError(
                f"Input token budget nearly exhausted: "
                f"{self.used_input_tokens}/{self.max_input_tokens}"
            )

# Per-request budget: $0.10 worth of tokens at claude-4-sonnet pricing
# ~5k input tokens, ~2k output tokens = roughly $0.10 (verify against current pricing)
budget = TokenBudget(max_input_tokens=5000, max_output_tokens=2000)
Tracking tokens per agent run lets you set per-request cost budgets and log token consumption for later cost analysis. "Which types of queries consume the most tokens?" is an expensive question to answer retroactively if you didn't log it.
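Turning logged token counts into dollars is simple arithmetic once you have per-token rates. The sketch below assumes prices are quoted per million tokens. The Pricing class, estimate_cost_usd, and the rates themselves are hypothetical placeholders, not published prices for any model.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pricing:
    """USD per million tokens. The rates used below are placeholders."""

    input_per_mtok: float
    output_per_mtok: float


def estimate_cost_usd(input_tokens: int, output_tokens: int, pricing: Pricing) -> float:
    """Convert token counts into an approximate dollar cost."""
    return (
        input_tokens * pricing.input_per_mtok
        + output_tokens * pricing.output_per_mtok
    ) / 1_000_000


# Hypothetical rates: substitute the published prices for your model
example_pricing = Pricing(input_per_mtok=3.00, output_per_mtok=15.00)
cost = estimate_cost_usd(5_000, 2_000, example_pricing)
print(f"estimated cost: ${cost:.3f}")
```

Logging this estimate alongside the raw token counts keeps later cost analysis valid even after prices change, since the counts can be re-priced.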
Python's ecosystem for AI agent development is mature and opinionated in 2026. Pydantic v2 for data validation, tenacity for retries, structlog for observability, asyncio throughout. These aren't arbitrary choices: they're what the community has converged on because they work at scale. The patterns in this article translate directly to production code. Start with them and you'll spend less time debugging infrastructure and more time on the actual agent logic.