Agentbrisk

Python Best Practices for Production AI Agents in 2026

May 12, 2026 · Editorial Team · 8 min read · pythonai-agentsbest-practices

Python is the dominant language for AI agent development, which means there's an enormous amount of code out there ranging from production-grade to "it works on my laptop but will fall apart in staging." The gap between code that runs locally and code that runs reliably in production is mostly discipline: error handling, async patterns, retry logic, and observability.

These aren't glamorous topics, but they're what separates a demo from a deployed service. Here's what Python AI agent code looks like when it's done right.


Async from the start

Every AI agent that makes model API calls should be async. This isn't a performance optimization, it's a correctness issue. Synchronous code blocks the thread while waiting for API responses. An agent that makes 5 tool calls sequentially, each taking 500ms, ties up a thread for 2.5 seconds. In an async model, those same tool calls can run concurrently if they're independent.

All the major Python AI SDKs support async: the Anthropic SDK, OpenAI SDK, and Pydantic AI all have async-first APIs as of 2026.

import asyncio
import anthropic

async def run_agent(query: str) -> str:
    client = anthropic.AsyncAnthropic()
    
    messages = [{"role": "user", "content": query}]
    
    response = await client.messages.create(
        model="claude-4-sonnet-20260518",
        max_tokens=4096,
        messages=messages
    )
    
    return response.content[0].text

# Running from sync context
result = asyncio.run(run_agent("What is 2 + 2?"))

The AsyncAnthropic client is drop-in identical to the sync Anthropic client but all methods are coroutines. Use it everywhere.

For concurrent tool calls, asyncio.gather runs them in parallel:

