Hooks Guide
This guide covers the hook system in Atomic Agents, enabling comprehensive monitoring, error handling, and intelligent retry mechanisms.
Overview
The Atomic Agents hook system integrates with Instructor’s event system to provide:
Comprehensive Monitoring: Track all aspects of agent execution
Robust Error Handling: Graceful handling of validation and completion errors
Intelligent Retry Patterns: Implement smart retry logic based on error context
Performance Metrics: Monitor response times, success rates, and error patterns
Zero Overhead: Hooks only execute when registered and enabled
Supported Hook Events
| Event | Description | When Triggered |
|---|---|---|
| `parse:error` | Pydantic validation failures | When LLM output doesn’t match schema |
| `completion:kwargs` | Before API calls | Just before sending request to LLM |
| `completion:response` | After API responses | When LLM returns a response |
| `completion:error` | API or network errors | On connection failures, timeouts, etc. |
Basic Hook Registration
Register hooks using the register_hook method on any AtomicAgent:
import os
import instructor
import openai
from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory
def on_parse_error(error):
    """Report a validation failure on stdout.

    Args:
        error: The error object passed to the ``parse:error`` hook.
    """
    message = f"Validation failed: {error}"
    print(message)
def on_completion_kwargs(**kwargs):
    """Print which model is about to be called.

    Args:
        **kwargs: Keyword arguments of the outgoing completion request; only
            ``model`` is read, falling back to ``"unknown"`` when absent.
    """
    model_name = kwargs["model"] if "model" in kwargs else "unknown"
    print(f"Calling model: {model_name}")
def on_completion_response(response, **kwargs):
    """Print token usage for a successful response, when it is reported.

    Args:
        response: The completion response passed to the hook.
        **kwargs: Extra hook arguments; ignored.
    """
    # The attribute can exist but be None; a bare hasattr() check would then
    # crash on `.total_tokens` inside the hook.
    usage = getattr(response, "usage", None)
    if usage is None:
        return
    print(f"Tokens used: {usage.total_tokens}")
def on_completion_error(error, **kwargs):
    """Print the class name and message of an API error.

    Args:
        error: The exception passed to the ``completion:error`` hook.
        **kwargs: Extra hook arguments; ignored.
    """
    error_kind = type(error).__name__
    print(f"API error: {error_kind}: {error}")
# Create agent: wrap the OpenAI client with Instructor, then build the config
client = instructor.from_openai(openai.OpenAI())
config = AgentConfig(
    client=client,
    model="gpt-4o-mini",
    history=ChatHistory(),
)
agent = AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](config=config)

# Register hooks: one handler per event name, in the order listed
for event_name, handler in (
    ("parse:error", on_parse_error),
    ("completion:kwargs", on_completion_kwargs),
    ("completion:response", on_completion_response),
    ("completion:error", on_completion_error),
):
    agent.register_hook(event_name, handler)

# Use the agent normally - hooks are called automatically
response = agent.run(BasicChatInputSchema(chat_message="Hello!"))
Performance Monitoring
Track request metrics for performance analysis:
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AgentMetrics:
    """Running counters and timing totals for an agent's requests."""

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    parse_errors: int = 0
    total_response_time: float = 0.0
    # Start stamp of the request currently in flight; None when idle.
    # Excluded from the generated repr.
    _request_start: Optional[float] = field(default=None, repr=False)

    @property
    def success_rate(self) -> float:
        """Return successful requests as a percentage of all requests.

        Returns 0.0 while no request has been counted.
        """
        if self.total_requests:
            return self.successful_requests / self.total_requests * 100
        return 0.0

    @property
    def avg_response_time(self) -> float:
        """Return the accumulated response time per successful request.

        Returns 0.0 while no successful request has been counted.
        """
        if self.successful_requests:
            return self.total_response_time / self.successful_requests
        return 0.0
# Create metrics instance
metrics = AgentMetrics()


def on_request_start(**kwargs):
    """Count a new request and stamp its start time.

    Args:
        **kwargs: Keyword arguments of the outgoing request; ignored.
    """
    metrics.total_requests += 1
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system wall clock is adjusted mid-request.
    metrics._request_start = time.perf_counter()


def on_request_complete(response, **kwargs):
    """Count a successful request and add its duration to the running total.

    Args:
        response: The completion response; ignored.
        **kwargs: Extra hook arguments; ignored.
    """
    started_at = metrics._request_start
    # Compare against None explicitly: a start stamp is a float, and its
    # truthiness says nothing about whether one was recorded.
    if started_at is not None:
        metrics.total_response_time += time.perf_counter() - started_at
        metrics._request_start = None
    metrics.successful_requests += 1


def on_request_error(error, **kwargs):
    """Count a failed request and discard its pending start stamp.

    Args:
        error: The exception passed to the hook; ignored.
        **kwargs: Extra hook arguments; ignored.
    """
    metrics.failed_requests += 1
    metrics._request_start = None


def on_validation_error(error):
    """Count a validation (parse) error.

    Args:
        error: The validation error passed to the hook; ignored.
    """
    metrics.parse_errors += 1
# Register metrics hooks
for event_name, callback in (
    ("completion:kwargs", on_request_start),
    ("completion:response", on_request_complete),
    ("completion:error", on_request_error),
    ("parse:error", on_validation_error),
):
    agent.register_hook(event_name, callback)

# After running queries, check metrics
success_pct = metrics.success_rate
avg_seconds = metrics.avg_response_time
print(f"Success Rate: {success_pct:.1f}%")
print(f"Avg Response Time: {avg_seconds:.2f}s")
Detailed Validation Error Handling
Extract detailed information from validation errors:
from pydantic import ValidationError
def detailed_parse_error_handler(error):
    """Print a per-field breakdown of a validation error.

    For a pydantic ``ValidationError`` every entry of ``error.errors()`` is
    printed (location, type, message and, when present, the offending input).
    Any other error is printed as a single line.

    Args:
        error: The error object passed to the ``parse:error`` hook.
    """
    if not isinstance(error, ValidationError):
        print(f"Parse Error: {error}")
        return

    print("Validation Error Details:")
    for issue in error.errors():
        # Get field path (e.g., "confidence" or "nested.field")
        location = " -> ".join(map(str, issue["loc"]))
        kind = issue["type"]
        text = issue["msg"]
        print(f" Field: {location}")
        print(f" Type: {kind}")
        print(f" Message: {text}")
        # Access input value if available
        if "input" in issue:
            print(f" Invalid Value: {issue['input']}")


agent.register_hook("parse:error", detailed_parse_error_handler)
Retry Strategies with Hooks
Implement intelligent retry logic based on error context:
import time
from functools import wraps
class RetryHandler:
    """Tracks failed attempts and decides whether another try is worthwhile."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        """Store the retry budget and start with a clean state.

        Args:
            max_retries: Attempt count at which retrying stops.
            base_delay: Delay in seconds before the first retry; doubled for
                each further attempt.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.current_attempt = 0
        self.should_retry = False

    def on_error(self, error, **kwargs):
        """Record a failed attempt and set ``should_retry`` accordingly.

        When a retry is granted, this method sleeps for the backoff delay
        before returning.

        Args:
            error: The exception passed to the hook; its lowercased text is
                searched for retryable markers.
            **kwargs: Extra hook arguments; ignored.
        """
        self.current_attempt += 1

        # Budget exhausted: no more retries
        if self.current_attempt >= self.max_retries:
            self.should_retry = False
            print(f"Max retries ({self.max_retries}) exceeded")
            return

        # Retry on rate limits and server errors
        text = str(error).lower()
        retryable_markers = ("rate limit", "timeout", "503", "502")
        self.should_retry = any(marker in text for marker in retryable_markers)
        if not self.should_retry:
            return

        # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
        delay = self.base_delay * (2 ** (self.current_attempt - 1))
        print(f"Retrying in {delay}s (attempt {self.current_attempt}/{self.max_retries})")
        time.sleep(delay)

    def on_success(self, response, **kwargs):
        """Clear the retry state after a successful response.

        Args:
            response: The completion response; ignored.
            **kwargs: Extra hook arguments; ignored.
        """
        self.current_attempt, self.should_retry = 0, False

    def reset(self):
        """Clear the attempt counter and the retry flag."""
        self.current_attempt, self.should_retry = 0, False
def run_with_retry(agent, input_data, retry_handler: "RetryHandler"):
    """Run the agent, retrying for as long as the handler asks for it.

    The handler's ``on_error`` hook is expected to set ``should_retry`` while
    ``agent.run`` is failing; this function only reads that decision.

    Args:
        agent: Object exposing ``run(input_data)``.
        input_data: Value passed unchanged to ``agent.run`` on every attempt.
        retry_handler: Handler exposing ``reset()`` and a ``should_retry`` flag.

    Returns:
        Whatever ``agent.run`` returns on the first successful attempt.

    Raises:
        Exception: Re-raises the last error from ``agent.run`` when the
            handler did not request a retry for it.
    """
    retry_handler.reset()
    while True:
        # Clear the previous verdict before each attempt. Otherwise a failure
        # that never reaches the error hook would reuse a stale True and this
        # loop would spin forever.
        retry_handler.should_retry = False
        try:
            return agent.run(input_data)
        except Exception:
            if not retry_handler.should_retry:
                raise
# Usage: create one handler and subscribe its callbacks, so it is told about
# every completion error (to decide on a retry) and every response (to reset).
retry_handler = RetryHandler(max_retries=3, base_delay=1.0)
agent.register_hook("completion:error", retry_handler.on_error)
agent.register_hook("completion:response", retry_handler.on_success)
# Then route calls through the run_with_retry helper defined above, e.g.:
#   response = run_with_retry(agent, input_data, retry_handler)
Managing Hooks
Enable/Disable Hooks
Temporarily disable hooks without unregistering:
# Disable all hooks
agent.disable_hooks()
try:
    # Run without hook overhead
    response = agent.run(input_data)
finally:
    # Re-enable hooks even when the run raises, so a failure here does not
    # leave monitoring switched off for the rest of the program
    agent.enable_hooks()

# Check if hooks are enabled
# NOTE(review): confirm against your installed version whether `hooks_enabled`
# is a method (called as below) or a property (accessed without parentheses).
if agent.hooks_enabled():
    print("Hooks are active")
Unregister Hooks
Remove specific hooks or clear all:
# Unregister a specific hook: pass the same event name and the same callback
# that were given to register_hook
# (presumably matched by callback identity - verify against the library)
agent.unregister_hook("parse:error", on_parse_error)
# Clear all hooks: removes every registered callback in one call
agent.clear_hooks()
Production Logging Pattern
A complete production-ready logging setup:
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict
class ProductionAgentLogger:
    """Emits one JSON log line per hook event through the ``logging`` module."""

    def __init__(self, logger_name: str = "atomic_agent"):
        """Fetch the named logger, set it to INFO and ensure it has a handler.

        Args:
            logger_name: Name passed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        # Add handler if none exists (avoids duplicate output when several
        # instances share one logger name)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            ))
            self.logger.addHandler(handler)

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        # datetime.utcnow() is deprecated and returns a naive value; an aware
        # datetime keeps the "+00:00" offset in the serialized timestamp.
        return datetime.now(timezone.utc).isoformat()

    def log_request(self, **kwargs):
        """Log outgoing request details.

        Args:
            **kwargs: Keyword arguments of the outgoing request; ``model`` and
                ``messages`` are read.
        """
        # `or []` also covers an explicit messages=None, which len() rejects
        messages = kwargs.get("messages") or []
        self.logger.info(json.dumps({
            "event": "request_start",
            "model": kwargs.get("model"),
            "messages_count": len(messages),
            "timestamp": self._timestamp(),
        }))

    def log_response(self, response, **kwargs):
        """Log response details, including token usage when it is reported.

        Args:
            response: The completion response passed to the hook.
            **kwargs: Extra hook arguments; ignored.
        """
        log_data = {
            "event": "request_complete",
            "timestamp": self._timestamp(),
        }
        # The attribute can exist but be None, so test the value itself
        usage = getattr(response, "usage", None)
        if usage is not None:
            log_data["usage"] = {
                "prompt_tokens": usage.prompt_tokens,
                "completion_tokens": usage.completion_tokens,
                "total_tokens": usage.total_tokens,
            }
        self.logger.info(json.dumps(log_data))

    def log_error(self, error, **kwargs):
        """Log error details at ERROR level.

        Args:
            error: The exception passed to the hook.
            **kwargs: Extra hook arguments; ignored.
        """
        self.logger.error(json.dumps({
            "event": "request_error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": self._timestamp(),
        }))

    def log_validation_error(self, error):
        """Log validation error details at WARNING level.

        Args:
            error: The validation error passed to the hook.
        """
        self.logger.warning(json.dumps({
            "event": "validation_error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": self._timestamp(),
        }))

    def register_with_agent(self, agent: "AtomicAgent"):
        """Register all logging hooks with an agent.

        Args:
            agent: Agent exposing ``register_hook(event, callback)``.
        """
        agent.register_hook("completion:kwargs", self.log_request)
        agent.register_hook("completion:response", self.log_response)
        agent.register_hook("completion:error", self.log_error)
        agent.register_hook("parse:error", self.log_validation_error)
# Usage
# NOTE(review): this binds the name `logger` to a ProductionAgentLogger, which
# only defines the log_* methods. Snippets in this guide that call
# logger.info() / logger.error() need a logging.Logger under that name, so use
# a different variable name here if both live in one module.
logger = ProductionAgentLogger("my_agent")
logger.register_with_agent(agent)
Best Practices
1. Keep Hooks Lightweight
Hooks run synchronously - avoid heavy operations:
# Good: Quick logging
def on_response(response, **kwargs):
    """Log that a response arrived; does no other work.

    Args:
        response: The completion response; not inspected.
        **kwargs: Extra hook arguments; ignored.
    """
    # Plain literal: the message has no placeholders, so no f-prefix is needed
    logger.info("Response received")
# Avoid: Heavy processing in hooks
def on_response_slow(response, **kwargs):
    """Anti-pattern: performs three heavy operations inside a hook.

    Args:
        response: The completion response handed to each operation.
        **kwargs: Extra hook arguments; ignored.
    """
    # Don't do this - blocks the response
    save_to_database(response)
    send_to_analytics(response)
    generate_report(response)
2. Handle Hook Exceptions
Wrap hook logic to prevent failures from disrupting the agent:
def safe_hook(func):
    """Decorator to catch hook exceptions.

    Args:
        func: The hook callable to protect.

    Returns:
        A wrapper with ``func``'s metadata that returns ``func``'s result, or
        ``None`` when ``func`` raised (the exception is logged, not re-raised).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Deliberate best-effort: a failing hook must not break the agent.
            # logger.exception keeps the ERROR level and message text but also
            # records the traceback, which logger.error(f"...") discarded.
            logger.exception("Hook error in %s: %s", func.__name__, e)
            return None

    return wrapper
@safe_hook
def on_completion_response(response, **kwargs):
    """Hand the response to ``process_response`` under ``safe_hook``.

    Args:
        response: The completion response to process.
        **kwargs: Extra hook arguments; ignored.
    """
    # If this fails, the agent continues working
    process_response(response)
3. Use Hooks for Cross-Cutting Concerns
Hooks are ideal for:
Logging and monitoring
Metrics collection
Error tracking
Performance profiling
Audit trails
4. Don’t Modify Responses in Hooks
Hooks are for observation, not transformation:
# Good: Observe and log
def on_response(response, **kwargs):
    """Log the response as received, leaving it untouched.

    Args:
        response: The completion response to log.
        **kwargs: Extra hook arguments; ignored.
    """
    logger.info(f"Got response: {response}")
# Avoid: Trying to modify response
def on_response_bad(response, **kwargs):
    """Anti-pattern: overwrites ``chat_message`` on the observed response.

    Args:
        response: The completion response being mutated.
        **kwargs: Extra hook arguments; ignored.
    """
    response.chat_message = "Modified"  # Don't do this
Summary
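| Feature | Method | Description |
|---|---|---|
| Register hook | `agent.register_hook(event, callback)` | Add a hook callback |
| Unregister hook | `agent.unregister_hook(event, callback)` | Remove specific hook |
| Clear all hooks | `agent.clear_hooks()` | Remove all hooks |
| Enable hooks | `agent.enable_hooks()` | Activate hook system |
| Disable hooks | `agent.disable_hooks()` | Deactivate hook system |
| Check status | `agent.hooks_enabled()` | Check if hooks active |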
Use hooks to add monitoring and error handling to your agents without modifying core business logic.