Agentbrisk

AI Agent Rate Limiting: Anthropic, OpenAI, Google Limits and Strategies

April 2, 2026 · Editorial Team · 8 min read · ai-agentsrate-limitinganthropic

Rate limits are the ceiling that every production AI agent eventually hits. The limits are real, they vary significantly by provider and tier, and ignoring them produces unpredictable behavior at the worst possible times.

This is a practical guide to what the limits actually are in 2026 for the major providers, and the strategies to work within (or around) them effectively.


Actual rate limits in 2026

Providers change their limits regularly, but these are representative figures for production tiers. Always check the official documentation for your specific tier, but these give you the right order of magnitude.

Anthropic (Claude API)

Claude's limits are primarily token-based. For Claude 3.7 Sonnet on the Tier 3 plan ($100/month minimum spend):

  • Input tokens per minute (TPM): 200,000
  • Output tokens per minute: 80,000
  • Requests per minute (RPM): 1,000

For Claude 3.5 Haiku on Tier 3:

  • Input TPM: 400,000
  • Output TPM: 160,000
  • RPM: 2,000

Limits scale significantly at higher tiers. Tier 4 (which requires $500+ monthly spend) roughly doubles these numbers. Enterprise contracts are negotiated separately.

OpenAI (GPT-4o)

OpenAI structures limits by tier similarly. For Tier 3 ($100+ monthly spend):

  • GPT-4o: 30,000 RPM, 150M TPM
  • GPT-4o mini: 30,000 RPM, 150M TPM

OpenAI's token limits are much higher than Anthropic's at comparable tiers, but per-request latency on GPT-4o mini is generally lower, which means you can actually use more of that budget.

Google (Gemini API)

Google's Gemini 2.0 Flash on Tier 1 (free):

  • 15 RPM, 1M TPM, 1,500 requests/day

On Pay-as-you-go:

  • Gemini 2.0 Flash: 2,000 RPM, 4M TPM
  • Gemini 2.5 Pro: 150 RPM

The free tier limits are extremely restrictive. For any production use, you need to be on the pay-as-you-go tier.


Understanding the two limit types

Most providers enforce both requests-per-minute (RPM) and tokens-per-minute (TPM). You can hit either limit independently.

Hitting RPM: You're sending too many requests too fast. Common if you have many concurrent short tasks. Fix: add delays between requests, or batch short tasks into fewer larger requests.

Hitting TPM: You're processing too many tokens in a given minute. Common if you're sending large prompts or handling many concurrent long-context requests. Fix: reduce prompt size, reduce concurrency, or distribute load across time.

A single large request (say, 50,000 input tokens) can consume 25% of your per-minute token budget in one call. If you have multiple concurrent processes, they can pile up fast.


Token bucket rate limiting

The token bucket algorithm is the right tool for client-side rate limiting. You have a bucket that fills at a steady rate (one token per time unit) up to a maximum capacity. Each request consumes tokens proportional to its cost. If the bucket is empty, the request waits.

Here's a Python implementation for both RPM and TPM limiting:

import asyncio
import time
from dataclasses import dataclass

@dataclass
class RateLimiter:
    # Requests per minute limit
    rpm_limit: int
    # Tokens per minute limit
    tpm_limit: int
    
    def __post_init__(self):
        # Convert per-minute limits to per-second
        self._rpm_rate = self.rpm_limit / 60.0
        self._tpm_rate = self.tpm_limit / 60.0
        
        # Start buckets full
        self._rpm_tokens = float(self.rpm_limit)
        self._tpm_tokens = float(self.tpm_limit)
        
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    
    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._last_refill = now
        
        self._rpm_tokens = min(
            self.rpm_limit,
            self._rpm_tokens + elapsed * self._rpm_rate
        )
        self._tpm_tokens = min(
            self.tpm_limit,
            self._tpm_tokens + elapsed * self._tpm_rate
        )
    
    async def acquire(self, estimated_tokens: int) -> None:
        async with self._lock:
            while True:
                self._refill()
                
                if self._rpm_tokens >= 1 and self._tpm_tokens >= estimated_tokens:
                    self._rpm_tokens -= 1
                    self._tpm_tokens -= estimated_tokens
                    return
                
                # Calculate how long to wait
                wait_for_rpm = (1 - self._rpm_tokens) / self._rpm_rate if self._rpm_tokens < 1 else 0
                token_deficit = estimated_tokens - self._tpm_tokens
                wait_for_tpm = token_deficit / self._tpm_rate if token_deficit > 0 else 0
                
                wait_seconds = max(wait_for_rpm, wait_for_tpm) + 0.01
                await asyncio.sleep(wait_seconds)

