Hooks Guide

This guide covers the hook system in Atomic Agents, enabling comprehensive monitoring, error handling, and intelligent retry mechanisms.

Overview

The Atomic Agents hook system integrates with Instructor’s event system to provide:

  • Comprehensive Monitoring: Track all aspects of agent execution

  • Robust Error Handling: Graceful handling of validation and completion errors

  • Intelligent Retry Patterns: Implement smart retry logic based on error context

  • Performance Metrics: Monitor response times, success rates, and error patterns

  • Zero Overhead: Hooks only execute when registered and enabled

Supported Hook Events

Event

Description

When Triggered

parse:error

Pydantic validation failures

When LLM output doesn’t match schema

completion:kwargs

Before API calls

Just before sending request to LLM

completion:response

After API responses

When LLM returns a response

completion:error

API or network errors

On connection failures, timeouts, etc.

Basic Hook Registration

Register hooks using the register_hook method on any AtomicAgent:

import os
import instructor
import openai
from atomic_agents import AtomicAgent, AgentConfig, BasicChatInputSchema, BasicChatOutputSchema
from atomic_agents.context import ChatHistory


def on_parse_error(error):
    """Handle validation errors."""
    print(f"Validation failed: {error}")


def on_completion_kwargs(**kwargs):
    """Log API call details before request."""
    model = kwargs.get("model", "unknown")
    print(f"Calling model: {model}")


def on_completion_response(response, **kwargs):
    """Process successful responses."""
    if hasattr(response, "usage"):
        print(f"Tokens used: {response.usage.total_tokens}")


def on_completion_error(error, **kwargs):
    """Handle API errors."""
    print(f"API error: {type(error).__name__}: {error}")


# Create agent
client = instructor.from_openai(openai.OpenAI())
agent = AtomicAgent[BasicChatInputSchema, BasicChatOutputSchema](
    config=AgentConfig(
        client=client,
        model="gpt-4o-mini",
        history=ChatHistory()
    )
)

# Register hooks
agent.register_hook("parse:error", on_parse_error)
agent.register_hook("completion:kwargs", on_completion_kwargs)
agent.register_hook("completion:response", on_completion_response)
agent.register_hook("completion:error", on_completion_error)

# Use the agent normally - hooks are called automatically
response = agent.run(BasicChatInputSchema(chat_message="Hello!"))

Performance Monitoring

Track request metrics for performance analysis:

import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgentMetrics:
    """Tracks agent performance metrics."""

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    parse_errors: int = 0
    total_response_time: float = 0.0
    _request_start: Optional[float] = field(default=None, repr=False)

    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests * 100

    @property
    def avg_response_time(self) -> float:
        if self.successful_requests == 0:
            return 0.0
        return self.total_response_time / self.successful_requests


# Create metrics instance
metrics = AgentMetrics()


def on_request_start(**kwargs):
    """Track request start time."""
    metrics.total_requests += 1
    metrics._request_start = time.time()


def on_request_complete(response, **kwargs):
    """Track successful request metrics."""
    if metrics._request_start:
        elapsed = time.time() - metrics._request_start
        metrics.total_response_time += elapsed
        metrics._request_start = None
    metrics.successful_requests += 1


def on_request_error(error, **kwargs):
    """Track failed request metrics."""
    metrics.failed_requests += 1
    metrics._request_start = None


def on_validation_error(error):
    """Track validation errors."""
    metrics.parse_errors += 1


# Register metrics hooks
agent.register_hook("completion:kwargs", on_request_start)
agent.register_hook("completion:response", on_request_complete)
agent.register_hook("completion:error", on_request_error)
agent.register_hook("parse:error", on_validation_error)

# After running queries, check metrics
print(f"Success Rate: {metrics.success_rate:.1f}%")
print(f"Avg Response Time: {metrics.avg_response_time:.2f}s")

Detailed Validation Error Handling

Extract detailed information from validation errors:

from pydantic import ValidationError


def detailed_parse_error_handler(error):
    """Extract detailed validation error information."""
    if isinstance(error, ValidationError):
        print("Validation Error Details:")
        for err in error.errors():
            # Get field path (e.g., "confidence" or "nested.field")
            field_path = " -> ".join(str(x) for x in err["loc"])
            error_type = err["type"]
            message = err["msg"]

            print(f"  Field: {field_path}")
            print(f"  Type: {error_type}")
            print(f"  Message: {message}")

            # Access input value if available
            if "input" in err:
                print(f"  Invalid Value: {err['input']}")
    else:
        print(f"Parse Error: {error}")


agent.register_hook("parse:error", detailed_parse_error_handler)

Retry Strategies with Hooks

Implement intelligent retry logic based on error context:

import time
from functools import wraps


