Deployment Guide

This guide covers best practices for deploying Atomic Agents applications to production environments.

Overview

Deploying AI agents requires attention to:

  • Configuration Management: Environment-specific settings

  • API Key Security: Secure credential handling

  • Scaling: Handling concurrent requests

  • Monitoring: Observability and alerting

  • Error Handling: Graceful degradation

Environment Configuration

Using Environment Variables

Store configuration in environment variables:

import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentDeploymentConfig:
    """Production configuration for agents."""

    # Required
    openai_api_key: str
    model: str

    # Optional with defaults
    max_tokens: int = 2048
    temperature: float = 0.7
    timeout: float = 30.0
    max_retries: int = 3

    @classmethod
    def from_env(cls) -> "AgentDeploymentConfig":
        """Load configuration from environment variables."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")

        return cls(
            openai_api_key=api_key,
            model=os.getenv("AGENT_MODEL", "gpt-4o-mini"),
            max_tokens=int(os.getenv("AGENT_MAX_TOKENS", "2048")),
            temperature=float(os.getenv("AGENT_TEMPERATURE", "0.7")),
            timeout=float(os.getenv("AGENT_TIMEOUT", "30.0")),
            max_retries=int(os.getenv("AGENT_MAX_RETRIES", "3")),
        )


# Usage
config = AgentDeploymentConfig.from_env()

Configuration File Pattern

For complex deployments, use configuration files:

import os
import json
from pathlib import Path


def load_config(env: str = None) -> dict:
    """Load environment-specific configuration."""
    env = env or os.getenv("DEPLOYMENT_ENV", "development")

    config_path = Path(f"config/{env}.json")
    if not config_path.exists():
        raise FileNotFoundError(f"Config not found: {config_path}")

    with open(config_path) as f:
        config = json.load(f)

    # Override with environment variables
    if os.getenv("OPENAI_API_KEY"):
        config["openai_api_key"] = os.getenv("OPENAI_API_KEY")

    return config


# config/production.json example:
# {
#     "model": "gpt-4o",
#     "max_tokens": 4096,
#     "timeout": 60,
#     "rate_limit": {
#         "requests_per_minute": 100,
#         "tokens_per_minute": 100000
#     }
# }

Creating Production-Ready Agents

Agent Factory Pattern

Create agents with production configuration:

import instructor
import openai
from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory, SystemPromptGenerator


class ProductionAgentFactory:
    """Factory for creating production-configured agents."""

    def __init__(self, config: AgentDeploymentConfig):
        self.config = config
        self.client = instructor.from_openai(
            openai.OpenAI(
                api_key=config.openai_api_key,
                timeout=config.timeout,
                max_retries=config.max_retries
            )
        )

    def create_chat_agent(
        self,
        system_prompt: str = None,
        with_history: bool = True
    ) -> AtomicAgent:
        """Create a production chat agent."""

        history = ChatHistory() if with_history else None

        system_prompt_gen = None
        if system_prompt:
            system_prompt_gen = SystemPromptGenerator(
                background=[system_prompt]
            )

        return AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](
            config=AgentConfig(
                client=self.client,
                model=self.config.model,
                history=history,
                system_prompt_generator=system_prompt_gen,
                model_api_parameters={
                    "max_tokens": self.config.max_tokens,
                    "temperature": self.config.temperature
                }
            )
        )


# Usage
config = AgentDeploymentConfig.from_env()
factory = ProductionAgentFactory(config)
agent = factory.create_chat_agent(
    system_prompt="You are a helpful customer service agent."
)

FastAPI Integration

Deploy agents as REST APIs:

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from contextlib import asynccontextmanager
import instructor
from openai import AsyncOpenAI
from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory


# Request/Response models
class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None


class ChatResponse(BaseModel):
    response: str
    session_id: str


# Session management (use Redis in production)
sessions: dict[str, ChatHistory] = {}


def get_or_create_session(session_id: str | None) -> tuple[str, ChatHistory]:
    """Get existing session or create new one."""
    import uuid

    if session_id and session_id in sessions:
        return session_id, sessions[session_id]

    new_id = session_id or str(uuid.uuid4())
    sessions[new_id] = ChatHistory()
    return new_id, sessions[new_id]


# Global agent (created on startup)
agent: AtomicAgent = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize agent on startup."""
    global agent
    import os

    client = instructor.from_openai(
        AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    )

    agent = AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](
        config=AgentConfig(
            client=client,
            model="gpt-4o-mini"
        )
    )
    yield