# Usage:
limiter = RateLimiter(
    rpm_limit=900,      # 90% of the 1,000 RPM Anthropic limit
    tpm_limit=180_000,  # 90% of the 200,000 TPM limit
)

# Before each API call:
await limiter.acquire(estimated_tokens=2000)
response = await anthropic_client.messages.create(...)

The 90% headroom is intentional. Running at 100% of the limit guarantees you'll hit it. Running at 90% gives you buffer for estimation errors and concurrent requests.


Estimating tokens before calling the API

To use token-based rate limiting accurately, you need a token count estimate before making the call. A few approaches:

Use a tokenizer. The tiktoken library (from OpenAI, but roughly accurate for Claude too) can count tokens quickly:

import tiktoken

encoder = tiktoken.get_encoding("cl100k_base")

def estimate_tokens(text: str) -> int:
    return len(encoder.encode(text))

def estimate_message_tokens(messages: list[dict]) -> int:
    total = 0
    for message in messages:
        total += 4  # per-message overhead
        for key, value in message.items():
            total += estimate_tokens(str(value))
    total += 2  # conversation overhead
    return total

For Anthropic specifically, their SDK has a count_tokens method on the client that gives exact counts, but it requires an API call. Use tiktoken for quick estimates and the SDK's counter only when precision matters.

Use heuristics. A rough estimate: 1 token per 4 characters in English text. This isn't accurate but it's fast and avoids the overhead of running a tokenizer on every request.

Overshoot safely. For rate limiting purposes, it's better to slightly overestimate token costs than underestimate them. Budget 20% more tokens than your estimate to account for the system prompt, message overhead, and output tokens.


Request queuing

When the rate limiter is full, requests wait. But in a web application, you don't want user-facing requests to wait indefinitely. A request queue with priority levels handles this better:

import asyncio
from enum import IntEnum
from dataclasses import dataclass, field
from typing import Any

class Priority(IntEnum):
    CRITICAL = 0    # User is actively waiting
    HIGH = 1        # Background task, but time-sensitive
    NORMAL = 2      # Regular background processing
    LOW = 3         # Batch jobs, can wait hours

@dataclass(order=True)
class QueueItem:
    priority: Priority
    created_at: float = field(compare=False)
    future: asyncio.Future = field(compare=False)
    func: Any = field(compare=False)
    tokens: int = field(compare=False)

