Logging and Monitoring Guide
This guide covers logging, monitoring, and observability best practices for Atomic Agents applications.
Overview
Effective logging and monitoring enables:
Debugging: Trace issues in agent behavior
Performance Tracking: Identify bottlenecks
Cost Monitoring: Track API usage and costs
Alerting: Detect anomalies and failures
Auditing: Maintain records for compliance
Basic Logging Setup
Configure Python Logging
Set up structured logging for agents:
import logging
import json
from datetime import datetime
def setup_logging(
level: str = "INFO",
log_file: str = None,
json_format: bool = True
):
"""Configure logging for agent applications."""
# Create logger
logger = logging.getLogger("atomic_agents")
logger.setLevel(getattr(logging, level.upper()))
# JSON formatter for structured logs
class JsonFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
# Console handler
console_handler = logging.StreamHandler()
if json_format:
console_handler.setFormatter(JsonFormatter())
else:
console_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))
logger.addHandler(console_handler)
# File handler (optional)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(JsonFormatter())
logger.addHandler(file_handler)
return logger
# Usage
logger = setup_logging(level="INFO", json_format=True)
logger.info("Agent initialized", extra={"model": "gpt-4o-mini"})
Agent Logging with Hooks
Comprehensive Request Logging
Use hooks to log all agent interactions:
import time
import logging
import json
from typing import Any, Optional
from dataclasses import dataclass, field
from atomic_agents import AtomicAgent
logger = logging.getLogger("atomic_agents")
@dataclass
class RequestContext:
"""Tracks request context for logging."""
request_id: str
start_time: float
model: Optional[str] = None
input_tokens: Optional[int] = None
output_tokens: Optional[int] = None
class AgentLogger:
"""Comprehensive agent logging using hooks."""
def __init__(self, agent: AtomicAgent):
self.agent = agent
self.current_request: Optional[RequestContext] = None
# Register hooks
agent.register_hook("completion:kwargs", self._on_request_start)
agent.register_hook("completion:response", self._on_request_complete)
agent.register_hook("completion:error", self._on_request_error)
agent.register_hook("parse:error", self._on_parse_error)
def _generate_request_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
def _on_request_start(self, **kwargs):
"""Log request start."""
self.current_request = RequestContext(
request_id=self._generate_request_id(),
start_time=time.time(),
model=kwargs.get("model")
)
logger.info(json.dumps({
"event": "request_start",
"request_id": self.current_request.request_id,
"model": self.current_request.model,
"message_count": len(kwargs.get("messages", []))
}))
def _on_request_complete(self, response, **kwargs):
"""Log successful request."""
if not self.current_request:
return
duration = time.time() - self.current_request.start_time
log_data = {
"event": "request_complete",
"request_id": self.current_request.request_id,
"duration_ms": round(duration * 1000, 2),
"model": self.current_request.model
}
# Add token usage if available
if hasattr(response, "usage"):
log_data["tokens"] = {
"prompt": response.usage.prompt_tokens,
"completion": response.usage.completion_tokens,
"total": response.usage.total_tokens
}
logger.info(json.dumps(log_data))
self.current_request = None
def _on_request_error(self, error, **kwargs):
"""Log request error."""
log_data = {
"event": "request_error",
"error_type": type(error).__name__,
"error_message": str(error)
}
if self.current_request:
log_data["request_id"] = self.current_request.request_id
log_data["duration_ms"] = round(
(time.time() - self.current_request.start_time) * 1000, 2
)
logger.error(json.dumps(log_data))
self.current_request = None
def _on_parse_error(self, error):
"""Log validation error."""
logger.warning(json.dumps({
"event": "parse_error",
"request_id": self.current_request.request_id if self.current_request else None,
"error_type": type(error).__name__,
"error_message": str(error)
}))
# Usage
agent_logger = AgentLogger(agent)
# Logs are automatically created for all agent operations
Metrics Collection
Token and Cost Tracking
Track API usage and costs:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List
import threading
@dataclass
class UsageMetrics:
"""Tracks API usage metrics."""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
requests: int = 0
errors: int = 0
total_latency_ms: float = 0
# Cost per 1K tokens (example rates)
COST_PER_1K_INPUT = 0.00015 # gpt-4o-mini input
COST_PER_1K_OUTPUT = 0.0006 # gpt-4o-mini output
@property
def avg_latency_ms(self) -> float:
return self.total_latency_ms / self.requests if self.requests > 0 else 0
@property
def estimated_cost(self) -> float:
input_cost = (self.prompt_tokens / 1000) * self.COST_PER_1K_INPUT
output_cost = (self.completion_tokens / 1000) * self.COST_PER_1K_OUTPUT
return input_cost + output_cost
@property
def error_rate(self) -> float:
return self.errors / self.requests if self.requests > 0 else 0
class MetricsCollector:
"""Collects and aggregates agent metrics."""
def __init__(self):
self.current_metrics = UsageMetrics()
self.hourly_metrics: Dict[str, UsageMetrics] = {}
self.lock = threading.Lock()
def record_request(
self,
prompt_tokens: int,
completion_tokens: int,
latency_ms: float,
error: bool = False
):
"""Record a request's metrics."""
with self.lock:
# Update current metrics
self.current_metrics.prompt_tokens += prompt_tokens
self.current_metrics.completion_tokens += completion_tokens
self.current_metrics.total_tokens += prompt_tokens + completion_tokens
self.current_metrics.requests += 1
self.current_metrics.total_latency_ms += latency_ms
if error:
self.current_metrics.errors += 1
# Update hourly bucket
hour_key = datetime.utcnow().strftime("%Y-%m-%d-%H")
if hour_key not in self.hourly_metrics:
self.hourly_metrics[hour_key] = UsageMetrics()
hourly = self.hourly_metrics[hour_key]
hourly.prompt_tokens += prompt_tokens
hourly.completion_tokens += completion_tokens
hourly.total_tokens += prompt_tokens + completion_tokens
hourly.requests += 1
hourly.total_latency_ms += latency_ms
if error:
hourly.errors += 1
def get_summary(self) -> dict:
"""Get metrics summary."""
with self.lock:
return {
"total_requests": self.current_metrics.requests,
"total_tokens": self.current_metrics.total_tokens,
"avg_latency_ms": round(self.current_metrics.avg_latency_ms, 2),
"error_rate": round(self.current_metrics.error_rate * 100, 2),
"estimated_cost_usd": round(self.current_metrics.estimated_cost, 4)
}
def get_hourly_summary(self, hours: int = 24) -> List[dict]:
"""Get hourly metrics for the last N hours."""
with self.lock:
summaries = []
for hour_key, metrics in sorted(self.hourly_metrics.items())[-hours:]:
summaries.append({
"hour": hour_key,
"requests": metrics.requests,
"tokens": metrics.total_tokens,
"cost_usd": round(metrics.estimated_cost, 4)
})
return summaries
# Global metrics collector
metrics = MetricsCollector()
def on_completion_response(response, **kwargs):
"""Hook to record metrics."""
if hasattr(response, "usage"):
metrics.record_request(
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens,
latency_ms=0 # Calculate from request timing
)
# Register with agent
agent.register_hook("completion:response", on_completion_response)
Monitoring Dashboard
FastAPI Metrics Endpoint
Expose metrics via HTTP:
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
app = FastAPI()
class MetricsSummary(BaseModel):
total_requests: int
total_tokens: int
avg_latency_ms: float
error_rate: float
estimated_cost_usd: float
class HourlySummary(BaseModel):
hour: str
requests: int
tokens: int
cost_usd: float
@app.get("/metrics", response_model=MetricsSummary)
async def get_metrics():
"""Get current metrics summary."""
return metrics.get_summary()
@app.get("/metrics/hourly", response_model=List[HourlySummary])
async def get_hourly_metrics(hours: int = 24):
"""Get hourly metrics breakdown."""
return metrics.get_hourly_summary(hours)
@app.get("/metrics/prometheus")
async def prometheus_metrics():
"""Prometheus-compatible metrics endpoint."""
summary = metrics.get_summary()
output = []
output.append(f"# HELP agent_requests_total Total agent requests")
output.append(f"# TYPE agent_requests_total counter")
output.append(f"agent_requests_total {summary['total_requests']}")
output.append(f"# HELP agent_tokens_total Total tokens used")
output.append(f"# TYPE agent_tokens_total counter")
output.append(f"agent_tokens_total {summary['total_tokens']}")
output.append(f"# HELP agent_latency_ms Average latency in ms")
output.append(f"# TYPE agent_latency_ms gauge")
output.append(f"agent_latency_ms {summary['avg_latency_ms']}")
output.append(f"# HELP agent_error_rate Error rate percentage")
output.append(f"# TYPE agent_error_rate gauge")
output.append(f"agent_error_rate {summary['error_rate']}")
return "\n".join(output)
Distributed Tracing
OpenTelemetry Integration
Add distributed tracing for complex systems:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def setup_tracing(service_name: str = "atomic-agents"):
"""Configure OpenTelemetry tracing."""
# Set up tracer provider
provider = TracerProvider()
# Add OTLP exporter (for Jaeger, Zipkin, etc.)
otlp_exporter = OTLPSpanExporter(
endpoint="http://localhost:4317",
insecure=True
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
# Instrument HTTP client (used by OpenAI SDK)
HTTPXClientInstrumentor().instrument()
return trace.get_tracer(service_name)
tracer = setup_tracing()
class TracedAgent:
"""Agent wrapper with distributed tracing."""
def __init__(self, agent: AtomicAgent):
self.agent = agent
def run(self, input_data):
"""Run with tracing span."""
with tracer.start_as_current_span("agent.run") as span:
span.set_attribute("agent.model", self.agent.model)
span.set_attribute("input.length", len(str(input_data)))
try:
response = self.agent.run(input_data)
span.set_attribute("output.length", len(str(response)))
span.set_status(trace.Status(trace.StatusCode.OK))
return response
except Exception as e:
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
# Usage
traced_agent = TracedAgent(agent)
Alerting
Alert Conditions
Define alert conditions for monitoring:
from dataclasses import dataclass
from typing import Callable, List, Optional
from datetime import datetime
import logging
logger = logging.getLogger("alerts")
@dataclass
class AlertCondition:
"""Defines an alert condition."""
name: str
check: Callable[[], bool]
message: str
severity: str = "warning" # warning, error, critical
class AlertManager:
"""Manages alert conditions and notifications."""
def __init__(self, metrics: MetricsCollector):
self.metrics = metrics
self.conditions: List[AlertCondition] = []
self.last_alerts: dict = {} # Prevent alert spam
def add_condition(self, condition: AlertCondition):
"""Add an alert condition."""
self.conditions.append(condition)
def check_alerts(self) -> List[AlertCondition]:
"""Check all conditions and return triggered alerts."""
triggered = []
now = datetime.utcnow()
for condition in self.conditions:
# Check cooldown (don't alert more than once per 5 minutes)
last_alert = self.last_alerts.get(condition.name)
if last_alert and (now - last_alert).seconds < 300:
continue
if condition.check():
triggered.append(condition)
self.last_alerts[condition.name] = now
self._send_alert(condition)
return triggered
def _send_alert(self, condition: AlertCondition):
"""Send alert notification."""
logger.warning(f"ALERT [{condition.severity}]: {condition.name} - {condition.message}")
# Add integration with Slack, PagerDuty, etc.
# Create alert manager with conditions
alerts = AlertManager(metrics)
# High error rate alert
alerts.add_condition(AlertCondition(
name="high_error_rate",
check=lambda: metrics.current_metrics.error_rate > 0.1,
message="Error rate exceeds 10%",
severity="error"
))
# High latency alert
alerts.add_condition(AlertCondition(
name="high_latency",
check=lambda: metrics.current_metrics.avg_latency_ms > 5000,
message="Average latency exceeds 5 seconds",
severity="warning"
))
# Cost threshold alert
alerts.add_condition(AlertCondition(
name="cost_threshold",
check=lambda: metrics.current_metrics.estimated_cost > 100,
message="Estimated cost exceeds $100",
severity="warning"
))
Log Analysis Patterns
Structured Log Queries
Design logs for easy querying:
import json
from datetime import datetime
class StructuredLogger:
"""Logger optimized for log analysis tools."""
def __init__(self, service: str, environment: str):
self.service = service
self.environment = environment
self.logger = logging.getLogger(service)
def _log(self, level: str, event: str, **extra):
"""Create structured log entry."""
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"service": self.service,
"environment": self.environment,
"level": level,
"event": event,
**extra
}
log_method = getattr(self.logger, level.lower())
log_method(json.dumps(log_entry))
def info(self, event: str, **extra):
self._log("INFO", event, **extra)
def warning(self, event: str, **extra):
self._log("WARNING", event, **extra)
def error(self, event: str, **extra):
self._log("ERROR", event, **extra)
# Specialized log methods
def log_request(self, request_id: str, model: str, user_id: str = None):
self.info(
"agent_request_start",
request_id=request_id,
model=model,
user_id=user_id
)
def log_response(
self,
request_id: str,
duration_ms: float,
tokens: int,
cost: float
):
self.info(
"agent_request_complete",
request_id=request_id,
duration_ms=duration_ms,
tokens=tokens,
cost_usd=cost
)
def log_error(self, request_id: str, error_type: str, error_message: str):
self.error(
"agent_request_failed",
request_id=request_id,
error_type=error_type,
error_message=error_message
)
# Usage
log = StructuredLogger(service="my-agent", environment="production")
log.log_request(request_id="abc123", model="gpt-4o-mini", user_id="user456")
Best Practices
Logging Guidelines
What to Log |
Why |
Example |
|---|---|---|
Request IDs |
Trace requests |
|
Timestamps |
Timeline analysis |
|
Model used |
Cost attribution |
|
Token counts |
Usage tracking |
|
Latency |
Performance monitoring |
|
Error types |
Debugging |
|
User IDs |
Audit trails |
|
What NOT to Log
Full request/response content (privacy)
API keys or secrets
Personal identifiable information (PII)
Sensitive business data
Summary
Component |
Purpose |
Tools |
|---|---|---|
Logging |
Debug & audit |
Python logging, structured JSON |
Metrics |
Performance tracking |
Custom collectors, Prometheus |
Tracing |
Request flow |
OpenTelemetry, Jaeger |
Alerting |
Issue detection |
Custom rules, PagerDuty |
Dashboards |
Visualization |
Grafana, custom endpoints |
Implement logging and monitoring from the start - it’s much harder to add later.