async def run_tools_in_parallel(tool_calls: list[ToolCall]) -> list[ToolResult]:
    tasks = [execute_tool(tc) for tc in tool_calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle results, including exceptions from individual tools
    processed = []
    for tool_call, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            processed.append(ToolResult(
                tool_use_id=tool_call.id,
                is_error=True,
                content=str(result)
            ))
        else:
            processed.append(result)
    
    return processed

return_exceptions=True prevents a single failing tool from canceling all the others. Each tool runs to completion (or failure) and you handle the results individually.


Pydantic models for everything structured

The single best practice for Python agent code is using Pydantic v2 models for all structured data. Not just the API outputs, but tool parameters, intermediate state, and anything that crosses a boundary.

from pydantic import BaseModel, Field, field_validator
from typing import Optional
import re

class SearchParams(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    max_results: int = Field(default=5, ge=1, le=50)
    safe_search: bool = True
    
    @field_validator('query')
    @classmethod
    def clean_query(cls, v: str) -> str:
        # Remove potential injection characters
        return re.sub(r'[<>"\']', '', v).strip()

class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str
    relevance_score: Optional[float] = None
    
    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        if not v.startswith(('http://', 'https://')):
            raise ValueError(f'Invalid URL: {v}')
        return v

class AgentResponse(BaseModel):
    answer: str
    sources: list[SearchResult] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    reasoning: Optional[str] = None

Pydantic validators run at parse time, not at use time. When you call SearchParams(query=user_input), the query gets cleaned and validated immediately. If it fails, you get a clear ValidationError with the field name and what went wrong.

For parsing model output into structured types, Pydantic handles JSON parsing and validation in one step:

import json
from pydantic import ValidationError

async def parse_model_output(raw_output: str) -> AgentResponse:
    try:
        data = json.loads(raw_output)
        return AgentResponse.model_validate(data)
    except json.JSONDecodeError as e:
        raise ValueError(f"Model output was not valid JSON: {e}") from e
    except ValidationError as e:
        raise ValueError(f"Model output did not match expected schema: {e}") from e

Retry logic with tenacity

API calls fail. Network timeouts, rate limits, transient server errors. Production agents need retry logic that handles these gracefully.

The tenacity library is the standard for Python retry logic:

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
import anthropic
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((
        anthropic.RateLimitError,
        anthropic.APIConnectionError,
        anthropic.InternalServerError
    )),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
async def create_message_with_retry(
    client: anthropic.AsyncAnthropic,
    **kwargs
) -> anthropic.Message:
    return await client.messages.create(**kwargs)

Key decisions in this retry configuration:

  • 3 attempts total (not 3 retries: the first attempt counts)
  • Exponential backoff starting at 2 seconds, capped at 30 seconds
  • Only retry on specific exceptions (rate limits, connection errors, server errors)
  • Log before each retry so you can see the retry pattern in production

What not to retry: anthropic.BadRequestError (your request is malformed, retrying won't help), anthropic.AuthenticationError (credentials are wrong), or ValidationError from Pydantic (your data is bad). These are deterministic failures that retrying will not fix.

For rate limit handling specifically, the Anthropic SDK and OpenAI SDK both include a retry-after header in rate limit responses. A smarter retry respects that header:

import time

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type(anthropic.RateLimitError)
)
async def rate_limit_aware_create(client, **kwargs):
    try:
        return await client.messages.create(**kwargs)
    except anthropic.RateLimitError as e:
        # Check for Retry-After header in the response
        if hasattr(e, 'response') and e.response:
            retry_after = e.response.headers.get('retry-after')
            if retry_after:
                await asyncio.sleep(float(retry_after))
        raise  # re-raise for tenacity to handle

Structured error handling

Python's exception hierarchy is flexible. Use it to model the specific failure types your agent can encounter:

class AgentError(Exception):
    """Base class for all agent errors."""
    pass

class ModelError(AgentError):
    """Error from the model API."""
    def __init__(self, message: str, model: str, original_error: Exception):
        super().__init__(message)
        self.model = model
        self.original_error = original_error

class ToolError(AgentError):
    """Error executing a tool."""
    def __init__(self, tool_name: str, tool_args: dict, original_error: Exception):
        super().__init__(f"Tool '{tool_name}' failed: {original_error}")
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.original_error = original_error

class OutputValidationError(AgentError):
    """Model output didn't match expected schema."""
    def __init__(self, expected_schema: type, raw_output: str, validation_error: Exception):
        super().__init__(f"Output validation failed: {validation_error}")
        self.expected_schema = expected_schema
        self.raw_output = raw_output
        self.validation_error = validation_error

class MaxIterationsError(AgentError):
    """Agent exceeded maximum allowed iterations."""
    def __init__(self, max_iterations: int, last_state: dict):
        super().__init__(f"Agent exceeded {max_iterations} iterations")
        self.max_iterations = max_iterations
        self.last_state = last_state

With typed errors, your calling code can make intelligent decisions:

async def safe_run_agent(query: str) -> str | None:
    try:
        return await run_agent(query)
    except ToolError as e:
        logger.error("Tool failure", extra={
            "tool": e.tool_name,
            "args": e.tool_args,
            "error": str(e.original_error)
        })
        # Tool failures are often non-retryable; return None and handle upstream
        return None
    except OutputValidationError as e:
        logger.warning("Output validation failed", extra={
            "schema": e.expected_schema.__name__,
            "raw_output": e.raw_output[:500]  # truncate for logging
        })
        # Could retry with different instructions; for now, return None
        return None
    except MaxIterationsError as e:
        logger.error("Agent loop exceeded max iterations", extra={
            "max": e.max_iterations,
            "last_state": e.last_state
        })
        raise  # This is a logic problem, not a transient failure

Observability with structured logging

For production agents, structured logging is the minimum observability you need. structlog is the standard Python library for this:

import structlog
import time
from contextlib import asynccontextmanager
from typing import AsyncGenerator

logger = structlog.get_logger()

@asynccontextmanager
async def agent_span(
    agent_id: str,
    task: str
) -> AsyncGenerator[structlog.BoundLogger, None]:
    """Context manager that logs agent execution with timing."""
    start_time = time.monotonic()
    span_logger = logger.bind(
        agent_id=agent_id,
        task=task[:200]  # truncate long tasks for log size
    )
    
    span_logger.info("agent_started")
    
    try:
        yield span_logger
        duration_ms = int((time.monotonic() - start_time) * 1000)
        span_logger.info("agent_completed", duration_ms=duration_ms)
    except Exception as e:
        duration_ms = int((time.monotonic() - start_time) * 1000)
        span_logger.error(
            "agent_failed",
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error_message=str(e)
        )
        raise

async def run_agent_with_observability(query: str) -> str:
    async with agent_span(agent_id="research_agent", task=query) as span:
        result = await run_agent(query)
        span.info("agent_output_ready", output_length=len(result))
        return result

This logs a agent_started event, an agent_completed event with timing, or an agent_failed event with the error type and message. Every agent execution produces at least two log lines with consistent structure.

The structured format (key-value pairs rather than formatted strings) means you can filter and aggregate logs effectively. "Show me all agent_failed events in the last hour" is easy in any log aggregation platform when the event type is a discrete field.


Managing agent state in long-running loops

Agent loops need explicit state management. A dictionary is fine for simple cases but becomes hard to reason about as agents grow:

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentState:
    task: str
    iteration: int = 0
    max_iterations: int = 20
    messages: list[dict] = field(default_factory=list)
    tool_results: list[dict] = field(default_factory=list)
    is_complete: bool = False
    final_answer: Optional[str] = None
    
    def check_iteration_limit(self) -> None:
        if self.iteration >= self.max_iterations:
            raise MaxIterationsError(
                self.max_iterations,
                {"iteration": self.iteration, "message_count": len(self.messages)}
            )
    
    def add_message(self, role: str, content) -> None:
        self.messages.append({"role": role, "content": content})
    
    def complete(self, answer: str) -> None:
        self.is_complete = True
        self.final_answer = answer

The dataclass approach gives you type annotations (helpful for IDE support), a __repr__ for free, and the ability to pass state as a single object rather than multiple parameters.

The check_iteration_limit method centralizes the guard against infinite loops. Call it at the start of each loop iteration. Agents that loop without a bound will run until they hit an API cost limit or a connection timeout, which is the wrong place to stop.


Prompt management

Prompts are code. Treat them like code: version them, test them, and don't duplicate them.

A minimal prompt manager:

from pathlib import Path
import string

class PromptTemplate:
    def __init__(self, template_path: Path):
        self.template_path = template_path
        self._template: string.Template | None = None
    
    def _load(self) -> string.Template:
        if self._template is None:
            text = self.template_path.read_text(encoding='utf-8')
            self._template = string.Template(text)
        return self._template
    
    def render(self, **kwargs) -> str:
        return self._load().substitute(**kwargs)

# Usage
PROMPTS_DIR = Path(__file__).parent / "prompts"

system_prompt = PromptTemplate(PROMPTS_DIR / "research_agent.txt")
rendered = system_prompt.render(
    user_context="Premium subscriber",
    current_date="2026-05-12"
)

Store prompt templates as .txt files alongside your code. git diff on a text file is much cleaner than git diff on a long string embedded in Python. You can also use CLAUDE.md-style formatting in your prompt files and switch prompts by changing a file path.

For production systems with multiple prompt versions, a prompt registry pattern (prompts stored in a database with version numbers) gives you the ability to roll back prompt changes without deploying new code. This is valuable when a prompt change degrades agent performance in production.


Token budget management

LLM API costs are proportional to token usage. In production, you want to monitor and control this:

from dataclasses import dataclass

@dataclass
class TokenBudget:
    max_input_tokens: int
    max_output_tokens: int
    used_input_tokens: int = 0
    used_output_tokens: int = 0
    
    @property
    def remaining_input(self) -> int:
        return max(0, self.max_input_tokens - self.used_input_tokens)
    
    @property
    def remaining_output(self) -> int:
        return max(0, self.max_output_tokens - self.used_output_tokens)
    
    def record_usage(self, response: anthropic.Message) -> None:
        self.used_input_tokens += response.usage.input_tokens
        self.used_output_tokens += response.usage.output_tokens
    
    def check_budget(self) -> None:
        if self.remaining_input < 1000:
            raise BudgetExceededError(
                f"Input token budget nearly exhausted: "
                f"{self.used_input_tokens}/{self.max_input_tokens}"
            )

# Per-request budget: $0.10 worth of tokens at claude-4-sonnet pricing
# ~5k input tokens, ~2k output tokens = roughly $0.10
budget = TokenBudget(max_input_tokens=5000, max_output_tokens=2000)

Tracking tokens per agent run lets you set per-request cost budgets and log token consumption for later cost analysis. "Which types of queries consume the most tokens?" is an expensive question to answer retroactively if you didn't log it.


Python's ecosystem for AI agent development is mature and opinionated in 2026. Pydantic v2 for data validation, tenacity for retries, structlog for observability, asyncio throughout. These aren't arbitrary choices: they're what the community has converged on because they work at scale. The patterns in this article translate directly to production code. Start with them and you'll spend less time debugging infrastructure and more time on the actual agent logic.

Search