class PriorityRequestQueue:
    def __init__(self, limiter: RateLimiter, max_wait_critical: float = 5.0):
        self.limiter = limiter
        self.max_wait_critical = max_wait_critical
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._worker_task = None
    
    def start(self):
        self._worker_task = asyncio.create_task(self._worker())
    
    async def _worker(self):
        while True:
            item = await self.queue.get()
            try:
                await self.limiter.acquire(item.tokens)
                result = await item.func()
                item.future.set_result(result)
            except Exception as e:
                item.future.set_exception(e)
            finally:
                self.queue.task_done()
    
    async def submit(
        self,
        func,
        estimated_tokens: int,
        priority: Priority = Priority.NORMAL,
        timeout: float = 30.0
    ) -> Any:
        future = asyncio.get_event_loop().create_future()
        item = QueueItem(
            priority=priority,
            created_at=time.monotonic(),
            future=future,
            func=func,
            tokens=estimated_tokens
        )
        await self.queue.put(item)
        
        try:
            return await asyncio.wait_for(
                asyncio.shield(future),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Remove from queue if still pending
            future.cancel()
            raise TimeoutError(
                f"Request timed out waiting for rate limiter after {timeout}s"
            )

Critical requests (user is waiting at a loading screen) go in with Priority.CRITICAL. Batch processing jobs use Priority.LOW. The queue worker processes higher-priority items first.


Multi-key rotation

For higher throughput requirements, you can rotate across multiple API keys. This multiplies your effective rate limit by the number of keys.

This is permitted by most providers for legitimate multi-product or multi-tenant uses, but check the terms carefully. Rotating keys specifically to circumvent rate limits for a single application is often a terms violation.

The legitimate use case: you have multiple products or customer tenants, each with their own API key and billing. The rotation distributes load across them fairly.

import itertools
from threading import Lock

class KeyRotator:
    def __init__(self, api_keys: list[str], provider: str = "anthropic"):
        if not api_keys:
            raise ValueError("At least one API key required")
        
        self._keys = api_keys
        self._cycle = itertools.cycle(api_keys)
        self._lock = Lock()
        self._key_error_counts: dict[str, int] = {k: 0 for k in api_keys}
        self._key_last_error: dict[str, float] = {k: 0.0 for k in api_keys}
        self._cooldown_seconds = 60.0
    
    def get_key(self) -> str:
        with self._lock:
            # Try up to len(keys) times to find a healthy key
            for _ in range(len(self._keys)):
                key = next(self._cycle)
                last_error = self._key_last_error.get(key, 0.0)
                
                # Skip keys that have been rate-limited recently
                if (self._key_error_counts[key] >= 3 and
                        time.time() - last_error < self._cooldown_seconds):
                    continue
                
                return key
            
            # All keys are rate-limited: return the one with oldest error
            return min(
                self._keys,
                key=lambda k: self._key_last_error[k]
            )
    
    def report_rate_limit_error(self, key: str) -> None:
        with self._lock:
            self._key_error_counts[key] += 1
            self._key_last_error[key] = time.time()
    
    def report_success(self, key: str) -> None:
        with self._lock:
            self._key_error_counts[key] = max(
                0,
                self._key_error_counts[key] - 1
            )

Adaptive concurrency

Instead of fixing your concurrency level and hoping it fits within rate limits, adaptive concurrency adjusts based on observed error rates.

class AdaptiveConcurrencyController:
    def __init__(
        self,
        min_concurrency: int = 1,
        max_concurrency: int = 20,
        initial_concurrency: int = 5
    ):
        self.min_concurrency = min_concurrency
        self.max_concurrency = max_concurrency
        self.current_concurrency = initial_concurrency
        self._semaphore = asyncio.Semaphore(initial_concurrency)
        self._lock = asyncio.Lock()
        self._success_streak = 0
        self._required_successes_to_increase = 10
    
    async def acquire(self):
        return await self._semaphore.acquire()
    
    def release(self):
        self._semaphore.release()
    
    async def report_rate_limit(self):
        async with self._lock:
            new_limit = max(
                self.min_concurrency,
                int(self.current_concurrency * 0.7)  # Back off by 30%
            )
            if new_limit < self.current_concurrency:
                self.current_concurrency = new_limit
                # Recreate semaphore with new limit
                self._semaphore = asyncio.Semaphore(new_limit)
                self._success_streak = 0
    
    async def report_success(self):
        async with self._lock:
            self._success_streak += 1
            if (self._success_streak >= self._required_successes_to_increase and
                    self.current_concurrency < self.max_concurrency):
                self.current_concurrency += 1
                self._semaphore = asyncio.Semaphore(self.current_concurrency)
                self._success_streak = 0

This controller backs off by 30% when it hits a rate limit and slowly increases concurrency again after sustained success. It finds the sustainable throughput level automatically.


Cost and budget limits

Rate limits aren't the only constraint. Cost per day/month matters too. Add budget tracking alongside rate limiting:

class BudgetTracker:
    def __init__(
        self,
        daily_budget_usd: float,
        redis_client  # for persistence across restarts
    ):
        self.daily_budget = daily_budget_usd
        self.redis = redis_client
    
    async def record_usage(
        self,
        input_tokens: int,
        output_tokens: int,
        model: str
    ) -> float:
        # Cost rates in USD per million tokens
        rates = {
            "claude-3-7-sonnet-20250219": (3.0, 15.0),
            "claude-3-5-haiku-20241022": (0.80, 4.0),
            "gpt-4o": (2.5, 10.0),
        }
        
        input_rate, output_rate = rates.get(model, (5.0, 15.0))
        cost = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
        
        key = f"ai_spend:{time.strftime('%Y-%m-%d')}"
        new_total = float(await self.redis.incrbyfloat(key, cost))
        await self.redis.expire(key, 86400 * 2)  # Keep 2 days
        
        if new_total > self.daily_budget:
            raise BudgetExceededError(
                f"Daily budget of ${self.daily_budget:.2f} exceeded. "
                f"Current: ${new_total:.2f}"
            )
        
        return new_total
    
    async def get_today_spend(self) -> float:
        key = f"ai_spend:{time.strftime('%Y-%m-%d')}"
        value = await self.redis.get(key)
        return float(value) if value else 0.0

Hard budget limits prevent runaway costs during incidents. A bug in your agent that sends 1,000x the normal token volume will hit the budget limit before it causes a $10,000 bill.


For handling the errors that happen when you do hit rate limits (despite your best prevention), the error recovery guide covers the retry and fallback patterns. And for caching strategies that reduce the total number of API calls you need to make (which directly reduces rate limit pressure), the caching strategies guide covers both prompt caching and response caching.

Search