# MCP Server and Client Example
This guide provides a detailed overview of the Model Context Protocol (MCP) server and client example implementation in the Atomic Agents framework.
## Overview
The MCP example demonstrates how to build an intelligent agent system using the Model Context Protocol, showcasing both server and client implementations. The example supports three transport methods: STDIO, Server-Sent Events (SSE), and HTTP Stream.
## Architecture

### MCP Server
The server component (`example-mcp-server`) is built using:

- **FastMCP**: A high-performance MCP server implementation
- **Starlette**: A lightweight ASGI framework
- **Uvicorn**: An ASGI server implementation
Key components:

1. **Transport Layers**:
   - `server_stdio.py`: Implements STDIO-based communication
   - `server_sse.py`: Implements SSE-based HTTP communication
   - `server_http.py`: Implements unified HTTP streaming transport
2. **Tools Service**:
   - Manages registration and execution of MCP tools
   - Handles tool discovery and metadata
3. **Resource Service**:
   - Manages static resources
   - Handles resource discovery and access
4. **Built-in Tools** (one is sketched below):
   - `AddNumbersTool`: Performs addition
   - `SubtractNumbersTool`: Performs subtraction
   - `MultiplyNumbersTool`: Performs multiplication
   - `DivideNumbersTool`: Performs division
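For illustration, here is a minimal sketch of what one of these built-in tools can look like. It assumes the `Tool` base class, `BaseToolInput`, and `ToolResponse.from_text` helper described under Interfaces below; the field names are illustrative rather than the exact server source:

```python
from pydantic import Field, create_model

class AddNumbersTool(Tool):
    name = "add_numbers"
    description = "Adds two numbers and returns the sum"
    input_model = create_model(
        "AddNumbersInput",
        a=(float, Field(..., description="First addend")),
        b=(float, Field(..., description="Second addend")),
        __base__=BaseToolInput,
    )

    async def execute(self, input_data: BaseToolInput) -> ToolResponse:
        # The input schema guarantees both operands are present and numeric.
        return ToolResponse.from_text(str(input_data.a + input_data.b))
```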
### MCP Client

The client component (`example-client`) is an intelligent agent that provides:

1. **Tool Discovery**:
   - Dynamically discovers available tools from the MCP server
   - Builds a schema-based tool registry
2. **Query Processing**:
   - Uses GPT models for natural language understanding
   - Extracts parameters from user queries
   - Selects appropriate tools based on intent
3. **Execution Flow**:
   - Maintains conversation context
   - Handles tool execution results
   - Provides conversational responses
## Implementation Details

### Server Implementation
The server supports three transport methods:
**SSE Transport** (`server_sse.py`):

```python
# Initialize FastMCP server
mcp = FastMCP("example-mcp-server")

# Register tools and resources
tool_service.register_tools(get_available_tools())
resource_service.register_resources(get_available_resources())

# Create Starlette app with CORS support
app = create_starlette_app(mcp_server)
```
**STDIO Transport** (`server_stdio.py`):

- Runs as a subprocess
- Communicates through standard input/output
- Ideal for local development
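Running the STDIO server is typically a few lines. The following is a minimal sketch assuming the `FastMCP.run` API from the MCP Python SDK; the actual `server_stdio.py` additionally registers the tool and resource services shown above:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("example-mcp-server")

if __name__ == "__main__":
    # Serve over stdin/stdout; the client launches this process
    # and communicates through its pipes.
    mcp.run(transport="stdio")
```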
**HTTP Stream Transport** (`server_http.py`):

- Single `/mcp` endpoint for JSON-RPC and SSE-style streaming
- Handles sessions via the `Mcp-Session-Id` header; allows resumable and cancelable streams
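To make the session flow concrete, here is a hedged wire-level sketch using `httpx`. The payload fields follow the MCP Streamable HTTP specification; this is not code from `server_http.py`:

```python
import httpx

with httpx.Client(base_url="http://localhost:6969") as client:
    # Every MCP operation goes through the single /mcp endpoint.
    resp = client.post(
        "/mcp",
        headers={"Accept": "application/json, text/event-stream"},
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "doc-example", "version": "0.0.0"},
            },
        },
    )
    # The server assigns the session; subsequent requests echo it back
    # in the same header to resume or cancel streams.
    session_id = resp.headers.get("Mcp-Session-Id")
```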
### Client Implementation
The client uses a sophisticated orchestration system:
**Tool Management**:

```python
# Fetch available tools (synchronous)
tools = fetch_mcp_tools(
    mcp_endpoint=config.mcp_server_url,
    transport_type=MCPTransportType.HTTP_STREAM,
)

# Or fetch tools asynchronously (must be called within async context)
tools = await fetch_mcp_tools_async(
    mcp_endpoint=config.mcp_server_command,
    transport_type=MCPTransportType.STDIO,
    client_session=session,  # Optional pre-initialized session
)

# Build tool schema mapping
tool_schema_to_class_map = {
    ToolClass.input_schema: ToolClass
    for ToolClass in tools
    if hasattr(ToolClass, "input_schema")
}
```
**Query Processing**:

- Uses an orchestrator agent to analyze queries (sketched below)
- Extracts parameters and selects appropriate tools
- Maintains conversation context through `ChatHistory`
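The orchestration step can be summarized with a hedged sketch (illustrative names, not the exact client source): the orchestrator's structured output is constrained to a union of the discovered tool input schemas, so intent selection and parameter extraction happen in a single completion.

```python
from typing import Union

from pydantic import BaseModel

# Build a union of every discovered tool's input schema so the LLM
# must emit exactly one well-formed tool call.
ToolCallUnion = Union[tuple(tool_schema_to_class_map.keys())]

class OrchestratorOutput(BaseModel):
    reasoning: str
    tool_call: ToolCallUnion

# Dispatch: the concrete schema type identifies the tool to execute.
# tool_class = tool_schema_to_class_map[type(output.tool_call)]
# result = tool_class().run(output.tool_call)
```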
**Async vs Sync Tool Fetching**:

```python
# Synchronous fetching (suitable for HTTP/SSE)
tools = fetch_mcp_tools(
    mcp_endpoint="http://localhost:6969",
    transport_type=MCPTransportType.HTTP_STREAM,
)

# Asynchronous fetching (optimized for STDIO with persistent sessions)
async def setup_tools():
    tools = await fetch_mcp_tools_async(
        transport_type=MCPTransportType.STDIO,
        client_session=session,  # Reuse existing session
    )
    return tools
```
## MCP Transport Methods

The example implements three distinct transport methods via the `MCPTransportType` enum, each with its own advantages:

```python
from atomic_agents.connectors.mcp.mcp_definition_service import MCPTransportType

# Available transport types
MCPTransportType.STDIO        # Standard input/output transport
MCPTransportType.SSE          # Server-Sent Events transport
MCPTransportType.HTTP_STREAM  # HTTP Stream transport
```
### 1. STDIO Transport
STDIO transport uses standard input/output streams for communication between the client and server:
```python
# Client-side STDIO setup (from main_stdio.py)
import shlex
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def _bootstrap_stdio():
    stdio_exit_stack = AsyncExitStack()
    command_parts = shlex.split(config.mcp_stdio_server_command)
    server_params = StdioServerParameters(command=command_parts[0], args=command_parts[1:], env=None)
    read_stream, write_stream = await stdio_exit_stack.enter_async_context(stdio_client(server_params))
    session = await stdio_exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
    await session.initialize()
    return session
```
Key advantages:

- No network configuration required
- Simple local setup
- Direct process communication
- Lower latency for local usage

Use cases:

- Development and testing
- Single-user environments
- Embedded agent applications
- Offline operation
### 2. SSE Transport

Server-Sent Events (SSE) transport keeps an HTTP connection open for real-time, one-way (server-to-client) streaming:
```python
# Server-side SSE setup (from server_sse.py)
async def handle_sse(request: Request) -> None:
    async with sse.connect_sse(
        request.scope,
        request.receive,
        request._send,  # noqa: SLF001
    ) as (read_stream, write_stream):
        await mcp_server.run(
            read_stream,
            write_stream,
            mcp_server.create_initialization_options(),
        )
```
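The matching client-side bootstrap mirrors the STDIO version shown earlier, swapping process pipes for network streams. This is a hedged sketch using the MCP SDK's `sse_client`; the `/sse` path is an assumption about this server's routing:

```python
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client

async def _bootstrap_sse(url: str = "http://localhost:6969/sse") -> ClientSession:
    # Same pattern as _bootstrap_stdio, but connected over HTTP/SSE.
    stack = AsyncExitStack()
    read_stream, write_stream = await stack.enter_async_context(sse_client(url))
    session = await stack.enter_async_context(ClientSession(read_stream, write_stream))
    await session.initialize()
    return session
```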
Key advantages:

- Multiple clients can connect to a single server
- Network-based communication
- Stateless server architecture
- Suitable for distributed systems

Use cases:

- Production deployments
- Multi-user environments
- Scalable agent infrastructure
- Cross-network operation
### 3. HTTP Stream Transport

HTTP Stream transport uses a single `/mcp` endpoint for JSON-RPC and streaming communication:
```python
# Client-side HTTP Stream setup
tools = fetch_mcp_tools(
    mcp_endpoint="http://localhost:6969",
    transport_type=MCPTransportType.HTTP_STREAM,
)
```
Key advantages:

- Single endpoint for all MCP operations
- Session management via headers
- Resumable and cancelable streams
- Modern HTTP-based architecture

Use cases:

- Modern web applications
- Cloud-native deployments
- Microservice architectures
- API gateway integration
## Interfaces

### Tool Interface
The MCP server defines a standardized tool interface that all tools must implement:
```python
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, Optional, Type

from pydantic import BaseModel

# BaseToolInput and ToolResponse are defined alongside this
# interface in the example server.

class Tool(ABC):
    """Abstract base class for all tools."""

    name: ClassVar[str]
    description: ClassVar[str]
    input_model: ClassVar[Type[BaseToolInput]]
    output_model: ClassVar[Optional[Type[BaseModel]]] = None

    @abstractmethod
    async def execute(self, input_data: BaseToolInput) -> ToolResponse:
        """Execute the tool with given arguments."""
        pass

    def get_schema(self) -> Dict[str, Any]:
        """Get JSON schema for the tool."""
        schema = {
            "name": self.name,
            "description": self.description,
            "input": self.input_model.model_json_schema(),
        }
        if self.output_model:
            schema["output"] = self.output_model.model_json_schema()
        return schema
```
The tool interface consists of:

1. **Class Variables**:
   - `name`: Tool identifier used in MCP communications
   - `description`: Human-readable tool description
   - `input_model`: Pydantic model defining input parameters
   - `output_model`: Pydantic model defining output structure (optional)
2. **Execute Method**:
   - Asynchronous method that performs the tool's functionality
   - Takes strongly typed input data
   - Returns a structured `ToolResponse`
3. **Schema Method**:
   - Provides a JSON schema for tool discovery
   - Enables automatic documentation generation
   - Facilitates client-side validation
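To see the contract in action, calling `get_schema()` on a tool instance yields the JSON handed to clients during discovery (using the hypothetical `AddNumbersTool` sketched earlier):

```python
import json

print(json.dumps(AddNumbersTool().get_schema(), indent=2))
# {
#   "name": "add_numbers",
#   "description": "Adds two numbers and returns the sum",
#   "input": { ... Pydantic-generated JSON schema for AddNumbersInput ... }
# }
```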
### Resource Interface
The MCP server defines a standardized resource interface that all resources must implement:
```python
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, Optional, Type

from pydantic import BaseModel

# BaseResourceInput and ResourceResponse are defined alongside this
# interface in the example server.

class Resource(ABC):
    """Abstract base class for all resources."""

    name: ClassVar[str]
    description: ClassVar[str]
    uri: ClassVar[str]
    mime_type: ClassVar[str]
    input_model: ClassVar[Type[BaseResourceInput]]
    output_model: ClassVar[Optional[Type[BaseModel]]] = None

    @abstractmethod
    async def read(self, input_data: BaseResourceInput) -> ResourceResponse:
        """Read data from the resource."""
        pass

    def get_schema(self) -> Dict[str, Any]:
        """Get JSON schema for the resource."""
        schema = {
            "name": self.name,
            "description": self.description,
            "uri": self.uri,
            "mime_type": self.mime_type,
            "input": self.input_model.model_json_schema(),
        }
        if self.output_model:
            schema["output"] = self.output_model.model_json_schema()
        return schema
```
The resource interface consists of:

1. **Class Variables**:
   - `name`: Resource identifier used in MCP communications
   - `description`: Human-readable resource description
   - `uri`: URI pattern for accessing the resource
   - `mime_type`: MIME type of the resource content
   - `input_model`: Pydantic model defining input parameters
   - `output_model`: Pydantic model defining output structure (optional)
2. **Read Method**:
   - Asynchronous method that retrieves data from the resource
   - Takes strongly typed input data
   - Returns a structured `ResourceResponse`
3. **Schema Method**:
   - Provides a URI template for resource discovery
   - Enables automatic documentation generation
### Prompt Interface

The MCP server defines a standardized prompt interface that all prompts must implement:
```python
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, Optional, Type

from pydantic import BaseModel

# BasePromptInput and PromptResponse are defined alongside this
# interface in the example server.

class Prompt(ABC):
    """Abstract base class for all prompts."""

    name: ClassVar[str]
    description: ClassVar[str]
    input_model: ClassVar[Type[BasePromptInput]]
    output_model: ClassVar[Optional[Type[BaseModel]]] = None

    @abstractmethod
    async def generate(self, input_data: BasePromptInput) -> PromptResponse:
        """Generate the prompt with given arguments."""
        pass

    def get_schema(self) -> Dict[str, Any]:
        """Get JSON schema for the prompt."""
        schema = {
            "name": self.name,
            "description": self.description,
            "input": self.input_model.model_json_schema(),
        }
        if self.output_model:
            schema["output"] = self.output_model.model_json_schema()
        return schema
```
The prompt interface consists of:

1. **Class Variables**:
   - `name`: Prompt identifier used in MCP communications
   - `description`: Human-readable prompt description
   - `input_model`: Pydantic model defining input parameters
   - `output_model`: Pydantic model defining output structure (optional)
2. **Generate Method**:
   - Asynchronous method that generates the prompt
   - Takes strongly typed input data
   - Returns a structured `PromptResponse`
3. **Schema Method**:
   - Provides a JSON schema for prompt discovery
   - Enables automatic documentation generation
## Configuration

### Server Configuration

The server can be configured through command-line arguments:

```bash
poetry run example-mcp-server --mode=sse --host=0.0.0.0 --port=6969 --reload
poetry run example-mcp-server --mode=stdio
poetry run example-mcp-server --mode=http_stream --host=0.0.0.0 --port=6969
```

Options:

- `--mode`: Transport mode (`sse`, `stdio`, or `http_stream`)
- `--host`: Host to bind to
- `--port`: Port to listen on
- `--reload`: Enable auto-reload for development
### Client Configuration

The client uses a configuration class:

```python
@dataclass
class MCPConfig:
    mcp_server_url: str = "http://localhost:6969"
    openai_model: str = "gpt-4o-mini"
    openai_api_key: str = os.getenv("OPENAI_API_KEY")
```

For STDIO transport, the server URL is replaced by the command used to launch the server subprocess:

```python
@dataclass
class MCPConfig:
    openai_model: str = "gpt-4o-mini"
    openai_api_key: str = os.getenv("OPENAI_API_KEY")
    mcp_stdio_server_command: str = "poetry run example-mcp-server --mode stdio"
```
## Usage Examples

**Start the server (SSE mode):**

```bash
cd example-mcp-server
poetry run example-mcp-server --mode=sse
```

**Run the client** using the main launcher with transport selection:

```bash
cd example-client
poetry run python -m example_client.main --transport sse
```

Or call the SSE client directly:

```bash
cd example-client
poetry run python -m example_client.main_sse
```

Or call the STDIO client directly:

```bash
cd example-client
poetry run python -m example_client.main_stdio
```

**Example queries:**

```text
You: What is 2+2?
You: Calculate the square root of 144
You: Generate a random number between 1 and 100
```
## Best Practices

1. **Development**:
   - Use STDIO transport for local development
   - Enable server auto-reload during development
   - Implement proper error handling (see the sketch after this list)
2. **Production**:
   - Use SSE transport for production deployments
   - Configure appropriate CORS settings
   - Implement authentication if needed
3. **Tool Development**:
   - Follow the Tool interface contract
   - Provide clear input/output schemas
   - Include comprehensive documentation
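As an illustration of the error-handling point above, a division tool can validate its inputs and return a structured error instead of raising. This is a hedged sketch reusing the hypothetical helpers from the Interfaces section:

```python
from pydantic import Field, create_model

class SafeDivideTool(Tool):
    name = "safe_divide"
    description = "Divides two numbers, guarding against division by zero"
    input_model = create_model(
        "SafeDivideInput",
        numerator=(float, Field(..., description="Dividend")),
        denominator=(float, Field(..., description="Divisor")),
        __base__=BaseToolInput,
    )

    async def execute(self, input_data: BaseToolInput) -> ToolResponse:
        # Return a structured, conversational error rather than letting
        # a ZeroDivisionError propagate to the transport layer.
        if input_data.denominator == 0:
            return ToolResponse.from_text("Error: division by zero is undefined.")
        return ToolResponse.from_text(str(input_data.numerator / input_data.denominator))
```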
## Extending the Example

### Adding New Tools

To add a new tool:

1. Create a new tool class implementing the Tool interface
2. Register the tool in the server's tool service
3. The client will automatically discover and use the new tool
Example tool structure:

```python
from pydantic import Field, create_model

class MyNewTool(Tool):
    name = "my_new_tool"
    description = "This tool performs a custom operation"
    input_model = create_model(
        "MyNewToolInput",
        param1=(str, Field(..., description="First parameter")),
        param2=(int, Field(..., description="Second parameter")),
        __base__=BaseToolInput,
    )

    async def execute(self, input_data: BaseToolInput) -> ToolResponse:
        # Access params with input_data.param1, input_data.param2
        result = f"Processed {input_data.param1} with {input_data.param2}"
        return ToolResponse.from_text(result)
```
Then register the tool in the server:

```python
def get_available_tools() -> List[Tool]:
    return [
        # ... existing tools ...
        MyNewTool(),
    ]
```
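Once registered, the tool is callable over any transport. As a hedged smoke test, the raw MCP SDK session can invoke it directly (the generated Atomic Agents wrappers are the usual path in the client):

```python
# Assumes an initialized ClientSession, e.g. from _bootstrap_stdio()
result = await session.call_tool(
    "my_new_tool",
    arguments={"param1": "hello", "param2": 42},
)
print(result.content)  # structured content blocks from the ToolResponse
```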
### Adding New Resources

To add a new resource:

1. Create a new resource class implementing the Resource interface
2. Register the resource in the server's resource service
3. The client can access the new resource via its URI (see the read sketch at the end of this subsection)
Example resource structure:

```python
from pydantic import Field, create_model

class MyNewResource(Resource):
    name = "my_new_resource"
    description = "This resource provides custom data"
    uri = "resource://my_new_resource/{param1}"
    mime_type = "application/json"
    input_model = create_model(
        "MyNewResourceInput",
        param1=(str, Field(..., description="Resource parameter")),
        __base__=BaseResourceInput,
    )

    async def read(self, input_data: BaseResourceInput) -> ResourceResponse:
        # Access params with input_data.param1
        data = {"message": f"Data for {input_data.param1}"}
        return ResourceResponse.from_data(data, self.mime_type)
```
Then register the resource in the server:

```python
def get_available_resources() -> List[Resource]:
    return [
        # ... existing resources ...
        MyNewResource(),
    ]
```
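On the client side, the new resource can then be read through an initialized `ClientSession`. A hedged sketch; `read_resource` is the MCP SDK call, and the URI fills the `{param1}` template from the definition above:

```python
result = await session.read_resource("resource://my_new_resource/demo")
for content in result.contents:
    print(content)
```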
### Adding New Prompts

To add a new prompt:

1. Create a new prompt class implementing the Prompt interface
2. Register the prompt in the server's prompt service
3. The client will automatically discover and use the new prompt (see the retrieval sketch at the end of this subsection)
Example prompt structure:

```python
from pydantic import Field, create_model

class MyNewPrompt(Prompt):
    name = "my_new_prompt"
    description = "This prompt generates a custom response"
    input_model = create_model(
        "MyNewPromptInput",
        param1=(str, Field(..., description="First parameter")),
        param2=(int, Field(..., description="Second parameter")),
        __base__=BasePromptInput,
    )

    async def generate(self, input_data: BasePromptInput) -> PromptResponse:
        # Access params with input_data.param1, input_data.param2
        result = f"Generated response for {input_data.param1} with {input_data.param2}"
        return PromptResponse.from_text(result)
```
Then register the prompt in the server:

```python
def get_available_prompts() -> List[Prompt]:
    return [
        # ... existing prompts ...
        MyNewPrompt(),
    ]
```
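The registered prompt can likewise be retrieved through the session. A hedged sketch using the MCP SDK's `get_prompt` (prompt arguments are passed as strings on the wire):

```python
result = await session.get_prompt(
    "my_new_prompt",
    arguments={"param1": "hello", "param2": "42"},
)
for message in result.messages:
    print(message.role, message.content)
```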