# Deployment Guide

This guide covers best practices for deploying Atomic Agents applications to production environments.
## Overview

Deploying AI agents requires attention to:

- **Configuration Management**: Environment-specific settings
- **API Key Security**: Secure credential handling
- **Scaling**: Handling concurrent requests
- **Monitoring**: Observability and alerting
- **Error Handling**: Graceful degradation
## Environment Configuration

### Using Environment Variables

Store configuration in environment variables:
```python
import os
from dataclasses import dataclass


@dataclass
class AgentDeploymentConfig:
    """Production configuration for agents."""

    # Required
    openai_api_key: str
    model: str

    # Optional with defaults
    max_tokens: int = 2048
    temperature: float = 0.7
    timeout: float = 30.0
    max_retries: int = 3

    @classmethod
    def from_env(cls) -> "AgentDeploymentConfig":
        """Load configuration from environment variables."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")

        return cls(
            openai_api_key=api_key,
            model=os.getenv("AGENT_MODEL", "gpt-4o-mini"),
            max_tokens=int(os.getenv("AGENT_MAX_TOKENS", "2048")),
            temperature=float(os.getenv("AGENT_TEMPERATURE", "0.7")),
            timeout=float(os.getenv("AGENT_TIMEOUT", "30.0")),
            max_retries=int(os.getenv("AGENT_MAX_RETRIES", "3")),
        )


# Usage
config = AgentDeploymentConfig.from_env()
```
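For local development, a `.env` file can supply these variables before `from_env()` runs. A minimal sketch using the third-party `python-dotenv` package (one option among many for loading dotenv files):

```python
# Local development helper: load variables from a .env file into os.environ.
# Requires `pip install python-dotenv`; skip this in containerized deployments
# where the orchestrator injects environment variables directly.
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory, if present

config = AgentDeploymentConfig.from_env()
```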
### Configuration File Pattern

For complex deployments, use configuration files:
```python
import json
import os
from pathlib import Path


def load_config(env: str | None = None) -> dict:
    """Load environment-specific configuration."""
    env = env or os.getenv("DEPLOYMENT_ENV", "development")
    config_path = Path(f"config/{env}.json")

    if not config_path.exists():
        raise FileNotFoundError(f"Config not found: {config_path}")

    with open(config_path) as f:
        config = json.load(f)

    # Override with environment variables
    if os.getenv("OPENAI_API_KEY"):
        config["openai_api_key"] = os.getenv("OPENAI_API_KEY")

    return config


# config/production.json example:
# {
#     "model": "gpt-4o",
#     "max_tokens": 4096,
#     "timeout": 60,
#     "rate_limit": {
#         "requests_per_minute": 100,
#         "tokens_per_minute": 100000
#     }
# }
```
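To tie the two patterns together, let the file supply baseline values and the environment override them. A sketch of a hypothetical `config_from_file` helper (the name and precedence rules are illustrative, not part of the library):

```python
def config_from_file(env: str | None = None) -> AgentDeploymentConfig:
    """Hypothetical helper: file values as the baseline, env vars win."""
    file_cfg = load_config(env)
    return AgentDeploymentConfig(
        openai_api_key=file_cfg["openai_api_key"],
        model=os.getenv("AGENT_MODEL") or file_cfg.get("model", "gpt-4o-mini"),
        max_tokens=int(os.getenv("AGENT_MAX_TOKENS") or file_cfg.get("max_tokens", 2048)),
        timeout=float(os.getenv("AGENT_TIMEOUT") or file_cfg.get("timeout", 30.0)),
    )
```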
## Creating Production-Ready Agents

### Agent Factory Pattern

Create agents with production configuration:
```python
import instructor
import openai

from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory, SystemPromptGenerator


class ProductionAgentFactory:
    """Factory for creating production-configured agents."""

    def __init__(self, config: AgentDeploymentConfig):
        self.config = config
        self.client = instructor.from_openai(
            openai.OpenAI(
                api_key=config.openai_api_key,
                timeout=config.timeout,
                max_retries=config.max_retries,
            )
        )

    def create_chat_agent(
        self,
        system_prompt: str | None = None,
        with_history: bool = True,
    ) -> AtomicAgent:
        """Create a production chat agent."""
        history = ChatHistory() if with_history else None

        system_prompt_gen = None
        if system_prompt:
            system_prompt_gen = SystemPromptGenerator(background=[system_prompt])

        return AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](
            config=AgentConfig(
                client=self.client,
                model=self.config.model,
                history=history,
                system_prompt_generator=system_prompt_gen,
                model_api_parameters={
                    "max_tokens": self.config.max_tokens,
                    "temperature": self.config.temperature,
                },
            )
        )


# Usage
config = AgentDeploymentConfig.from_env()
factory = ProductionAgentFactory(config)
agent = factory.create_chat_agent(
    system_prompt="You are a helpful customer service agent."
)
```
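A quick smoke test of the factory-built agent (the synchronous client means `run()` rather than `run_async()`; the message text is just an example):

```python
# Exercise the agent once to confirm configuration and credentials work.
response = agent.run(BasicChatInputSchema(chat_message="Where is my order?"))
print(response.chat_message)
```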
## FastAPI Integration

Deploy agents as REST APIs:
```python
import os
import uuid
from contextlib import asynccontextmanager

import instructor
from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI
from pydantic import BaseModel

from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory


# Request/response models
class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None


class ChatResponse(BaseModel):
    response: str
    session_id: str


# Session management (use Redis in production; see the sketch below)
sessions: dict[str, ChatHistory] = {}


def get_or_create_session(session_id: str | None) -> tuple[str, ChatHistory]:
    """Get an existing session or create a new one."""
    if session_id and session_id in sessions:
        return session_id, sessions[session_id]
    new_id = session_id or str(uuid.uuid4())
    sessions[new_id] = ChatHistory()
    return new_id, sessions[new_id]


# Global agent (created on startup)
agent: AtomicAgent | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the agent on startup."""
    global agent
    client = instructor.from_openai(AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")))
    agent = AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](
        config=AgentConfig(client=client, model="gpt-4o-mini")
    )
    yield


app = FastAPI(lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint with session management."""
    session_id, history = get_or_create_session(request.session_id)

    # Attach the session's history to the shared agent. Note: mutating a
    # shared agent is not safe under concurrent requests; create an agent
    # per request (or serialize access) in production.
    agent.history = history

    try:
        response = await agent.run_async(
            BasicChatInputSchema(chat_message=request.message)
        )
        return ChatResponse(response=response.chat_message, session_id=session_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    """Delete a chat session."""
    if session_id in sessions:
        del sessions[session_id]
        return {"status": "deleted"}
    raise HTTPException(status_code=404, detail="Session not found")


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "agent_loaded": agent is not None}
```
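The in-memory `sessions` dict loses state on restart and is not shared across replicas (the Compose file below runs several). A Redis-backed sketch, assuming `ChatHistory` exposes `dump()`/`load()` serialization helpers (verify that against your atomic-agents version) and using the `redis` client package:

```python
import os

import redis

from atomic_agents.context import ChatHistory

# Points at a Redis instance such as the one in the Compose file below.
redis_client = redis.Redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))

SESSION_TTL_SECONDS = 3600  # expire idle sessions after an hour


def load_history(session_id: str) -> ChatHistory:
    """Restore a session's history from Redis, or start fresh."""
    history = ChatHistory()
    raw = redis_client.get(f"session:{session_id}")
    if raw:
        history.load(raw.decode())  # assumed ChatHistory deserializer
    return history


def save_history(session_id: str, history: ChatHistory) -> None:
    """Persist a session's history with a sliding TTL."""
    redis_client.set(
        f"session:{session_id}",
        history.dump(),  # assumed ChatHistory serializer
        ex=SESSION_TTL_SECONDS,
    )
```

Call `load_history()` at the top of the endpoint and `save_history()` after `run_async()` returns, in place of the dict lookups above.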
## Docker Deployment

### Dockerfile

```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install uv for faster dependency installation
RUN pip install uv

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies
RUN uv sync --frozen --no-dev

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV DEPLOYMENT_ENV=production

# Expose port
EXPOSE 8000

# Run the application
CMD ["uv", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```
### Docker Compose

```yaml
version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - AGENT_MODEL=gpt-4o-mini
      - DEPLOYMENT_ENV=production
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
```
## Rate Limiting

Implement rate limiting to control API costs:
```python
import time
from collections import deque
from threading import Lock

from atomic_agents import AtomicAgent


class RateLimiter:
    """Sliding-window rate limiter for API calls."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 100000,
    ):
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        self.request_times: deque = deque()
        self.token_usage: deque = deque()  # (timestamp, tokens)
        self.lock = Lock()

    def _clean_old_entries(self, queue: deque, window_seconds: float = 60):
        """Remove entries older than the window."""
        cutoff = time.time() - window_seconds
        while queue and queue[0] < cutoff:
            queue.popleft()

    def can_make_request(self, estimated_tokens: int = 1000) -> tuple[bool, float | None]:
        """Check whether a request is allowed; return the wait time if not."""
        with self.lock:
            now = time.time()

            # Check request rate
            self._clean_old_entries(self.request_times)
            if len(self.request_times) >= self.requests_per_minute:
                wait_time = 60 - (now - self.request_times[0])
                return False, wait_time

            # Check token rate
            self._clean_old_token_entries()
            current_tokens = sum(t[1] for t in self.token_usage)
            if current_tokens + estimated_tokens > self.tokens_per_minute:
                # If the window is empty, waiting cannot help (a single request
                # exceeds the budget), so only block when there is history.
                if self.token_usage:
                    wait_time = 60 - (now - self.token_usage[0][0])
                    return False, wait_time

            return True, None

    def _clean_old_token_entries(self):
        """Remove token entries older than 60 seconds."""
        cutoff = time.time() - 60
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()

    def record_request(self, tokens_used: int = 0):
        """Record a completed request."""
        with self.lock:
            now = time.time()
            self.request_times.append(now)
            if tokens_used > 0:
                self.token_usage.append((now, tokens_used))


class RateLimitedAgent:
    """Agent wrapper with rate limiting."""

    def __init__(self, agent: AtomicAgent, rate_limiter: RateLimiter):
        self.agent = agent
        self.rate_limiter = rate_limiter

    def run(self, input_data, estimated_tokens: int = 1000):
        """Run with rate limiting."""
        can_proceed, wait_time = self.rate_limiter.can_make_request(estimated_tokens)
        if not can_proceed:
            print(f"Rate limited, waiting {wait_time:.1f}s")
            # Waits once and proceeds; production code may loop until allowed.
            time.sleep(wait_time)

        response = self.agent.run(input_data)
        self.rate_limiter.record_request(estimated_tokens)
        return response


# Usage
rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)
limited_agent = RateLimitedAgent(agent, rate_limiter)
```
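The fixed `estimated_tokens=1000` is a blunt guess. A sketch that estimates prompt tokens from the actual message using the optional `tiktoken` tokenizer (encoding coverage varies by model and tiktoken version, hence the fallback):

```python
import tiktoken


def estimate_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Rough prompt-token count; completion tokens still need headroom."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")  # generic fallback
    return len(encoding.encode(text))


message = "Summarize my last three orders."
limited_agent.run(
    BasicChatInputSchema(chat_message=message),
    estimated_tokens=estimate_tokens(message) + 500,  # headroom for the reply
)
```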
## Graceful Shutdown

Handle shutdown signals properly:
```python
import asyncio
import signal
from contextlib import asynccontextmanager


class GracefulShutdown:
    """Manages graceful shutdown for agent services."""

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.active_requests = 0

    def setup_signal_handlers(self):
        """Register signal handlers."""
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        print(f"Received signal {signum}, initiating shutdown...")
        self.shutdown_event.set()

    async def wait_for_shutdown(self, timeout: float = 30.0):
        """Wait for active requests to complete."""
        print(f"Waiting for {self.active_requests} active requests...")
        loop = asyncio.get_running_loop()
        start = loop.time()
        while self.active_requests > 0:
            if loop.time() - start > timeout:
                print(f"Timeout reached, {self.active_requests} requests still active")
                break
            await asyncio.sleep(0.1)
        print("Shutdown complete")

    @asynccontextmanager
    async def request_context(self):
        """Context manager for tracking active requests."""
        self.active_requests += 1
        try:
            yield
        finally:
            self.active_requests -= 1


# Usage with FastAPI (reuses the app and ChatRequest from the section above)
shutdown_handler = GracefulShutdown()


@asynccontextmanager
async def lifespan(app: FastAPI):
    shutdown_handler.setup_signal_handlers()
    yield
    await shutdown_handler.wait_for_shutdown()


@app.post("/chat")
async def chat(request: ChatRequest):
    async with shutdown_handler.request_context():
        # Process the request here
        pass
```
## Health Checks

Implement comprehensive health checks:
```python
import os
from datetime import datetime, timezone

from pydantic import BaseModel


class HealthStatus(BaseModel):
    status: str
    timestamp: str
    checks: dict[str, bool]
    details: dict[str, str] | None = None


class HealthChecker:
    """Performs health checks for agent deployments."""

    def __init__(self, agent: AtomicAgent):
        self.agent = agent
        self.last_successful_request: datetime | None = None

    async def check_agent_health(self) -> bool:
        """Verify the agent can process requests."""
        try:
            # Simple test request. Note: this makes a real model call, so
            # keep probe intervals modest to limit cost.
            response = await self.agent.run_async(
                BasicChatInputSchema(chat_message="health check")
            )
            self.last_successful_request = datetime.now(timezone.utc)
            return bool(response.chat_message)
        except Exception:
            return False

    def check_api_key_valid(self) -> bool:
        """Verify the API key is configured."""
        return bool(os.getenv("OPENAI_API_KEY"))

    async def get_health_status(self) -> HealthStatus:
        """Get comprehensive health status."""
        checks = {
            "api_key_configured": self.check_api_key_valid(),
            "agent_responsive": await self.check_agent_health(),
        }
        status = "healthy" if all(checks.values()) else "unhealthy"

        details = {}
        if self.last_successful_request:
            details["last_success"] = self.last_successful_request.isoformat()

        return HealthStatus(
            status=status,
            timestamp=datetime.now(timezone.utc).isoformat(),
            checks=checks,
            details=details if details else None,
        )


# Health check endpoint (replaces the simpler /health route shown earlier)
health_checker = HealthChecker(agent)


@app.get("/health", response_model=HealthStatus)
async def health_check():
    return await health_checker.get_health_status()
```
## Best Practices Summary

| Area | Recommendation |
|---|---|
| Configuration | Use environment variables, never hardcode secrets |
| API Keys | Store in a secrets manager (AWS Secrets Manager, Vault) |
| Scaling | Use async clients, implement connection pooling |
| Monitoring | Add health checks, log request/response metrics |
| Error Handling | Implement retries, circuit breakers, fallbacks |
| Rate Limiting | Respect API limits, implement client-side limiting |
| Shutdown | Handle signals, drain connections gracefully |
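The error-handling row deserves one concrete example. A minimal retry-with-exponential-backoff wrapper around `agent.run()`; this is a sketch only, since the instructor client already retries schema-validation failures via `max_retries`, so keep outer retries conservative and narrow the caught exceptions to transient errors in real code:

```python
import random
import time


def run_with_retries(agent, input_data, max_attempts: int = 3):
    """Retry transient failures with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return agent.run(input_data)
        except Exception as exc:  # narrow to rate-limit/timeout errors in practice
            if attempt == max_attempts:
                raise
            delay = 2 ** attempt + random.uniform(0, 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)
```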
## Deployment Checklist

- [ ] Environment variables configured
- [ ] API keys stored securely
- [ ] Health check endpoint implemented
- [ ] Rate limiting configured
- [ ] Error handling and retries implemented
- [ ] Logging and monitoring set up
- [ ] Graceful shutdown handling
- [ ] Docker/container configuration
- [ ] Load balancing configured (if scaling)
- [ ] Backup/fallback providers configured