app = FastAPI(lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint with session management."""
    session_id, history = get_or_create_session(request.session_id)

    # Create agent with session history
    agent.history = history

    try:
        response = await agent.run_async(
            BasicChatInputSchema(chat_message=request.message)
        )
        return ChatResponse(
            response=response.chat_message,
            session_id=session_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    """Delete a chat session."""
    if session_id in sessions:
        del sessions[session_id]
        return {"status": "deleted"}
    raise HTTPException(status_code=404, detail="Session not found")


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "agent_loaded": agent is not None}

Docker Deployment

Dockerfile

FROM python:3.12-slim

WORKDIR /app

# Install uv for faster dependency installation
RUN pip install uv

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies
RUN uv sync --frozen --no-dev

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV DEPLOYMENT_ENV=production

# Expose port
EXPOSE 8000

# Run the application
CMD ["uv", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - AGENT_MODEL=gpt-4o-mini
      - DEPLOYMENT_ENV=production
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

Rate Limiting

Implement rate limiting to control API costs:

import time
from collections import deque
from threading import Lock
from typing import Optional


class RateLimiter:
    """Token bucket rate limiter for API calls."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 100000
    ):
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        self.request_times: deque = deque()
        self.token_usage: deque = deque()  # (timestamp, tokens)
        self.lock = Lock()

    def _clean_old_entries(self, queue: deque, window_seconds: float = 60):
        """Remove entries older than the window."""
        cutoff = time.time() - window_seconds
        while queue and queue[0] < cutoff:
            queue.popleft()

    def can_make_request(self, estimated_tokens: int = 1000) -> tuple[bool, Optional[float]]:
        """Check if request is allowed, return wait time if not."""
        with self.lock:
            now = time.time()

            # Clean old entries
            self._clean_old_entries(self.request_times)

            # Check request rate
            if len(self.request_times) >= self.requests_per_minute:
                wait_time = 60 - (now - self.request_times[0])
                return False, wait_time

            # Check token rate
            self._clean_old_token_entries()
            current_tokens = sum(t[1] for t in self.token_usage)
            if current_tokens + estimated_tokens > self.tokens_per_minute:
                wait_time = 60 - (now - self.token_usage[0][0])
                return False, wait_time

            return True, None

    def _clean_old_token_entries(self):
        """Remove token entries older than 60 seconds."""
        cutoff = time.time() - 60
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()

    def record_request(self, tokens_used: int = 0):
        """Record a completed request."""
        with self.lock:
            now = time.time()
            self.request_times.append(now)
            if tokens_used > 0:
                self.token_usage.append((now, tokens_used))


class RateLimitedAgent:
    """Agent wrapper with rate limiting."""

    def __init__(self, agent: AtomicAgent, rate_limiter: RateLimiter):
        self.agent = agent
        self.rate_limiter = rate_limiter

    def run(self, input_data, estimated_tokens: int = 1000):
        """Run with rate limiting."""
        can_proceed, wait_time = self.rate_limiter.can_make_request(estimated_tokens)

        if not can_proceed:
            print(f"Rate limited, waiting {wait_time:.1f}s")
            time.sleep(wait_time)

        response = self.agent.run(input_data)
        self.rate_limiter.record_request(estimated_tokens)
        return response


