Orchestration and Multi-Agent Patterns

This guide covers patterns for building multi-agent systems and orchestrating complex workflows with Atomic Agents.

Overview

Orchestration in Atomic Agents enables:

  • Tool Selection: Agents that choose appropriate tools based on input

  • Multi-Agent Pipelines: Chain agents for complex workflows

  • Dynamic Routing: Route queries to specialized agents

  • Parallel Execution: Run multiple agents concurrently

  • Agent Composition: Combine agents for sophisticated behavior

Tool Orchestration Pattern

The most common pattern: an orchestrator agent that selects and invokes tools.

from typing import Union
import instructor
import openai
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator


# Define tool input schemas
class SearchToolInput(BaseIOSchema):
    """Input for web search tool."""
    queries: list[str] = Field(..., description="Search queries to execute")


class CalculatorToolInput(BaseIOSchema):
    """Input for calculator tool."""
    expression: str = Field(..., description="Mathematical expression to evaluate")


# Orchestrator output uses Union to select between tools
class OrchestratorOutput(BaseIOSchema):
    """Orchestrator decides which tool to use."""
    reasoning: str = Field(..., description="Why this tool was selected")
    tool_parameters: Union[SearchToolInput, CalculatorToolInput] = Field(
        ..., description="Parameters for the selected tool"
    )


class OrchestratorInput(BaseIOSchema):
    """User query for the orchestrator."""
    query: str = Field(..., description="User's question or request")


# Create the orchestrator agent
client = instructor.from_openai(openai.OpenAI())

orchestrator = AtomicAgent[OrchestratorInput, OrchestratorOutput](
    config=AgentConfig(
        client=client,
        model="gpt-4o-mini",
        system_prompt_generator=SystemPromptGenerator(
            background=[
                "You are an orchestrator that routes queries to appropriate tools.",
                "Use search for factual questions, current events, or lookups.",
                "Use calculator for mathematical expressions and computations."
            ],
            output_instructions=[
                "Analyze the query to determine the best tool.",
                "Provide clear reasoning for your choice.",
                "Format parameters correctly for the selected tool."
            ]
        )
    )
)


def process_query(query: str):
    """Process a query through the orchestrator."""
    result = orchestrator.run(OrchestratorInput(query=query))

    print(f"Reasoning: {result.reasoning}")

    # Route to appropriate tool based on output type
    if isinstance(result.tool_parameters, SearchToolInput):
        print(f"Using Search with queries: {result.tool_parameters.queries}")
        # search_results = search_tool.run(result.tool_parameters)
    elif isinstance(result.tool_parameters, CalculatorToolInput):
        print(f"Using Calculator with: {result.tool_parameters.expression}")
        # calc_result = calculator_tool.run(result.tool_parameters)


# Example usage
process_query("What is the capital of France?")  # Routes to search
process_query("Calculate 15% of 250")  # Routes to calculator

Sequential Pipeline Pattern

Chain multiple agents where each agent’s output feeds the next:

from typing import List
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator


# Stage 1: Query Generation
class QueryGenInput(BaseIOSchema):
    """Input for the query generation agent."""
    topic: str = Field(..., description="Research topic")


class QueryGenOutput(BaseIOSchema):
    """Output of the query generation agent."""
    queries: List[str] = Field(..., description="Generated search queries")
    rationale: str = Field(..., description="Why these queries were chosen")


# Stage 2: Analysis
class AnalysisInput(BaseIOSchema):
    """Input for the analysis agent."""
    topic: str = Field(..., description="Original topic")
    search_results: str = Field(..., description="Aggregated search results")


class AnalysisOutput(BaseIOSchema):
    """Output of the analysis agent."""
    summary: str = Field(..., description="Synthesized summary")
    key_points: List[str] = Field(..., description="Key findings")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")


class ResearchPipeline:
    """Multi-stage research pipeline."""

    def __init__(self, client):
        # Query generation agent
        self.query_agent = AtomicAgent[QueryGenInput, QueryGenOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Generate effective search queries for research."],
                    steps=[
                        "Analyze the topic for key concepts.",
                        "Generate 3-5 diverse, specific queries.",
                        "Cover different aspects of the topic."
                    ]
                )
            )
        )

        # Analysis agent
        self.analysis_agent = AtomicAgent[AnalysisInput, AnalysisOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Synthesize research into clear summaries."],
                    steps=[
                        "Review all search results.",
                        "Identify patterns and key information.",
                        "Generate a comprehensive summary."
                    ]
                )
            )
        )

    def research(self, topic: str, search_function) -> AnalysisOutput:
        """Execute the full research pipeline."""

        # Stage 1: Generate queries
        query_result = self.query_agent.run(QueryGenInput(topic=topic))
        print(f"Generated {len(query_result.queries)} queries")

        # Execute searches with the provided search function (external, not an agent)
        all_results = []
        for query in query_result.queries:
            results = search_function(query)
            all_results.append(f"Query: {query}\nResults: {results}")

        combined_results = "\n\n".join(all_results)

        # Stage 2: Analyze results
        analysis = self.analysis_agent.run(AnalysisInput(
            topic=topic,
            search_results=combined_results
        ))

        return analysis


# Usage
def mock_search(query: str) -> str:
    return f"[Simulated results for: {query}]"

pipeline = ResearchPipeline(client)  # reuses the instructor-wrapped client from the first example
result = pipeline.research("renewable energy benefits", mock_search)
print(f"Summary: {result.summary}")
print(f"Confidence: {result.confidence:.0%}")

Parallel Execution Pattern

Run multiple agents concurrently for independent tasks:

import asyncio
from typing import List
import instructor
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator


class AnalysisRequest(BaseIOSchema):
    """Text to be analyzed by each agent."""
    text: str = Field(..., description="Text to analyze")


class SentimentOutput(BaseIOSchema):
    """Sentiment analysis result."""
    sentiment: str = Field(..., description="positive, negative, or neutral")
    confidence: float = Field(..., ge=0.0, le=1.0)


class TopicOutput(BaseIOSchema):
    """Topic extraction result."""
    topics: List[str] = Field(..., description="Identified topics")
    primary_topic: str = Field(..., description="Main topic")


class SummaryOutput(BaseIOSchema):
    """Summarization result."""
    summary: str = Field(..., description="Brief summary")
    word_count: int = Field(..., description="Original word count")


class ParallelAnalyzer:
    """Runs multiple analysis agents in parallel."""

    def __init__(self, async_client):
        self.sentiment_agent = AtomicAgent[AnalysisRequest, SentimentOutput](
            config=AgentConfig(
                client=async_client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Analyze sentiment of text."]
                )
            )
        )

        self.topic_agent = AtomicAgent[AnalysisRequest, TopicOutput](
            config=AgentConfig(
                client=async_client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Extract topics from text."]
                )
            )
        )

        self.summary_agent = AtomicAgent[AnalysisRequest, SummaryOutput](
            config=AgentConfig(
                client=async_client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Summarize text concisely."]
                )
            )
        )

    async def analyze(self, text: str) -> dict:
        """Run all analyses in parallel."""
        request = AnalysisRequest(text=text)

        # Run all agents concurrently
        sentiment_task = self.sentiment_agent.run_async(request)
        topic_task = self.topic_agent.run_async(request)
        summary_task = self.summary_agent.run_async(request)

        # Wait for all to complete
        sentiment, topics, summary = await asyncio.gather(
            sentiment_task,
            topic_task,
            summary_task
        )

        return {
            "sentiment": sentiment,
            "topics": topics,
            "summary": summary
        }


# Usage
async def main():
    from openai import AsyncOpenAI

    async_client = instructor.from_openai(AsyncOpenAI())
    analyzer = ParallelAnalyzer(async_client)

    text = "The new renewable energy policy has shown promising results..."
    results = await analyzer.analyze(text)

    print(f"Sentiment: {results['sentiment'].sentiment}")
    print(f"Topics: {results['topics'].topics}")
    print(f"Summary: {results['summary'].summary}")


asyncio.run(main())
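
Note that asyncio.gather raises as soon as any one analysis fails, discarding the others. If partial results are acceptable, a variant sketch (not part of the class above) passes return_exceptions=True and filters out the failures:

async def analyze_tolerant(analyzer: ParallelAnalyzer, text: str) -> dict:
    """Like ParallelAnalyzer.analyze, but keeps partial results if one agent fails."""
    request = AnalysisRequest(text=text)
    results = await asyncio.gather(
        analyzer.sentiment_agent.run_async(request),
        analyzer.topic_agent.run_async(request),
        analyzer.summary_agent.run_async(request),
        return_exceptions=True,  # failed tasks come back as exception objects
    )
    labels = ["sentiment", "topics", "summary"]
    # Drop any analysis that raised; callers should handle missing keys
    return {
        label: result
        for label, result in zip(labels, results)
        if not isinstance(result, BaseException)
    }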

Router Pattern

Route queries to specialized agents based on classification:

from typing import Literal
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator


class RouterInput(BaseIOSchema):
    """Query to be routed."""
    query: str = Field(..., description="User query to route")


class RouterOutput(BaseIOSchema):
    """Classification produced by the router agent."""
    category: Literal["technical", "creative", "analytical", "general"] = Field(
        ..., description="Query category"
    )
    confidence: float = Field(..., ge=0.0, le=1.0)
    reasoning: str = Field(..., description="Why this category was chosen")


class QueryResponse(BaseIOSchema):
    """Response from a specialized agent."""
    response: str = Field(..., description="Response to the query")


class AgentRouter:
    """Routes queries to specialized agents."""

    def __init__(self, client):
        # Router agent classifies queries
        self.router = AtomicAgent[RouterInput, RouterOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=[
                        "Classify queries into categories:",
                        "- technical: coding, engineering, technical problems",
                        "- creative: writing, art, brainstorming",
                        "- analytical: data analysis, research, comparisons",
                        "- general: other queries"
                    ]
                )
            )
        )

        # Specialized agents for each category
        self.agents = {
            "technical": self._create_agent(client, [
                "You are a technical expert.",
                "Provide detailed, accurate technical answers.",
                "Include code examples when appropriate."
            ]),
            "creative": self._create_agent(client, [
                "You are a creative assistant.",
                "Think outside the box.",
                "Offer imaginative and original ideas."
            ]),
            "analytical": self._create_agent(client, [
                "You are an analytical expert.",
                "Provide data-driven insights.",
                "Structure analysis logically."
            ]),
            "general": self._create_agent(client, [
                "You are a helpful general assistant.",
                "Provide clear, helpful responses."
            ])
        }

    def _create_agent(self, client, background: list) -> AtomicAgent:
        return AtomicAgent[RouterInput, QueryResponse](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(background=background)
            )
        )

    def route_and_respond(self, query: str) -> tuple[str, QueryResponse]:
        """Route query to appropriate agent and get response."""
        # Classify the query
        routing = self.router.run(RouterInput(query=query))
        print(f"Routed to: {routing.category} ({routing.confidence:.0%} confidence)")

        # Get response from specialized agent
        agent = self.agents[routing.category]
        response = agent.run(RouterInput(query=query))

        return routing.category, response


# Usage
router = AgentRouter(client)
category, response = router.route_and_respond("How do I implement a binary search tree?")
print(f"Category: {category}")
print(f"Response: {response.response}")

Context Sharing Between Agents

Share information between agents using context providers:

from typing import List
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator, BaseDynamicContextProvider


class SharedKnowledgeProvider(BaseDynamicContextProvider):
    """Shares knowledge between agents."""

    def __init__(self):
        super().__init__(title="Shared Knowledge")
        self.facts: List[str] = []
        self.decisions: List[str] = []

    def add_fact(self, fact: str):
        self.facts.append(fact)

    def add_decision(self, decision: str):
        self.decisions.append(decision)

    def get_info(self) -> str:
        output = []
        if self.facts:
            output.append("Known Facts:")
            output.extend(f"  - {f}" for f in self.facts)
        if self.decisions:
            output.append("Previous Decisions:")
            output.extend(f"  - {d}" for d in self.decisions)
        return "\n".join(output) if output else "No shared knowledge yet."


class FactInput(BaseIOSchema):
    """Input for the fact extraction agent."""
    query: str = Field(..., description="Query to process")


class FactOutput(BaseIOSchema):
    """Facts extracted from the input."""
    facts: List[str] = Field(..., description="Extracted facts")
    has_new_info: bool = Field(..., description="Whether new facts were found")


class DecisionInput(BaseIOSchema):
    """Input for the decision-making agent."""
    question: str = Field(..., description="Decision to make")


class DecisionOutput(BaseIOSchema):
    """Decision produced by the decision-making agent."""
    decision: str = Field(..., description="The decision made")
    reasoning: str = Field(..., description="Reasoning behind decision")


class CollaborativeAgents:
    """Agents that share context and build on each other's work."""

    def __init__(self, client):
        self.shared_knowledge = SharedKnowledgeProvider()

        # Fact extraction agent
        self.fact_agent = AtomicAgent[FactInput, FactOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Extract factual information from queries."]
                )
            )
        )
        self.fact_agent.register_context_provider("knowledge", self.shared_knowledge)

        # Decision-making agent
        self.decision_agent = AtomicAgent[DecisionInput, DecisionOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=[
                        "Make decisions based on available facts.",
                        "Reference the shared knowledge when reasoning."
                    ]
                )
            )
        )
        self.decision_agent.register_context_provider("knowledge", self.shared_knowledge)

    def process_information(self, text: str):
        """Extract facts and add to shared knowledge."""
        result = self.fact_agent.run(FactInput(query=text))
        for fact in result.facts:
            self.shared_knowledge.add_fact(fact)
        return result

    def make_decision(self, question: str):
        """Make decision using shared knowledge."""
        result = self.decision_agent.run(DecisionInput(question=question))
        self.shared_knowledge.add_decision(f"{question} -> {result.decision}")
        return result


# Usage
collab = CollaborativeAgents(client)

# First agent extracts facts
collab.process_information("Solar panels have 20-25 year lifespans and costs dropped 89% since 2010.")
collab.process_information("Wind energy now provides 10% of global electricity.")

# Second agent makes decisions using accumulated knowledge
decision = collab.make_decision("Should we invest in renewable energy?")
print(f"Decision: {decision.decision}")
print(f"Reasoning: {decision.reasoning}")

Supervisor Pattern

A supervisor agent that manages and validates worker agents:

from typing import List, Optional
from pydantic import Field
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator


class TaskAssignment(BaseIOSchema):
    """Task handed to the worker agent."""
    task: str = Field(..., description="Task to complete")


class WorkerOutput(BaseIOSchema):
    """Result produced by the worker agent."""
    result: str = Field(..., description="Task result")
    confidence: float = Field(..., ge=0.0, le=1.0)


class SupervisorReview(BaseIOSchema):
    """Input for the supervisor's review."""
    task: str = Field(..., description="Original task")
    worker_result: str = Field(..., description="Worker's result")


class SupervisorOutput(BaseIOSchema):
    """Supervisor's verdict on the worker's result."""
    approved: bool = Field(..., description="Whether result is approved")
    feedback: Optional[str] = Field(None, description="Feedback if not approved")
    final_result: str = Field(..., description="Final result (possibly refined)")


class SupervisedWorkflow:
    """Workflow with supervisor validation."""

    def __init__(self, client, max_iterations: int = 3):
        self.max_iterations = max_iterations

        # Worker agent
        self.worker = AtomicAgent[TaskAssignment, WorkerOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=["Complete assigned tasks thoroughly."]
                )
            )
        )

        # Supervisor agent
        self.supervisor = AtomicAgent[SupervisorReview, SupervisorOutput](
            config=AgentConfig(
                client=client,
                model="gpt-4o-mini",
                system_prompt_generator=SystemPromptGenerator(
                    background=[
                        "Review worker outputs for quality.",
                        "Approve good work, provide feedback for improvements.",
                        "Refine results if needed."
                    ]
                )
            )
        )

    def execute(self, task: str) -> SupervisorOutput:
        """Execute task with supervisor review loop."""

        for iteration in range(self.max_iterations):
            # Worker attempts task
            worker_result = self.worker.run(TaskAssignment(task=task))
            print(f"Iteration {iteration + 1}: Worker confidence {worker_result.confidence:.0%}")

            # Supervisor reviews
            review = self.supervisor.run(SupervisorReview(
                task=task,
                worker_result=worker_result.result
            ))

            if review.approved:
                print("Supervisor approved result")
                return review
            else:
                print(f"Supervisor feedback: {review.feedback}")
                # Update task with feedback for next iteration
                task = f"{task}\n\nPrevious attempt feedback: {review.feedback}"

        print("Max iterations reached, returning best effort")
        return review


# Usage
workflow = SupervisedWorkflow(client)
result = workflow.execute("Write a haiku about programming")
print(f"Final result: {result.final_result}")

Best Practices

1. Design Clear Interfaces

Define explicit input/output schemas, each with a docstring, for every agent:

# Good: Clear, typed interfaces
class AgentAOutput(BaseIOSchema):
    """Output produced by agent A."""
    data: str
    metadata: dict

class AgentBInput(BaseIOSchema):
    """Input consumed by agent B."""
    data: str  # Explicitly matches AgentAOutput.data

2. Handle Failures Gracefully

Implement fallbacks and error handling:

def execute_with_fallback(primary_agent, fallback_agent, input_data):
    try:
        return primary_agent.run(input_data)
    except Exception as e:
        print(f"Primary failed: {e}, using fallback")
        return fallback_agent.run(input_data)
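
Transient failures such as rate limits or timeouts are often worth retrying before falling back. A minimal retry-with-exponential-backoff sketch, assuming the error is transient and the call is safe to repeat:

import time

def execute_with_retry(agent, input_data, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry an agent call with exponential backoff before giving up."""
    for attempt in range(1, max_attempts + 1):
        try:
            return agent.run(input_data)
        except Exception as e:
            if attempt == max_attempts:
                raise  # out of attempts; let the caller (or a fallback) handle it
            delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
            print(f"Attempt {attempt} failed ({e}); retrying in {delay:.0f}s")
            time.sleep(delay)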

3. Monitor Agent Interactions

Log inter-agent communication:

def logged_handoff(from_agent: str, to_agent: str, data):
    print(f"[{from_agent}] -> [{to_agent}]: {type(data).__name__}")
    return data
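
For a slightly richer picture, you can wrap the agent call itself and record latency along with the input and output types (a sketch using only the standard library):

import time

def timed_run(agent_name: str, agent, input_data):
    """Run an agent while logging its latency and the schema types involved."""
    start = time.perf_counter()
    result = agent.run(input_data)
    elapsed = time.perf_counter() - start
    print(f"[{agent_name}] {type(input_data).__name__} -> {type(result).__name__} "
          f"in {elapsed:.2f}s")
    return result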

4. Keep Agents Focused

Each agent should have a single responsibility:

# Good: Single responsibility
query_generator = AtomicAgent[...]  # Only generates queries
analyzer = AtomicAgent[...]  # Only analyzes

# Avoid: Multiple responsibilities in one agent
do_everything_agent = AtomicAgent[...]  # Too complex

Summary

Pattern              Use Case                Key Benefit
-------------------  ----------------------  ----------------
Tool Orchestration   Dynamic tool selection  Flexible routing
Sequential Pipeline  Multi-step processing   Clear data flow
Parallel Execution   Independent analyses    Performance
Router Pattern       Query classification    Specialization
Context Sharing      Knowledge accumulation  Collaboration
Supervisor Pattern   Quality assurance       Validation

Choose patterns based on your workflow requirements and combine them for sophisticated agent systems.