AI Agent State Management: State Machines, Redis Sessions, Recovery
Most AI agent tutorials show you stateless agents: you send a message, you get a response, done. That works for demos. In production, agents need to remember things across turns, pause and resume tasks, handle failures gracefully, and sometimes recover from states they got into by accident.
State management is where many production agent implementations fall apart. This guide covers the patterns that work, with real code.
The state problem in agents
An AI agent typically has several categories of state that need different handling:
Conversation history. The messages the user and agent have exchanged. This is the most obvious kind of state and every agent library handles it, but it has a storage and trimming problem at scale.
Task state. Where a multi-step task is in its execution. Is the agent still gathering requirements, generating output, waiting for user confirmation, or finishing up? This is the state that most implementations handle poorly.
Tool results. The outputs of tools the agent called (API responses, search results, code execution output). These often need to persist so the agent can reference them later without re-running expensive operations.
User/session context. Information about the user and their preferences that should persist across multiple conversations.
World state. External state that the agent's actions have modified (files written, API calls made, emails sent). You need to track this so you know what to undo if something goes wrong.
Getting these five categories right is the difference between an agent that feels reliable and one that constantly forgets things or gets confused about where it is in a task.
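World state is the category that is easiest to overlook, so here is a minimal sketch of an action log. The names `ActionRecord` and `ActionLog` are hypothetical and not part of any library. The idea is only that each side effect is recorded together with a way to reverse it, if one exists.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ActionRecord:
    """One externally visible side effect performed by the agent (hypothetical type)."""
    description: str
    # Callable that reverses the action, or None if it cannot be undone (e.g. a sent email).
    undo: Optional[Callable[[], None]] = None


@dataclass
class ActionLog:
    records: list = field(default_factory=list)

    def record(self, description: str, undo: Optional[Callable[[], None]] = None) -> None:
        self.records.append(ActionRecord(description, undo))

    def rollback(self) -> list:
        """Undo actions newest-first; return descriptions of actions that could not be undone."""
        not_undone = []
        while self.records:
            rec = self.records.pop()
            if rec.undo is None:
                not_undone.append(rec.description)
            else:
                rec.undo()
        return not_undone


# Usage: a dict stands in for the filesystem.
files = {}
log = ActionLog()

files["draft.md"] = "outline"
log.record("wrote draft.md", undo=lambda: files.pop("draft.md", None))
log.record("sent notification email")  # irreversible, so no undo callable

leftover = log.rollback()
```

After the rollback, `files` is empty again and `leftover` names the one action that needs manual follow-up.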
State machines for task state
The most reliable pattern for task state is an explicit state machine. Instead of letting state be implicit in the conversation history, you define it explicitly as a set of states and valid transitions.
Here's a minimal state machine for a document generation agent:
from enum import Enum
from dataclasses import dataclass
from typing import Optional


class AgentState(Enum):
    IDLE = "idle"
    GATHERING_REQUIREMENTS = "gathering_requirements"
    GENERATING_OUTLINE = "generating_outline"
    WRITING_SECTIONS = "writing_sections"
    REVIEWING = "reviewing"
    AWAITING_FEEDBACK = "awaiting_feedback"
    FINALIZING = "finalizing"
    COMPLETE = "complete"
    ERROR = "error"


# Valid transitions: from state -> set of allowed next states
VALID_TRANSITIONS = {
    AgentState.IDLE: {AgentState.GATHERING_REQUIREMENTS},
    AgentState.GATHERING_REQUIREMENTS: {
        AgentState.GENERATING_OUTLINE,
        AgentState.ERROR
    },
    AgentState.GENERATING_OUTLINE: {
        AgentState.WRITING_SECTIONS,
        AgentState.AWAITING_FEEDBACK,
        AgentState.ERROR
    },
    AgentState.WRITING_SECTIONS: {
        AgentState.REVIEWING,
        AgentState.ERROR
    },
    AgentState.REVIEWING: {
        AgentState.AWAITING_FEEDBACK,
        AgentState.FINALIZING,
        AgentState.ERROR
    },
    AgentState.AWAITING_FEEDBACK: {
        AgentState.WRITING_SECTIONS,
        AgentState.FINALIZING,
        AgentState.ERROR
    },
    AgentState.FINALIZING: {AgentState.COMPLETE, AgentState.ERROR},
    AgentState.COMPLETE: set(),
    AgentState.ERROR: {AgentState.IDLE},
}


@dataclass
class TaskSession:
    session_id: str
    state: AgentState
    conversation_history: list
    gathered_requirements: Optional[dict]
    outline: Optional[str]
    sections: dict  # section_name -> content
    feedback_history: list
    error_message: Optional[str]


def transition(session: TaskSession, new_state: AgentState) -> TaskSession:
    # Reject any move that the transition table does not allow
    if new_state not in VALID_TRANSITIONS[session.state]:
        raise ValueError(
            f"Invalid transition: {session.state.value} -> {new_state.value}"
        )
    # Return a copy with the new state rather than mutating in place
    return TaskSession(**{**session.__dict__, 'state': new_state})
The state machine does two things that matter in production: it makes the current task state explicit and queryable (you can always answer "where is this task right now?"), and it prevents invalid state transitions that would put the agent into an inconsistent state.
When an agent makes a decision about what to do next, it checks valid transitions rather than asking the language model to remember what step it was on.
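One way to apply this, sketched under assumptions: treat the model's proposed next step as a suggestion and validate it against the transition table before acting on it. The `Phase` enum, `TRANSITIONS` table, and the `allowed_next` and `choose_next` helpers are illustrative stand-ins for a reduced subset of the state machine, kept small so the block runs on its own.

```python
from enum import Enum


class Phase(Enum):
    REVIEWING = "reviewing"
    AWAITING_FEEDBACK = "awaiting_feedback"
    FINALIZING = "finalizing"
    ERROR = "error"


# Reduced, illustrative transition table
TRANSITIONS = {
    Phase.REVIEWING: {Phase.AWAITING_FEEDBACK, Phase.FINALIZING, Phase.ERROR},
    Phase.AWAITING_FEEDBACK: {Phase.FINALIZING, Phase.ERROR},
    Phase.FINALIZING: {Phase.ERROR},
    Phase.ERROR: set(),
}


def allowed_next(current: Phase) -> list:
    """Sorted names of the states reachable from `current`, e.g. for inclusion in a prompt."""
    return sorted(p.value for p in TRANSITIONS[current])


def choose_next(current: Phase, model_suggestion: str) -> Phase:
    """Accept the model's suggestion only if it names a valid transition; otherwise return ERROR."""
    try:
        candidate = Phase(model_suggestion.strip().lower())
    except ValueError:
        # The model named a state that does not exist
        return Phase.ERROR
    return candidate if candidate in TRANSITIONS[current] else Phase.ERROR
```

The prompt can list `allowed_next(current)` as the only permitted answers, and `choose_next` acts as the safety net when the model answers with something else.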
Persistent state with Redis
For production agents handling multiple concurrent sessions, in-memory state isn't enough. You need state that:
- Survives process restarts
- Is accessible across multiple agent instances
- Can be queried and monitored
- Has a natural expiration mechanism
Redis is a good fit here: it lives outside the agent process, can be shared by every instance, and supports per-key expiry. Here's a session store implementation:
import json
import redis
from datetime import timedelta
from dataclasses import asdict
from typing import Optional


class AgentSessionStore:
    KEY_PREFIX = "agent_session:"

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.session_ttl = timedelta(hours=24)

    def save(self, session: TaskSession) -> None:
        key = f"{self.KEY_PREFIX}{session.session_id}"
        data = asdict(session)
        # Serialize enum to string for JSON storage
        data['state'] = session.state.value
        # SETEX writes the value and resets the TTL in one call
        self.redis.setex(
            key,
            self.session_ttl,
            json.dumps(data)
        )

    def load(self, session_id: str) -> Optional[TaskSession]:
        key = f"{self.KEY_PREFIX}{session_id}"
        data = self.redis.get(key)
        if data is None:
            return None
        parsed = json.loads(data)
        # Convert the stored string back into the enum member
        parsed['state'] = AgentState(parsed['state'])
        return TaskSession(**parsed)

    def delete(self, session_id: str) -> None:
        self.redis.delete(f"{self.KEY_PREFIX}{session_id}")

    def extend_ttl(self, session_id: str) -> None:
        key = f"{self.KEY_PREFIX}{session_id}"
        self.redis.expire(key, self.session_ttl)

    def list_active_sessions(self) -> list[str]:
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace.
        # Slicing off the prefix also keeps session IDs that contain ":" intact.
        return [
            key.decode()[len(self.KEY_PREFIX):]
            for key in self.redis.scan_iter(match=f"{self.KEY_PREFIX}*")
        ]
A few things to note about this implementation:
The TTL is set on every save, not just creation. This means active sessions stay alive; only idle ones expire. An agent working on a long task will keep renewing its TTL as it makes progress.
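The serialization converts the enum to a string value before storing. This is required, because json.dumps cannot serialize an Enum member, and it also makes the stored data readable when you inspect Redis directly (which you will, when debugging).
The list_active_sessions method is useful for a monitoring dashboard. In production you'll want to know how many active sessions you have. On a large keyspace, iterate with SCAN rather than KEYS, since KEYS blocks the server while it walks every key.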
Handling conversation history at scale
Conversation history is state too, and it has a specific problem: it grows unboundedly. A long-running agent session might accumulate thousands of tokens of history, eventually exceeding context window limits or becoming expensive to process.
Three strategies for managing conversation history:
Sliding window. Keep only the N most recent messages. Simple but lossy: the agent forgets early context.
MAX_HISTORY_MESSAGES = 20


def trim_history(history: list) -> list:
    if len(history) <= MAX_HISTORY_MESSAGES:
        return history
    # Always keep system messages, and trim only the remaining messages so
    # a system message inside the recent window is not included twice
    system_messages = [m for m in history if m['role'] == 'system']
    other_messages = [m for m in history if m['role'] != 'system']
    return system_messages + other_messages[-MAX_HISTORY_MESSAGES:]
Progressive summarization. When history exceeds a threshold, compress older messages into a summary. The agent gets a summary of earlier context plus the full recent context.
import json


async def compress_history(
    history: list,
    anthropic_client,
    keep_recent: int = 10
) -> list:
    # Not worth a model call unless there is a meaningful amount to compress
    if len(history) <= keep_recent + 4:
        return history
    to_compress = history[:-keep_recent]
    recent = history[-keep_recent:]
    # Ask the model to summarize what happened
    summary_response = await anthropic_client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "Summarize this conversation history concisely, "
                           "preserving key facts, decisions made, and current task state:\n\n"
                           + json.dumps(to_compress)
            }
        ]
    )
    # Use the user role so the compressed history still starts with a user turn
    summary_message = {
        "role": "user",
        "content": f"[Earlier conversation summary: {summary_response.content[0].text}]"
    }
    return [summary_message] + recent
Task state extraction. Rather than summarizing conversation history, extract the important facts into structured task state. When the agent learns something important (user's name, their preferred output format, the deadline for the task), store it in the session's structured fields rather than relying on conversation history. Then the conversation history can be trimmed aggressively because the important data is in the state machine, not in the chat transcript.
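A minimal sketch of that idea, assuming the extraction step (typically a model call) has already produced a small dict of facts. The `SessionFacts` dataclass, its field names, and the `apply_facts` and `build_context` helpers are hypothetical; the structured fields would normally live on the session object.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class SessionFacts:
    user_name: Optional[str] = None
    output_format: Optional[str] = None
    deadline: Optional[str] = None


def apply_facts(facts: SessionFacts, extracted: dict) -> SessionFacts:
    """Merge newly extracted facts, ignoring unknown keys and empty values."""
    known = {f.name for f in fields(SessionFacts)}
    updates = {k: v for k, v in extracted.items() if k in known and v}
    return replace(facts, **updates)


def build_context(facts: SessionFacts, history: list, keep_recent: int = 4) -> list:
    """Render the known facts as one short message and keep only the newest turns."""
    lines = [
        f"{f.name}: {getattr(facts, f.name)}"
        for f in fields(facts)
        if getattr(facts, f.name) is not None
    ]
    facts_message = {"role": "user", "content": "[Known task facts]\n" + "\n".join(lines)}
    recent = history[-keep_recent:] if keep_recent > 0 else []
    return [facts_message] + recent


# Usage: "mood" is not a known field, so it is dropped.
facts = apply_facts(SessionFacts(), {"user_name": "Dana", "deadline": "Friday", "mood": "n/a"})
history = [{"role": "user", "content": f"message {i}"} for i in range(30)]
context = build_context(facts, history)
```

Thirty messages shrink to five, and the facts survive because they no longer depend on the transcript.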
Recovery patterns for failed states
Production agents fail. The model produces an error response, a tool call fails, the network drops, the process crashes. You need to handle each of these gracefully.
Checkpoint-based recovery. After every significant step, persist the session state. If the process crashes, the next run can resume from the last checkpoint instead of starting over.
async def run_task_step(
    session: TaskSession,
    store: AgentSessionStore
) -> TaskSession:
    try:
        if session.state == AgentState.GATHERING_REQUIREMENTS:
            session = await gather_requirements(session)
            session = transition(session, AgentState.GENERATING_OUTLINE)
            store.save(session)  # Checkpoint after state change
        elif session.state == AgentState.GENERATING_OUTLINE:
            session = await generate_outline(session)
            session = transition(session, AgentState.WRITING_SECTIONS)
            store.save(session)  # Checkpoint
        # ... etc
        return session
    except Exception as e:
        # Record the failure on the session and persist it in the ERROR state
        error_session = transition(
            TaskSession(**{**session.__dict__, 'error_message': str(e)}),
            AgentState.ERROR
        )
        store.save(error_session)
        return error_session
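To make the resume path concrete, here is a self-contained sketch of a driver loop that picks up from the last checkpoint. Everything in it is a simplified assumption: `CHECKPOINTS` is an in-memory dict standing in for the Redis store, `STEPS` replaces the state machine, and `crash_after` exists only to simulate a failure.

```python
import asyncio
from typing import Optional

# In-memory stand-in for the session store; a real deployment would use Redis.
CHECKPOINTS = {}

STEPS = ["gather", "outline", "write", "done"]


async def run_step(session: dict) -> dict:
    """Advance one step and return a new session dict (hypothetical unit of work)."""
    next_step = STEPS[STEPS.index(session["step"]) + 1]
    return {
        **session,
        "step": next_step,
        "completed": session["completed"] + [session["step"]],
    }


async def run_until_done(session_id: str, crash_after: Optional[int] = None) -> dict:
    """Resume from the last checkpoint if one exists, otherwise start from the first step."""
    session = CHECKPOINTS.get(session_id) or {"step": STEPS[0], "completed": []}
    steps_run = 0
    while session["step"] != "done":
        if crash_after is not None and steps_run == crash_after:
            raise RuntimeError("simulated crash")
        session = await run_step(session)
        CHECKPOINTS[session_id] = session  # checkpoint after every step
        steps_run += 1
    return session


# Usage: the first run crashes after two steps, the second run resumes.
try:
    asyncio.run(run_until_done("s1", crash_after=2))
except RuntimeError:
    pass

resumed = asyncio.run(run_until_done("s1"))
```

The second run performs only the remaining step, so each step appears exactly once in `resumed["completed"]`.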
State repair. Sometimes an agent ends up in an inconsistent state due to a partial failure. Add a repair function that can validate and fix session state:
def validate_and_repair(session: TaskSession) -> tuple[TaskSession, list[str]]:
    issues = []
    # Check state consistency
    if session.state == AgentState.WRITING_SECTIONS:
        if not session.outline:
            issues.append("In WRITING_SECTIONS but no outline present")
            # Step back to the state that produces the missing outline
            session = TaskSession(**{
                **session.__dict__,
                'state': AgentState.GENERATING_OUTLINE
            })
    if session.state == AgentState.REVIEWING:
        if not session.sections:
            issues.append("In REVIEWING but no sections written")
            # Step back to the state that produces the missing sections
            session = TaskSession(**{
                **session.__dict__,
                'state': AgentState.WRITING_SECTIONS
            })
    return session, issues
Idempotent operations. Design your agent's operations to be safe to re-run. If generating an outline, check if one already exists before generating a new one. If sending an email notification, check if one was already sent for this session. This makes crash recovery safe.
async def generate_outline(session: TaskSession) -> TaskSession:
    # Idempotent: skip if already done
    if session.outline is not None:
        return session
    outline = await call_llm_for_outline(session.gathered_requirements)
    return TaskSession(**{**session.__dict__, 'outline': outline})
Monitoring and observability
State machines give you something valuable for free: a clear way to monitor agent health.
Track state distribution across all active sessions. If 30% of your sessions are in ERROR state, something is wrong. If sessions are getting stuck in AWAITING_FEEDBACK for days, you might need to implement a timeout and nudge.
def get_state_distribution(store: AgentSessionStore) -> dict[str, int]:
    session_ids = store.list_active_sessions()
    distribution = {}
    for sid in session_ids:
        session = store.load(sid)
        # A session can expire between listing and loading, so skip missing ones
        if session:
            state_name = session.state.value
            distribution[state_name] = distribution.get(state_name, 0) + 1
    return distribution
Track transition times. How long does a session typically spend in each state? Outliers indicate slow tool calls, rate limiting, or stuck states.
Log every state transition with timestamp. This creates an audit trail that's invaluable when a user reports that their agent did something unexpected.
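A rough sketch of both ideas, assuming transitions are recorded as plain tuples. `TransitionLog` and its methods are hypothetical helpers; in practice the entries would be persisted with the session or sent to your logging system. The `now` parameter exists so the example is deterministic.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TransitionLog:
    """Append-only list of (timestamp, from_state, to_state) entries. Hypothetical helper."""
    entries: list = field(default_factory=list)

    def record(self, from_state: str, to_state: str, now: Optional[float] = None) -> None:
        timestamp = time.time() if now is None else now
        self.entries.append((timestamp, from_state, to_state))

    def time_in_states(self) -> dict:
        """Seconds spent in each state that has already been exited."""
        totals = {}
        # Each entry marks when a state was entered; the next entry marks when it was left
        for (entered, _, state), (left, _, _) in zip(self.entries, self.entries[1:]):
            totals[state] = totals.get(state, 0.0) + (left - entered)
        return totals

    def current_state_age(self, now: Optional[float] = None) -> float:
        """Seconds since the most recent transition; large values suggest a stuck session."""
        if not self.entries:
            return 0.0
        reference = time.time() if now is None else now
        return reference - self.entries[-1][0]


# Usage with explicit timestamps
log = TransitionLog()
log.record("idle", "gathering_requirements", now=100.0)
log.record("gathering_requirements", "generating_outline", now=130.0)
log.record("generating_outline", "awaiting_feedback", now=145.0)

durations = log.time_in_states()
age = log.current_state_age(now=200.0)
```

Comparing `current_state_age` against a per-state threshold is one simple way to find sessions that need a timeout or a nudge.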
When you don't need a state machine
Not every agent needs explicit state management. A simple question-answering agent or a one-shot code generation tool doesn't need Redis sessions and state machines. Those patterns are for:
- Agents that execute multi-step tasks over minutes or hours
- Agents that need to pause and wait for user input mid-task
- Agents that call expensive external tools whose results you want to cache
- Agents running in multi-instance production environments
- Any agent where you need to be able to audit what happened
For simple agents, a well-structured conversation history and a clear system prompt are enough. Save the complexity for where it pays off.
For handling the failure modes that state machines surface, the error recovery patterns guide covers retry logic, exponential backoff, and fallback strategies. And for the caching layer that often accompanies stateful agents (caching tool results, response caching), the agent caching strategies guide has the relevant patterns.