# Usage
rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)
limited_agent = RateLimitedAgent(agent, rate_limiter)

Graceful Shutdown

Handle shutdown signals properly:

import signal
import asyncio
from contextlib import asynccontextmanager


class GracefulShutdown:
    """Manages graceful shutdown for agent services."""

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.active_requests = 0

    def setup_signal_handlers(self):
        """Register signal handlers."""
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        print(f"Received signal {signum}, initiating shutdown...")
        self.shutdown_event.set()

    async def wait_for_shutdown(self, timeout: float = 30.0):
        """Wait for active requests to complete."""
        print(f"Waiting for {self.active_requests} active requests...")

        start = asyncio.get_event_loop().time()
        while self.active_requests > 0:
            if asyncio.get_event_loop().time() - start > timeout:
                print(f"Timeout reached, {self.active_requests} requests still active")
                break
            await asyncio.sleep(0.1)

        print("Shutdown complete")

    @asynccontextmanager
    async def request_context(self):
        """Context manager for tracking active requests."""
        self.active_requests += 1
        try:
            yield
        finally:
            self.active_requests -= 1


# Usage with FastAPI
shutdown_handler = GracefulShutdown()


@asynccontextmanager
async def lifespan(app: FastAPI):
    shutdown_handler.setup_signal_handlers()
    yield
    await shutdown_handler.wait_for_shutdown()


@app.post("/chat")
async def chat(request: ChatRequest):
    async with shutdown_handler.request_context():
        # Process request
        pass

Health Checks

Implement comprehensive health checks:

from datetime import datetime
from pydantic import BaseModel


class HealthStatus(BaseModel):
    status: str
    timestamp: str
    checks: dict[str, bool]
    details: dict[str, str] | None = None


class HealthChecker:
    """Performs health checks for agent deployments."""

    def __init__(self, agent: AtomicAgent):
        self.agent = agent
        self.last_successful_request: datetime | None = None

    async def check_agent_health(self) -> bool:
        """Verify agent can process requests."""
        try:
            # Simple test request
            response = await self.agent.run_async(
                BasicChatInputSchema(chat_message="health check")
            )
            self.last_successful_request = datetime.utcnow()
            return bool(response.chat_message)
        except Exception:
            return False

    def check_api_key_valid(self) -> bool:
        """Verify API key is configured."""
        import os
        return bool(os.getenv("OPENAI_API_KEY"))

    async def get_health_status(self) -> HealthStatus:
        """Get comprehensive health status."""
        checks = {
            "api_key_configured": self.check_api_key_valid(),
            "agent_responsive": await self.check_agent_health(),
        }

        status = "healthy" if all(checks.values()) else "unhealthy"

        details = {}
        if self.last_successful_request:
            details["last_success"] = self.last_successful_request.isoformat()

        return HealthStatus(
            status=status,
            timestamp=datetime.utcnow().isoformat(),
            checks=checks,
            details=details if details else None
        )


# Health check endpoint
@app.get("/health", response_model=HealthStatus)
async def health_check():
    return await health_checker.get_health_status()

Best Practices Summary

Area

Recommendation

Configuration

Use environment variables, never hardcode secrets

API Keys

Store in secrets manager (AWS Secrets Manager, Vault)

Scaling

Use async clients, implement connection pooling

Monitoring

Add health checks, log request/response metrics

Error Handling

Implement retries, circuit breakers, fallbacks

Rate Limiting

Respect API limits, implement client-side limiting

Shutdown

Handle signals, drain connections gracefully

Deployment Checklist

  • [ ] Environment variables configured

  • [ ] API keys stored securely

  • [ ] Health check endpoint implemented

  • [ ] Rate limiting configured

  • [ ] Error handling and retries implemented

  • [ ] Logging and monitoring set up

  • [ ] Graceful shutdown handling

  • [ ] Docker/container configuration

  • [ ] Load balancing configured (if scaling)

  • [ ] Backup/fallback providers configured