class RetryHandler:
    """Manages retry logic for agent calls."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.current_attempt = 0
        self.should_retry = False

    def on_error(self, error, **kwargs):
        """Determine if retry is appropriate."""
        self.current_attempt += 1

        # Check if we should retry
        if self.current_attempt < self.max_retries:
            # Retry on rate limits and server errors
            error_str = str(error).lower()
            if any(x in error_str for x in ["rate limit", "timeout", "503", "502"]):
                self.should_retry = True
                delay = self.base_delay * (2 ** (self.current_attempt - 1))
                print(f"Retrying in {delay}s (attempt {self.current_attempt}/{self.max_retries})")
                time.sleep(delay)
            else:
                self.should_retry = False
        else:
            self.should_retry = False
            print(f"Max retries ({self.max_retries}) exceeded")

    def on_success(self, response, **kwargs):
        """Reset retry counter on success."""
        self.current_attempt = 0
        self.should_retry = False

    def reset(self):
        """Reset retry state."""
        self.current_attempt = 0
        self.should_retry = False


def run_with_retry(agent, input_data, retry_handler: RetryHandler):
    """Execute agent with retry logic."""
    retry_handler.reset()

    while True:
        try:
            response = agent.run(input_data)
            return response
        except Exception as e:
            if not retry_handler.should_retry:
                raise

    return None


# Usage
retry_handler = RetryHandler(max_retries=3, base_delay=1.0)
agent.register_hook("completion:error", retry_handler.on_error)
agent.register_hook("completion:response", retry_handler.on_success)

Managing Hooks

Enable/Disable Hooks

Temporarily disable hooks without unregistering:

# Disable all hooks
agent.disable_hooks()

# Run without hook overhead
response = agent.run(input_data)

# Re-enable hooks
agent.enable_hooks()

# Check if hooks are enabled
if agent.hooks_enabled():
    print("Hooks are active")

Unregister Hooks

Remove specific hooks or clear all:

# Unregister a specific hook
agent.unregister_hook("parse:error", on_parse_error)

# Clear all hooks
agent.clear_hooks()

Production Logging Pattern

A complete production-ready logging setup:

import logging
import json
from datetime import datetime
from typing import Any, Dict


class ProductionAgentLogger:
    """Production-grade agent logging with hooks."""

    def __init__(self, logger_name: str = "atomic_agent"):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)

        # Add handler if none exists
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            ))
            self.logger.addHandler(handler)

    def log_request(self, **kwargs):
        """Log outgoing request details."""
        self.logger.info(json.dumps({
            "event": "request_start",
            "model": kwargs.get("model"),
            "messages_count": len(kwargs.get("messages", [])),
            "timestamp": datetime.utcnow().isoformat()
        }))

    def log_response(self, response, **kwargs):
        """Log response details."""
        log_data = {
            "event": "request_complete",
            "timestamp": datetime.utcnow().isoformat()
        }

        if hasattr(response, "usage"):
            log_data["usage"] = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }

        self.logger.info(json.dumps(log_data))

    def log_error(self, error, **kwargs):
        """Log error details."""
        self.logger.error(json.dumps({
            "event": "request_error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.utcnow().isoformat()
        }))

    def log_validation_error(self, error):
        """Log validation error details."""
        self.logger.warning(json.dumps({
            "event": "validation_error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.utcnow().isoformat()
        }))

    def register_with_agent(self, agent: AtomicAgent):
        """Register all logging hooks with an agent."""
        agent.register_hook("completion:kwargs", self.log_request)
        agent.register_hook("completion:response", self.log_response)
        agent.register_hook("completion:error", self.log_error)
        agent.register_hook("parse:error", self.log_validation_error)


# Usage
logger = ProductionAgentLogger("my_agent")
logger.register_with_agent(agent)

Best Practices

1. Keep Hooks Lightweight

Hooks run synchronously - avoid heavy operations:

# Good: Quick logging
def on_response(response, **kwargs):
    logger.info(f"Response received")

# Avoid: Heavy processing in hooks
def on_response_slow(response, **kwargs):
    # Don't do this - blocks the response
    save_to_database(response)
    send_to_analytics(response)
    generate_report(response)

2. Handle Hook Exceptions

Wrap hook logic to prevent failures from disrupting the agent:

def safe_hook(func):
    """Decorator to catch hook exceptions."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Hook error in {func.__name__}: {e}")
    return wrapper


@safe_hook
def on_completion_response(response, **kwargs):
    # If this fails, the agent continues working
    process_response(response)

3. Use Hooks for Cross-Cutting Concerns

Hooks are ideal for:

  • Logging and monitoring

  • Metrics collection

  • Error tracking

  • Performance profiling

  • Audit trails

4. Don’t Modify Responses in Hooks

Hooks are for observation, not transformation:

# Good: Observe and log
def on_response(response, **kwargs):
    logger.info(f"Got response: {response}")

# Avoid: Trying to modify response
def on_response_bad(response, **kwargs):
    response.chat_message = "Modified"  # Don't do this

Summary

Feature

Method

Description

Register hook

agent.register_hook(event, callback)

Add a hook callback

Unregister hook

agent.unregister_hook(event, callback)

Remove specific hook

Clear all hooks

agent.clear_hooks()

Remove all hooks

Enable hooks

agent.enable_hooks()

Activate hook system

Disable hooks

agent.disable_hooks()

Deactivate hook system

Check status

agent.hooks_enabled()

Check if hooks active

Use hooks to add monitoring and error handling to your agents without modifying core business logic.