AI Agent Rate Limiting: Anthropic, OpenAI, Google Limits and Strategies
Rate limits are the ceiling that every production AI agent eventually hits. The limits are real, they vary significantly by provider and tier, and ignoring them produces unpredictable behavior at the worst possible times.
This is a practical guide to what the limits actually are in 2026 for the major providers, and the strategies to work within (or around) them effectively.
Actual rate limits in 2026
Providers change their limits regularly, so treat the figures below as representative of production tiers rather than exact. They give you the right order of magnitude; check the official documentation for your specific tier before relying on them.
Anthropic (Claude API)
Claude's limits are primarily token-based. For Claude 3.7 Sonnet on the Tier 3 plan ($100/month minimum spend):
- Input tokens per minute (TPM): 200,000
- Output tokens per minute: 80,000
- Requests per minute (RPM): 1,000
For Claude 3.5 Haiku on Tier 3:
- Input TPM: 400,000
- Output TPM: 160,000
- RPM: 2,000
Limits scale significantly at higher tiers. Tier 4 (which requires $500+ monthly spend) roughly doubles these numbers. Enterprise contracts are negotiated separately.
OpenAI (GPT-4o)
OpenAI structures limits by tier similarly. For Tier 3 ($100+ monthly spend):
- GPT-4o: 30,000 RPM, 150M TPM
- GPT-4o mini: 30,000 RPM, 150M TPM
OpenAI's token limits are much higher than Anthropic's at comparable tiers, and per-request latency on GPT-4o mini is generally lower, which means you can actually use more of that budget.
Google (Gemini API)
Google's Gemini 2.0 Flash on Tier 1 (free):
- 15 RPM, 1M TPM, 1,500 requests/day
On Pay-as-you-go:
- Gemini 2.0 Flash: 2,000 RPM, 4M TPM
- Gemini 2.5 Pro: 150 RPM
The free tier limits are extremely restrictive. For any production use, you need to be on the pay-as-you-go tier.
Understanding the two limit types
Most providers enforce both requests-per-minute (RPM) and tokens-per-minute (TPM). You can hit either limit independently.
Hitting RPM: You're sending too many requests too fast. Common if you have many concurrent short tasks. Fix: add delays between requests, or batch short tasks into fewer larger requests.
Hitting TPM: You're processing too many tokens in a given minute. Common if you're sending large prompts or handling many concurrent long-context requests. Fix: reduce prompt size, reduce concurrency, or distribute load across time.
A single large request (say, 50,000 input tokens) consumes 25% of a 200,000 TPM budget in one call. With several concurrent processes sending requests of that size, the budget for the minute is gone after a handful of calls.
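To see which of the two limits binds for a given workload, a rough calculation is enough. The sketch below is illustrative only: the function name and the average-tokens-per-request input are assumptions, and the limits plugged in are the representative Tier 3 figures quoted earlier.

```python
def max_requests_per_minute(
    rpm_limit: int, tpm_limit: int, avg_tokens_per_request: int
) -> tuple[float, str]:
    """Return the sustainable requests/minute and the name of the binding limit."""
    if avg_tokens_per_request <= 0:
        raise ValueError("avg_tokens_per_request must be positive")
    # How many requests of this size fit into the token budget
    by_tokens = tpm_limit / avg_tokens_per_request
    if by_tokens < rpm_limit:
        return by_tokens, "TPM"
    return float(rpm_limit), "RPM"


# Many short tasks: the request count runs out first
short_tasks = max_requests_per_minute(1_000, 200_000, 100)
# Long-context calls: the token budget runs out first
long_context = max_requests_per_minute(1_000, 200_000, 50_000)
print(short_tasks)   # (1000.0, 'RPM')
print(long_context)  # (4.0, 'TPM')
```

Knowing which limit binds tells you which fix applies: batching helps when RPM is the constraint, while trimming prompts or lowering concurrency helps when TPM is.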
Token bucket rate limiting
The token bucket algorithm is the right tool for client-side rate limiting. You have a bucket that fills at a steady rate (one token per time unit) up to a maximum capacity. Each request consumes tokens proportional to its cost. If the bucket is empty, the request waits.
Here's a Python implementation for both RPM and TPM limiting:
import asyncio
import time
from dataclasses import dataclass


@dataclass
class RateLimiter:
    # Requests per minute limit
    rpm_limit: int
    # Tokens per minute limit
    tpm_limit: int

    def __post_init__(self):
        # Convert per-minute limits to per-second refill rates
        self._rpm_rate = self.rpm_limit / 60.0
        self._tpm_rate = self.tpm_limit / 60.0
        # Start buckets full
        self._rpm_tokens = float(self.rpm_limit)
        self._tpm_tokens = float(self.tpm_limit)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._last_refill = now
        self._rpm_tokens = min(
            self.rpm_limit,
            self._rpm_tokens + elapsed * self._rpm_rate
        )
        self._tpm_tokens = min(
            self.tpm_limit,
            self._tpm_tokens + elapsed * self._tpm_rate
        )

    async def acquire(self, estimated_tokens: int) -> None:
        if estimated_tokens > self.tpm_limit:
            # The bucket can never hold this many tokens, so waiting would never end
            raise ValueError(
                f"Request needs {estimated_tokens} tokens but the limit is {self.tpm_limit}"
            )
        # Holding the lock while sleeping serves waiters one at a time, in order
        async with self._lock:
            while True:
                self._refill()
                if self._rpm_tokens >= 1 and self._tpm_tokens >= estimated_tokens:
                    self._rpm_tokens -= 1
                    self._tpm_tokens -= estimated_tokens
                    return
                # Calculate how long to wait for both buckets to refill enough
                wait_for_rpm = (1 - self._rpm_tokens) / self._rpm_rate if self._rpm_tokens < 1 else 0
                token_deficit = estimated_tokens - self._tpm_tokens
                wait_for_tpm = token_deficit / self._tpm_rate if token_deficit > 0 else 0
                wait_seconds = max(wait_for_rpm, wait_for_tpm) + 0.01
                await asyncio.sleep(wait_seconds)


# Usage (inside an async function):
limiter = RateLimiter(
    rpm_limit=900,      # 90% of the 1,000 RPM Anthropic limit
    tpm_limit=180_000,  # 90% of the 200,000 TPM limit
)

# Before each API call:
await limiter.acquire(estimated_tokens=2000)
response = await anthropic_client.messages.create(...)
The 90% headroom is intentional. Running at 100% of the limit guarantees you'll hit it. Running at 90% gives you buffer for estimation errors and concurrent requests.
Estimating tokens before calling the API
To use token-based rate limiting accurately, you need a token count estimate before making the call. A few approaches:
Use a tokenizer. The tiktoken library from OpenAI counts tokens quickly. Claude uses a different tokenizer, so for Claude treat the result as a rough approximation rather than a true count:
import tiktoken

encoder = tiktoken.get_encoding("cl100k_base")


def estimate_tokens(text: str) -> int:
    return len(encoder.encode(text))


def estimate_message_tokens(messages: list[dict]) -> int:
    total = 0
    for message in messages:
        total += 4  # per-message overhead
        for key, value in message.items():
            total += estimate_tokens(str(value))
    total += 2  # conversation overhead
    return total
For Anthropic specifically, the SDK exposes a count_tokens method (client.messages.count_tokens) that returns the provider's own count, but it requires an API call. Use tiktoken for quick estimates and the SDK's counter only when precision matters.
Use heuristics. A rough estimate: 1 token per 4 characters in English text. This isn't accurate but it's fast and avoids the overhead of running a tokenizer on every request.
Overshoot safely. For rate limiting purposes, it's better to slightly overestimate token costs than underestimate them. Budget 20% more tokens than your estimate to account for the system prompt, message overhead, and output tokens.
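The heuristic and the safety margin combine into a few lines. This is a minimal sketch assuming the 4-characters-per-token rule and the 20% margin mentioned above; the function names are illustrative, and the result is only a budgeting figure, not a real token count.

```python
import math

CHARS_PER_TOKEN = 4         # rough rule of thumb for English text
SAFETY_MARGIN_PERCENT = 20  # extra budget for system prompt, overhead, and output


def heuristic_tokens(text: str) -> int:
    """Estimate tokens from character count, rounding up."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def budgeted_tokens(text: str, margin_percent: int = SAFETY_MARGIN_PERCENT) -> int:
    """Heuristic estimate plus a safety margin, for use with a rate limiter."""
    base = heuristic_tokens(text)
    return math.ceil(base * (100 + margin_percent) / 100)


prompt = "x" * 4000
print(heuristic_tokens(prompt))  # 1000
print(budgeted_tokens(prompt))   # 1200
```

The padded value is what you would pass to the limiter's acquire call; erring high costs a little throughput, while erring low risks a 429.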
Request queuing
When the bucket is empty, requests wait. But in a web application, you don't want user-facing requests to wait indefinitely. A request queue with priority levels handles this better:
import asyncio
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any


class Priority(IntEnum):
    CRITICAL = 0  # User is actively waiting
    HIGH = 1      # Background task, but time-sensitive
    NORMAL = 2    # Regular background processing
    LOW = 3       # Batch jobs, can wait hours


@dataclass(order=True)
class QueueItem:
    priority: Priority
    # Compared after priority, so items of equal priority run oldest-first
    created_at: float
    future: asyncio.Future = field(compare=False)
    func: Any = field(compare=False)
    tokens: int = field(compare=False)


class PriorityRequestQueue:
    def __init__(self, limiter: RateLimiter, max_wait_critical: float = 5.0):
        self.limiter = limiter
        self.max_wait_critical = max_wait_critical
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._worker_task = None

    def start(self):
        self._worker_task = asyncio.create_task(self._worker())

    async def _worker(self):
        while True:
            item = await self.queue.get()
            try:
                # Skip requests whose caller already timed out
                if item.future.cancelled():
                    continue
                await self.limiter.acquire(item.tokens)
                result = await item.func()
                # The caller may have timed out while the call was running
                if not item.future.done():
                    item.future.set_result(result)
            except Exception as e:
                if not item.future.done():
                    item.future.set_exception(e)
            finally:
                self.queue.task_done()

    async def submit(
        self,
        func,
        estimated_tokens: int,
        priority: Priority = Priority.NORMAL,
        timeout: float = 30.0
    ) -> Any:
        future = asyncio.get_running_loop().create_future()
        item = QueueItem(
            priority=priority,
            created_at=time.monotonic(),
            future=future,
            func=func,
            tokens=estimated_tokens
        )
        await self.queue.put(item)
        try:
            return await asyncio.wait_for(
                asyncio.shield(future),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Mark the request as cancelled so the worker skips it
            future.cancel()
            raise TimeoutError(
                f"Request timed out waiting for rate limiter after {timeout}s"
            )
Critical requests (user is waiting at a loading screen) go in with Priority.CRITICAL. Batch processing jobs use Priority.LOW. The queue worker processes higher-priority items first.
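The ordering behaviour is easy to check in isolation. The sketch below is a stripped-down illustration, not the queue class itself: it pushes tuples onto an asyncio.PriorityQueue and drains them, using a hypothetical sequence counter as a tie-breaker so equal priorities come out in submission order. The job names are made up.

```python
import asyncio
import itertools
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0
    NORMAL = 2
    LOW = 3


async def drain_in_priority_order(jobs):
    """Queue (priority, name) jobs, then return names in the order a worker sees them."""
    queue = asyncio.PriorityQueue()
    sequence = itertools.count()  # tie-breaker: lower number means submitted earlier
    for priority, name in jobs:
        await queue.put((priority, next(sequence), name))
    order = []
    while not queue.empty():
        _, _, name = await queue.get()
        order.append(name)
    return order


jobs = [
    (Priority.LOW, "nightly-batch"),
    (Priority.NORMAL, "reindex"),
    (Priority.CRITICAL, "chat-reply"),
    (Priority.NORMAL, "summarize"),
]
order = asyncio.run(drain_in_priority_order(jobs))
print(order)  # ['chat-reply', 'reindex', 'summarize', 'nightly-batch']
```

Note that priority only affects items still waiting in the queue. A low-priority request that has already started is not preempted by a critical one arriving later.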
Multi-key rotation
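For higher throughput requirements, you can rotate across multiple API keys. This multiplies your effective rate limit by the number of keys only when each key has its own limit; providers commonly enforce limits per organization or account, so several keys under one organization typically share a single budget.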
This is permitted by most providers for legitimate multi-product or multi-tenant uses, but check the terms carefully. Rotating keys specifically to circumvent rate limits for a single application is often a terms violation.
The legitimate use case: you have multiple products or customer tenants, each with their own API key and billing. The rotation distributes load across them fairly.
import itertools
import time
from threading import Lock


class KeyRotator:
    def __init__(self, api_keys: list[str], provider: str = "anthropic"):
        if not api_keys:
            raise ValueError("At least one API key required")
        self._keys = api_keys
        self._cycle = itertools.cycle(api_keys)
        self._lock = Lock()
        self._key_error_counts: dict[str, int] = {k: 0 for k in api_keys}
        self._key_last_error: dict[str, float] = {k: 0.0 for k in api_keys}
        self._cooldown_seconds = 60.0

    def get_key(self) -> str:
        with self._lock:
            # Try up to len(keys) times to find a healthy key
            for _ in range(len(self._keys)):
                key = next(self._cycle)
                last_error = self._key_last_error.get(key, 0.0)
                # Skip keys that have been rate-limited recently
                if (self._key_error_counts[key] >= 3 and
                        time.monotonic() - last_error < self._cooldown_seconds):
                    continue
                return key
            # All keys are rate-limited: return the one with the oldest error
            return min(
                self._keys,
                key=lambda k: self._key_last_error[k]
            )

    def report_rate_limit_error(self, key: str) -> None:
        with self._lock:
            self._key_error_counts[key] += 1
            # Monotonic time is unaffected by wall-clock adjustments
            self._key_last_error[key] = time.monotonic()

    def report_success(self, key: str) -> None:
        with self._lock:
            self._key_error_counts[key] = max(
                0,
                self._key_error_counts[key] - 1
            )
Adaptive concurrency
Instead of fixing your concurrency level and hoping it fits within rate limits, adaptive concurrency adjusts based on observed error rates.
import asyncio


class AdaptiveConcurrencyController:
    def __init__(
        self,
        min_concurrency: int = 1,
        max_concurrency: int = 20,
        initial_concurrency: int = 5
    ):
        self.min_concurrency = min_concurrency
        self.max_concurrency = max_concurrency
        self.current_concurrency = initial_concurrency
        # Count in-flight calls against an adjustable limit. Swapping in a new
        # asyncio.Semaphore would strand waiters on the old one and lose track
        # of calls that are already running.
        self._in_flight = 0
        self._cond = asyncio.Condition()
        self._success_streak = 0
        self._required_successes_to_increase = 10

    async def acquire(self) -> None:
        async with self._cond:
            await self._cond.wait_for(
                lambda: self._in_flight < self.current_concurrency
            )
            self._in_flight += 1

    async def release(self) -> None:
        # A coroutine, because waking waiters requires holding the condition's lock
        async with self._cond:
            self._in_flight -= 1
            self._cond.notify_all()

    async def report_rate_limit(self) -> None:
        async with self._cond:
            # Back off by 30%, never below the minimum
            self.current_concurrency = max(
                self.min_concurrency,
                int(self.current_concurrency * 0.7)
            )
            self._success_streak = 0

    async def report_success(self) -> None:
        async with self._cond:
            self._success_streak += 1
            if (self._success_streak >= self._required_successes_to_increase and
                    self.current_concurrency < self.max_concurrency):
                self.current_concurrency += 1
                self._success_streak = 0
                # A slot opened up: wake any waiting callers
                self._cond.notify_all()
This controller backs off by 30% when it hits a rate limit and slowly increases concurrency again after sustained success. It finds the sustainable throughput level automatically.
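The backoff arithmetic is easier to follow as a trace. The sketch below is a pure simulation under the same defaults as the controller (start at 5, multiply by 0.7 on a rate limit, add 1 after 10 consecutive successes); the function name and the event strings are illustrative and it makes no API calls.

```python
def simulate_concurrency(
    events,
    initial: int = 5,
    minimum: int = 1,
    maximum: int = 20,
    successes_to_increase: int = 10,
):
    """Return the concurrency limit after each 'ok' or 'rate_limit' event."""
    current = initial
    streak = 0
    trace = [current]
    for event in events:
        if event == "rate_limit":
            # Multiplicative decrease: drop by 30%, rounded down
            current = max(minimum, int(current * 0.7))
            streak = 0
        else:
            streak += 1
            # Additive increase: one extra slot per sustained success streak
            if streak >= successes_to_increase and current < maximum:
                current += 1
                streak = 0
        trace.append(current)
    return trace


events = ["rate_limit"] + ["ok"] * 10 + ["rate_limit"]
trace = simulate_concurrency(events)
print(trace[0], trace[1], trace[11], trace[12])  # 5 3 4 2
```

Decreases are fast and increases are slow, so a single rate limit error costs more concurrency than ten successes earn back. That asymmetry is what keeps the controller from oscillating around the limit.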
Cost and budget limits
Rate limits aren't the only constraint. Cost per day/month matters too. Add budget tracking alongside rate limiting:
import time


class BudgetExceededError(Exception):
    """Raised when recorded spend passes the daily budget."""


class BudgetTracker:
    def __init__(
        self,
        daily_budget_usd: float,
        redis_client  # async Redis client, for persistence across restarts
    ):
        self.daily_budget = daily_budget_usd
        self.redis = redis_client

    async def record_usage(
        self,
        input_tokens: int,
        output_tokens: int,
        model: str
    ) -> float:
        # Cost rates in USD per million tokens: (input, output)
        rates = {
            "claude-3-7-sonnet-20250219": (3.0, 15.0),
            "claude-3-5-haiku-20241022": (0.80, 4.0),
            "gpt-4o": (2.5, 10.0),
        }
        # Unknown models fall back to a deliberately high rate
        input_rate, output_rate = rates.get(model, (5.0, 15.0))
        cost = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
        key = f"ai_spend:{time.strftime('%Y-%m-%d')}"
        new_total = float(await self.redis.incrbyfloat(key, cost))
        await self.redis.expire(key, 86400 * 2)  # Keep 2 days
        if new_total > self.daily_budget:
            raise BudgetExceededError(
                f"Daily budget of ${self.daily_budget:.2f} exceeded. "
                f"Current: ${new_total:.2f}"
            )
        return new_total

    async def get_today_spend(self) -> float:
        key = f"ai_spend:{time.strftime('%Y-%m-%d')}"
        value = await self.redis.get(key)
        return float(value) if value else 0.0
Hard budget limits prevent runaway costs during incidents. A bug in your agent that sends 1,000x the normal token volume will hit the budget limit before it causes a $10,000 bill.
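To get a feel for how quickly a runaway agent reaches the cap, the cost formula can be run on its own. This is a back-of-the-envelope sketch: the rates are copied from the table in the tracker above, while the $50 daily budget and the 2,000-in / 500-out request size are made-up assumptions.

```python
# USD per million tokens as (input, output), copied from the tracker's rate table
RATES = {
    "claude-3-7-sonnet-20250219": (3.0, 15.0),
    "claude-3-5-haiku-20241022": (0.80, 4.0),
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one request in USD."""
    input_rate, output_rate = RATES[model]
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000


def requests_within_budget(cost_per_request: float, daily_budget_usd: float) -> int:
    """How many requests of this cost fit in the daily budget."""
    return int(daily_budget_usd // cost_per_request)


MODEL = "claude-3-7-sonnet-20250219"
DAILY_BUDGET_USD = 50.0  # hypothetical budget

normal = request_cost(MODEL, 2_000, 500)           # a typical request
runaway = request_cost(MODEL, 2_000_000, 500_000)  # 1,000x the token volume

print(f"normal:  ${normal:.4f} per request")
print(f"runaway: ${runaway:.2f} per request")
print(requests_within_budget(normal, DAILY_BUDGET_USD))
print(requests_within_budget(runaway, DAILY_BUDGET_USD))
```

Under these assumptions a normal day allows a few thousand requests, while the runaway case trips the budget after only a few calls. Because usage is recorded after each call completes, the cap limits the damage to roughly one request's worth of overshoot.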
For handling the errors that happen when you do hit rate limits (despite your best prevention), the error recovery guide covers the retry and fallback patterns. And for caching strategies that reduce the total number of API calls you need to make (which directly reduces rate limit pressure), the caching strategies guide covers both prompt caching and response caching.