How to Integrate Model Context Protocol (MCP) with AstrBot
Problem
When I tried to integrate multiple tools with AstrBot, I ran into a common problem: every tool had a different integration pattern. Some used REST APIs, others used webhooks, and a few needed custom protocols. Managing all these different interfaces became a nightmare.
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Weather API  │  │   Database   │  │ File System  │
│    (REST)    │  │    (SQL)     │  │ (Local I/O)  │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
       ▼                 ▼                 ▼
┌──────────────────────────────────────────────────┐
│            AstrBot (Custom Adapters)             │
│  - WeatherAdapter                                │
│  - DatabaseAdapter                               │
│  - FileSystemAdapter                             │
│  Each with different interfaces!                 │
└──────────────────────────────────────────────────┘

I needed a standardized way to expose tools and resources to LLMs without writing custom adapters for everything. That’s when I discovered Model Context Protocol (MCP).
What is Model Context Protocol?
MCP is an open protocol that standardizes how LLM applications connect to external data sources and tools. Instead of building custom integrations for each tool, MCP provides a unified interface.
The protocol defines three core concepts:
- Tools - Executable functions that LLMs can invoke (like “get_weather” or “search_database”)
- Resources - Data sources that provide context (like files, database records, or API responses)
- Prompts - Reusable prompt templates for common interactions
MCP uses a client-server model where your application (AstrBot) acts as a client connecting to MCP servers that expose tools and resources.
┌─────────────────────────────────────────────────────────┐
│                         AstrBot                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │    Star     │  │    Agent    │  │     MCP     │      │
│  │   Plugins   │  │   System    │  │   Client    │      │
│  └─────────────┘  └─────────────┘  └──────┬──────┘      │
└───────────────────────────────────────────┼─────────────┘
                                            │ JSON-RPC 2.0
         ┌────────────────────┬─────────────┴──────┐
         │                    │                    │
         ▼                    ▼                    ▼
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│ Knowledge Base  │  │    API Tools    │  │   File System   │
│   MCP Server    │  │   MCP Server    │  │   MCP Server    │
│                 │  │                 │  │                 │
│ - Tools         │  │ - Tools         │  │ - Tools         │
│ - Resources     │  │ - Resources     │  │ - Resources     │
│ - Prompts       │  │ - Prompts       │  │ - Prompts       │
└─────────────────┘  └─────────────────┘  └─────────────────┘

Understanding MCP Core Concepts
Tools: Executable Functions
Tools are functions that LLMs can call to perform actions. Each tool has a schema defining its parameters and return type.
I started by creating a simple weather tool:
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather Server", json_response=True)

@mcp.tool()
def get_weather(location: str) -> str:
    """Get current weather for a location.

    Args:
        location: City name or coordinates

    Returns:
        Weather information string
    """
    # In a real implementation, call a weather API
    return f"Weather for {location}: 72F, Partly cloudy"

The @mcp.tool() decorator automatically generates a JSON schema from the function signature. The docstring becomes the tool’s description, which helps the LLM understand when to use it.
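To illustrate what "generating a schema from the signature" means, here is a simplified sketch built only on the standard library. It is not how the SDK does it internally; `sketch_input_schema`, `JSON_TYPES`, and the extra `units` parameter are assumptions made up for this example.

```python
import inspect

# Simplified mapping from Python annotations to JSON Schema type names.
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def sketch_input_schema(func) -> dict:
    """Derive a JSON-Schema-like dict from a function signature."""
    properties = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        # Unknown annotations fall back to "string" in this sketch.
        properties[name] = {"type": JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default, so the caller must supply it
        else:
            properties[name]["default"] = param.default
    return {"type": "object", "properties": properties, "required": required}


def get_weather(location: str, units: str = "imperial") -> str:
    """Get current weather for a location (units is a hypothetical parameter)."""
    return f"Weather for {location} ({units})"


schema = sketch_input_schema(get_weather)
print(schema)
```

Parameters without a default end up in `required`, which is why a call that omits or misnames `location` fails validation before the function body ever runs.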
Resources: Data Sources
Resources provide read-only data to LLMs. Each resource is identified by a URI, and parameterized resources use URI templates.
@mcp.resource("kb://{topic}")
def knowledge_base(topic: str) -> str:
    """Retrieve knowledge base entry for a topic.

    The URI template kb://{topic} matches requests like:
    - kb://python
    - kb://astrbot
    """
    knowledge = {
        "python": "Python is a high-level programming language...",
        "astrbot": "AstrBot is a Python framework for building chatbots...",
    }
    return knowledge.get(topic, f"No entry found for {topic}")

When an LLM needs context about “python”, it requests kb://python and receives the relevant information.
Prompts: Reusable Templates
Prompts are predefined templates that users or LLMs can invoke.
@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
    """Generate a code review prompt.

    Args:
        code: The code to review
        language: Programming language (default: python)
    """
    fence = "`" * 3  # Markdown code fence that wraps the code under review
    return f"""Review this {language} code for:
- Bugs and errors
- Performance issues
- Best practices
- Security concerns

Code:
{fence}
{code}
{fence}

Provide specific, actionable feedback."""

Users can invoke this prompt with code_review("def hello(): pass") and get a structured review request.
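On the wire, a client does not receive the bare string. A `prompts/get` result carries a list of messages, and a plain string return value is delivered as a single user message. The sketch below shows that shape under those assumptions; `as_prompt_result` is a hypothetical helper and `code_review` here is a trimmed stand-in for the function above.

```python
def code_review(code: str, language: str = "python") -> str:
    """Trimmed-down stand-in for the prompt function."""
    return f"Review this {language} code:\n\n{code}"


def as_prompt_result(text: str, description: str) -> dict:
    """Wrap rendered prompt text in the shape of a prompts/get result."""
    return {
        "description": description,
        "messages": [
            {"role": "user", "content": {"type": "text", "text": text}},
        ],
    }


result = as_prompt_result(
    code_review("def hello(): pass"),
    description="Generate a code review prompt.",
)
print(result["messages"][0]["content"]["text"])
```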
Building an MCP Server for AstrBot
Now I’ll show how I built a complete MCP server that AstrBot can connect to.
Setting Up the Project
First, I installed the MCP Python SDK:
pip install mcp

Then I created my server file:
import json

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

# Initialize the MCP server
mcp = FastMCP(
    "AstrBot Integration Server",
    json_response=True,  # Streamable HTTP replies with plain JSON instead of an SSE stream
)
# ============================================
# TOOLS
# ============================================

@mcp.tool()
def search_knowledge_base(query: str, limit: int = 5) -> str:
    """Search the knowledge base for relevant documents.

    Args:
        query: Search query string
        limit: Maximum number of results (default: 5)

    Returns:
        JSON string with search results
    """
    # Mock implementation - replace with actual search
    results = [
        {"title": "Getting Started with AstrBot", "score": 0.95},
        {"title": "AstrBot Plugin Development", "score": 0.87},
    ][:limit]

    return json.dumps(results, indent=2)
class DatabaseQuery(BaseModel):
    """Validated database query parameters."""
    table: str = Field(description="Table name to query")
    columns: list[str] = Field(default=["*"], description="Columns to select")
    where: str | None = Field(default=None, description="WHERE clause")

@mcp.tool()
def query_database(params: DatabaseQuery) -> str:
    """Execute a database query.

    Args:
        params: Validated query parameters

    Returns:
        Query results as JSON
    """
    # Mock implementation
    return json.dumps({
        "table": params.table,
        "columns": params.columns,
        "results": [
            {"id": 1, "name": "Item 1"},
            {"id": 2, "name": "Item 2"},
        ]
    })
# ============================================
# RESOURCES
# ============================================

@mcp.resource("config://{section}")
def get_config_section(section: str) -> str:
    """Retrieve configuration section.

    URI examples:
    - config://database
    - config://api_keys
    """
    config = {
        "database": {"host": "localhost", "port": 5432},
        "api_keys": {"weather": "xxx", "maps": "yyy"},
    }
    return json.dumps(config.get(section, {}), indent=2)
@mcp.resource("file://{path:path}")
def read_file(path: str) -> str:
    """Read file contents from allowed directories.

    Args:
        path: Absolute file path inside one of the allowed roots
    """
    # Security: only allow specific directories
    import os

    allowed_roots = [os.path.realpath(root) for root in ("/data/documents", "/data/logs")]

    # Resolve symlinks and ".." first, then compare whole path components.
    # A plain startswith() check would also accept "/data/documents_private".
    full_path = os.path.realpath(path)
    if not any(os.path.commonpath([full_path, root]) == root for root in allowed_roots):
        return "Error: Access denied - path outside allowed directories"

    try:
        with open(full_path, 'r') as f:
            return f.read()
    except FileNotFoundError:
        return f"Error: File not found: {path}"
# ============================================
# PROMPTS
# ============================================

@mcp.prompt()
def analyze_data(data: str, analysis_type: str = "summary") -> str:
    """Generate data analysis prompt.

    Args:
        data: Data to analyze (CSV, JSON, or text)
        analysis_type: Type of analysis (summary, trends, anomalies)
    """
    templates = {
        "summary": "Provide a summary of this data:\n\n{data}",
        "trends": "Identify trends and patterns in this data:\n\n{data}",
        "anomalies": "Find anomalies or outliers in this data:\n\n{data}",
    }

    template = templates.get(analysis_type, templates["summary"])
    return template.format(data=data)
# ============================================
# RUN SERVER
# ============================================

if __name__ == "__main__":
    # Run with streamable HTTP transport
    mcp.run(transport="streamable-http")

This server exposes:
- 2 tools: search_knowledge_base and query_database
- 2 resources: config://{section} and file://{path:path}
- 1 prompt: analyze_data
Transport Options
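The MCP Python SDK supports three transport mechanisms. Note that sse is the older HTTP transport, which newer versions of the protocol replace with streamable-http: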
| Transport | Use Case | Pros | Cons |
|---|---|---|---|
| stdio | Local processes | Simple, no network | Only same-machine |
| sse | Web applications | Works over HTTP | More complex |
| streamable-http | Production | Full HTTP support | Requires HTTP server |
For AstrBot, I found streamable-http works best for production deployments.
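To show why the stdio row is labeled "simple": on that transport, each JSON-RPC message travels as one line of JSON over the child process's stdin and stdout. The sketch below imitates that framing with an in-memory stream; `write_message` and `read_messages` are hypothetical helpers, not SDK functions.

```python
import io
import json


def write_message(stream, message: dict) -> None:
    """Serialize one JSON-RPC message as a single line."""
    # json.dumps escapes newlines inside strings, so the output stays on one line.
    stream.write(json.dumps(message) + "\n")


def read_messages(stream):
    """Yield one decoded message per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


pipe = io.StringIO()  # stands in for the server process's stdin
write_message(pipe, {"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
write_message(pipe, {"jsonrpc": "2.0", "id": 2, "method": "ping", "params": {"note": "a\nb"}})
pipe.seek(0)

received = list(read_messages(pipe))
print(received)
```

Because stdout carries protocol messages on this transport, a stdio server should send its own log output to stderr instead.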
Connecting AstrBot to MCP Servers
Now I’ll show how to connect AstrBot to MCP servers as a client.
Creating an MCP Client Plugin
from contextlib import AsyncExitStack

from astrbot.api.event import filter, AstrMessageEvent
from astrbot.api.star import Context, Star, register
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client

@register("mcp_integration", "Your Name", "MCP integration for AstrBot", "1.0.0", "https://github.com/your/repo")
class MCPIntegrationPlugin(Star):
    def __init__(self, context: Context):
        super().__init__(context)
        self.mcp_sessions: dict[str, ClientSession] = {}
        # Owns every transport and session; await self.exit_stack.aclose() on shutdown
        self.exit_stack = AsyncExitStack()

    async def _connect_stdio_server(self, name: str, command: str, args: list[str]) -> ClientSession:
        """Connect to an MCP server via stdio transport."""
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=None
        )

        read_stream, write_stream = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()

        self.mcp_sessions[name] = session
        return session

    async def _connect_http_server(self, name: str, url: str) -> ClientSession:
        """Connect to an MCP server via HTTP transport."""
        # streamablehttp_client yields three values: read stream, write stream, session-id getter
        read_stream, write_stream, _ = await self.exit_stack.enter_async_context(
            streamablehttp_client(url)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()

        self.mcp_sessions[name] = session
        return session
    @filter.command("mcp_connect")
    async def connect_server(self, event: AstrMessageEvent):
        """Connect to an MCP server.

        Usage: /mcp_connect <name> <url_or_command>
        """
        import shlex

        parts = event.get_plain_text().split(maxsplit=2)
        if len(parts) < 3:
            yield event.plain_result("Usage: /mcp_connect <name> <url_or_command>")
            return

        name = parts[1]
        target = parts[2]

        try:
            if target.startswith("http"):
                session = await self._connect_http_server(name, target)
            else:
                # Treat as a stdio command line, e.g. "python kb_mcp_server.py"
                command, *args = shlex.split(target)
                session = await self._connect_stdio_server(name, command, args)

            # List available capabilities
            tools = await session.list_tools()
            resources = await session.list_resources()
            prompts = await session.list_prompts()

            result = f"Connected to {name}:\n"
            result += f"- Tools: {len(tools.tools)}\n"
            result += f"- Resources: {len(resources.resources)}\n"
            result += f"- Prompts: {len(prompts.prompts)}"

            yield event.plain_result(result)
        except Exception as e:
            yield event.plain_result(f"Failed to connect: {str(e)}")
    @filter.command("mcp_tools")
    async def list_tools(self, event: AstrMessageEvent):
        """List available tools from connected MCP servers."""
        if not self.mcp_sessions:
            yield event.plain_result("No MCP servers connected. Use /mcp_connect first.")
            return

        result = "Available MCP Tools:\n\n"
        for name, session in self.mcp_sessions.items():
            tools = await session.list_tools()
            for tool in tools.tools:
                result += f"[{name}] {tool.name}\n"
                result += f"  {tool.description}\n\n"

        yield event.plain_result(result)
    @filter.command("mcp_call")
    async def call_tool(self, event: AstrMessageEvent):
        """Call an MCP tool.

        Usage: /mcp_call <server> <tool> [args_json]
        """
        parts = event.get_plain_text().split(maxsplit=3)
        if len(parts) < 3:
            yield event.plain_result("Usage: /mcp_call <server> <tool> [args_json]")
            return

        server_name = parts[1]
        tool_name = parts[2]
        args_json = parts[3] if len(parts) > 3 else "{}"

        if server_name not in self.mcp_sessions:
            yield event.plain_result(f"Server '{server_name}' not connected")
            return

        session = self.mcp_sessions[server_name]

        import json

        try:
            # Parse inside the try block so malformed JSON is reported to the user
            args = json.loads(args_json)
            result = await session.call_tool(tool_name, args)
            yield event.plain_result(result.content[0].text)
        except Exception as e:
            yield event.plain_result(f"Tool call failed: {str(e)}")
    @filter.command("mcp_resource")
    async def read_resource(self, event: AstrMessageEvent):
        """Read an MCP resource.

        Usage: /mcp_resource <server> <uri>
        """
        parts = event.get_plain_text().split(maxsplit=2)
        if len(parts) < 3:
            yield event.plain_result("Usage: /mcp_resource <server> <uri>")
            return

        server_name = parts[1]
        uri = parts[2]

        if server_name not in self.mcp_sessions:
            yield event.plain_result(f"Server '{server_name}' not connected")
            return

        session = self.mcp_sessions[server_name]

        try:
            result = await session.read_resource(uri)
            yield event.plain_result(result.contents[0].text)
        except Exception as e:
            yield event.plain_result(f"Resource read failed: {str(e)}")

Configuration File
For production use, I created a configuration file:
mcp_servers:
  - name: "knowledge_base"
    command: "python"
    args: ["-m", "mcp_kb_server"]
    transport: "stdio"
    env:
      KB_PATH: "/data/knowledge_base"

  - name: "api_tools"
    url: "http://localhost:3000/mcp"
    transport: "http"
    auth:
      type: "bearer"
      token: "${MCP_API_TOKEN}"

  - name: "file_system"
    command: "python"
    args: ["mcp_file_server.py"]
    transport: "stdio"
    allowed_paths:
      - "/data/documents"
      - "/data/logs"

Tool Invocation Flow
Here’s how tool invocation works end-to-end:
User Message                      AstrBot                     MCP Server
     │                              │                           │
     │ "What's the weather?"        │                           │
     ├─────────────────────────────►│                           │
     │                              │                           │
     │                              │ LLM decides to call tool  │
     │                              │ get_weather("Beijing")    │
     │                              ├──────────────────────────►│
     │                              │                           │
     │                              │              Execute tool │
     │                              │             Return result │
     │                              │◄──────────────────────────┤
     │                              │                           │
     │                              │ LLM generates response    │
     │                              │ using tool result         │
     │◄─────────────────────────────┤                           │
     │ "Weather in Beijing: 72F"    │                           │

Practical Example: Knowledge Base Integration
I built a knowledge base MCP server that AstrBot can query:
import json
import os

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Knowledge Base Server", json_response=True)

KB_PATH = os.environ.get("KB_PATH", "/data/knowledge_base")

@mcp.tool()
def search(query: str, limit: int = 5) -> str:
    """Search the knowledge base.

    Args:
        query: Search query
        limit: Maximum results
    """
    results = []

    # Simple file-based search
    for filename in os.listdir(KB_PATH):
        if filename.endswith(".md"):
            filepath = os.path.join(KB_PATH, filename)
            with open(filepath, 'r') as f:
                content = f.read()

            if query.lower() in content.lower():
                results.append({
                    "file": filename,
                    "preview": content[:200] + "..."
                })

        if len(results) >= limit:
            break

    return json.dumps(results, indent=2)
@mcp.resource("kb://{document_id}")
def get_document(document_id: str) -> str:
    """Retrieve a specific document.

    URI: kb://getting-started
    """
    filepath = os.path.join(KB_PATH, f"{document_id}.md")

    if not os.path.exists(filepath):
        return json.dumps({"error": f"Document not found: {document_id}"})

    with open(filepath, 'r') as f:
        return f.read()

@mcp.resource("kb://list")
def list_documents() -> str:
    """List all available documents."""
    documents = []

    for filename in os.listdir(KB_PATH):
        if filename.endswith(".md"):
            documents.append(filename[:-3])  # Remove .md extension

    return json.dumps(documents, indent=2)
@mcp.prompt()
def summarize_document(document_id: str) -> str:
    """Generate a prompt to summarize a document.

    Args:
        document_id: Document to summarize
    """
    return f"""Summarize the following document in bullet points.
Focus on key concepts and actionable information.

Document: {document_id}"""

if __name__ == "__main__":
    mcp.run(transport="stdio")

When I tested this:
user@host:~$ /mcp_connect kb python kb_mcp_server.py
Connected to kb:
- Tools: 1
- Resources: 2
- Prompts: 1

user@host:~$ /mcp_call kb search '{"query": "astrbot"}'
[
  {
    "file": "getting-started.md",
    "preview": "# Getting Started with AstrBot\n\nAstrBot is a..."
  }
]

user@host:~$ /mcp_resource kb kb://getting-started
# Getting Started with AstrBot

AstrBot is a Python framework for building chatbots...

Advanced Patterns
Capability Negotiation
MCP servers and clients negotiate capabilities during initialization:
# After connecting, check server capabilities
from mcp.types import InitializeResult

def check_capabilities(init_result: InitializeResult):
    # The server announces its capabilities in the value returned by
    # `await session.initialize()`, so pass that result in here
    capabilities = init_result.capabilities

    print(f"Supports tools: {capabilities.tools is not None}")
    print(f"Supports resources: {capabilities.resources is not None}")
    print(f"Supports prompts: {capabilities.prompts is not None}")

    # Check specific features
    if capabilities.resources:
        print(f"Supports subscribe: {capabilities.resources.subscribe}")

Authentication
For HTTP transport, I added authentication:
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession

async def connect_with_auth(url: str, token: str):
    headers = {
        "Authorization": f"Bearer {token}"
    }

    # streamablehttp_client yields three values; the third returns the session id
    read_stream, write_stream, _ = await streamablehttp_client(
        url,
        headers=headers
    ).__aenter__()

    session = ClientSession(read_stream, write_stream)
    await session.__aenter__()
    await session.initialize()

    return session

Error Handling
MCP operations can fail. I implemented retry logic:
import asyncio

from mcp import ClientSession, McpError

async def call_tool_with_retry(
    session: ClientSession,
    tool_name: str,
    args: dict,
    max_retries: int = 3
):
    for attempt in range(max_retries):
        try:
            result = await session.call_tool(tool_name, args)
            return result
        except McpError as e:
            if e.error.code == -32603:  # Internal error
                if attempt < max_retries - 1:
                    # Linear backoff: wait 1s, then 2s, before retrying
                    await asyncio.sleep(1 * (attempt + 1))
                    continue
            raise

    raise Exception(f"Tool call failed after {max_retries} retries")

Troubleshooting Common Issues
Connection Failures
When I first tried to connect, I got this error:
Error: Connection refused to MCP server at localhost:3000

The server wasn’t running. I fixed it by starting the server first:
python mcp_server.py &

Tool Invocation Errors
I got an error when calling a tool with wrong parameters:
McpError: Tool parameter validation failed
  Expected: {"location": "string"}
  Received: {"city": "Beijing"}

The parameter name was wrong. I checked the tool schema:
tools = await session.list_tools()
for tool in tools.tools:
    print(f"Tool: {tool.name}")
    print(f"Schema: {tool.inputSchema}")

Resource Access Denied
When reading files, I got:
Error: Access denied - path outside allowed directories

The file path validation was too strict. I updated the allowed paths in the server configuration.
Summary
In this post, I showed how to integrate Model Context Protocol (MCP) with AstrBot. MCP provides a standardized way to expose tools, resources, and prompts to LLMs through a unified interface.
Key points:
- MCP uses three core concepts: tools (functions), resources (data), and prompts (templates)
- Client-server architecture: AstrBot connects to MCP servers via JSON-RPC 2.0
- Multiple transports: stdio for local, HTTP for network communication
- Schema validation: Tools use JSON Schema for parameter validation
- URI templates: Resources use URIs like kb://{topic} for identification
The standardization MCP provides means I can write one integration and use it with any MCP-compatible LLM application, not just AstrBot. This reduces vendor lock-in and increases code reusability.
Final Words + More Resources
My intention with this article was to share my knowledge and experience to help others. If you want to contact me, you can reach me by email.
Here are also the most important links from this article along with some further resources that will help you in this scope:
- Model Context Protocol Specification
- MCP Python SDK
- AstrBot Documentation
- Anthropic Claude Documentation
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!