Skip to content

How to Integrate Model Context Protocol (MCP) with AstrBot

Problem

When I tried to integrate multiple tools with AstrBot, I ran into a common problem: every tool had a different integration pattern. Some used REST APIs, others used webhooks, and a few needed custom protocols. Managing all these different interfaces became a nightmare.

tool-integration-chaos
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Weather API │ │ Database │ │ File System │
│ (REST) │ │ (SQL) │ │ (Local I/O) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────┐
│ AstrBot (Custom Adapters) │
│ - WeatherAdapter │
│ - DatabaseAdapter │
│ - FileSystemAdapter │
│ Each with different interfaces! │
└─────────────────────────────────────────────────────┘

I needed a standardized way to expose tools and resources to LLMs without writing custom adapters for everything. That’s when I discovered Model Context Protocol (MCP).

What is Model Context Protocol?

MCP is an open protocol that standardizes how LLM applications connect to external data sources and tools. Instead of building custom integrations for each tool, MCP provides a unified interface.

The protocol defines three core concepts:

  1. Tools - Executable functions that LLMs can invoke (like “get_weather” or “search_database”)
  2. Resources - Data sources that provide context (like files, database records, or API responses)
  3. Prompts - Reusable prompt templates for common interactions

MCP uses a client-server model where your application (AstrBot) acts as a client connecting to MCP servers that expose tools and resources.

mcp-architecture
┌─────────────────────────────────────────────────────────┐
│ AstrBot │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Star │ │ Agent │ │ MCP │ │
│ │ Plugins │ │ System │ │ Client │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
└─────────────────────────────────────────────┼──────────┘
JSON-RPC 2.0
┌───────────────────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Knowledge Base │ │ API Tools │ │ File System │
│ MCP Server │ │ MCP Server │ │ MCP Server │
│ │ │ │ │ │
│ - Tools │ │ - Tools │ │ - Tools │
│ - Resources │ │ - Resources │ │ - Resources │
│ - Prompts │ │ - Prompts │ │ - Prompts │
└─────────────────┘ └─────────────────┘ └─────────────────┘

Understanding MCP Core Concepts

Tools: Executable Functions

Tools are functions that LLMs can call to perform actions. Each tool has a schema defining its parameters and return type.

I started by creating a simple weather tool:

weather_tool.py
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Weather Server", json_response=True)
@mcp.tool()
def get_weather(location: str) -> str:
"""Get current weather for a location.
Args:
location: City name or coordinates
Returns:
Weather information string
"""
# In a real implementation, call a weather API
return f"Weather for {location}: 72F, Partly cloudy"

The @mcp.tool() decorator automatically generates a JSON schema from the function signature. The docstring becomes the tool’s description, which helps the LLM understand when to use it.

Resources: Data Sources

Resources provide read-only data to LLMs. They use URI templates for identification.

knowledge_resource.py
@mcp.resource("kb://{topic}")
def knowledge_base(topic: str) -> str:
"""Retrieve knowledge base entry for a topic.
The URI template kb://{topic} matches requests like:
- kb://python
- kb://astrbot
"""
knowledge = {
"python": "Python is a high-level programming language...",
"astrbot": "AstrBot is a Python framework for building chatbots...",
}
return knowledge.get(topic, f"No entry found for {topic}")

When an LLM needs context about “python”, it requests kb://python and receives the relevant information.

Prompts: Reusable Templates

Prompts are predefined templates that users or LLMs can invoke.

review_prompt.py
@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
"""Generate a code review prompt.
Args:
code: The code to review
language: Programming language (default: python)
"""
return f"""Review this {language} code for:
- Bugs and errors
- Performance issues
- Best practices
- Security concerns
Code:
\`\`\`
{code}
\`\`\`
Provide specific, actionable feedback."""

Users can invoke this prompt with code_review("def hello(): pass") and get a structured review request.

Building an MCP Server for AstrBot

Now I’ll show how I built a complete MCP server that AstrBot can connect to.

Setting Up the Project

First, I installed the MCP Python SDK:

installation
pip install mcp

Then I created my server file:

mcp_server.py
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
import json
# Initialize the MCP server
mcp = FastMCP(
"AstrBot Integration Server",
json_response=True # Return JSON instead of text
)
# ============================================
# TOOLS
# ============================================
@mcp.tool()
def search_knowledge_base(query: str, limit: int = 5) -> str:
"""Search the knowledge base for relevant documents.
Args:
query: Search query string
limit: Maximum number of results (default: 5)
Returns:
JSON string with search results
"""
# Mock implementation - replace with actual search
results = [
{"title": "Getting Started with AstrBot", "score": 0.95},
{"title": "AstrBot Plugin Development", "score": 0.87},
][:limit]
return json.dumps(results, indent=2)
class DatabaseQuery(BaseModel):
"""Validated database query parameters."""
table: str = Field(description="Table name to query")
columns: list[str] = Field(default=["*"], description="Columns to select")
where: str | None = Field(default=None, description="WHERE clause")
@mcp.tool()
def query_database(params: DatabaseQuery) -> str:
"""Execute a database query.
Args:
params: Validated query parameters
Returns:
Query results as JSON
"""
# Mock implementation
return json.dumps({
"table": params.table,
"columns": params.columns,
"results": [
{"id": 1, "name": "Item 1"},
{"id": 2, "name": "Item 2"},
]
})
# ============================================
# RESOURCES
# ============================================
@mcp.resource("config://{section}")
def get_config_section(section: str) -> str:
"""Retrieve configuration section.
URI examples:
- config://database
- config://api_keys
"""
config = {
"database": {"host": "localhost", "port": 5432},
"api_keys": {"weather": "xxx", "maps": "yyy"},
}
return json.dumps(config.get(section, {}), indent=2)
@mcp.resource("file://{path:path}")
def read_file(path: str) -> str:
"""Read file contents from allowed directories.
Args:
path: File path (relative to allowed root)
"""
# Security: only allow specific directories
import os
allowed_roots = ["/data/documents", "/data/logs"]
# Validate path doesn't escape allowed roots
full_path = os.path.normpath(path)
if not any(full_path.startswith(root) for root in allowed_roots):
return "Error: Access denied - path outside allowed directories"
try:
with open(full_path, 'r') as f:
return f.read()
except FileNotFoundError:
return f"Error: File not found: {path}"
# ============================================
# PROMPTS
# ============================================
@mcp.prompt()
def analyze_data(data: str, analysis_type: str = "summary") -> str:
"""Generate data analysis prompt.
Args:
data: Data to analyze (CSV, JSON, or text)
analysis_type: Type of analysis (summary, trends, anomalies)
"""
templates = {
"summary": "Provide a summary of this data:\n\n{data}",
"trends": "Identify trends and patterns in this data:\n\n{data}",
"anomalies": "Find anomalies or outliers in this data:\n\n{data}",
}
template = templates.get(analysis_type, templates["summary"])
return template.format(data=data)
# ============================================
# RUN SERVER
# ============================================
if __name__ == "__main__":
# Run with streamable HTTP transport
mcp.run(transport="streamable-http")

This server exposes:

  • 2 tools: search_knowledge_base and query_database
  • 2 resources: config://{section} and file://{path:path}
  • 1 prompt: analyze_data

Transport Options

MCP supports three transport mechanisms:

TransportUse CaseProsCons
stdioLocal processesSimple, no networkOnly same-machine
sseWeb applicationsWorks over HTTPMore complex
streamable-httpProductionFull HTTP supportRequires HTTP server

For AstrBot, I found streamable-http works best for production deployments.

Connecting AstrBot to MCP Servers

Now I’ll show how to connect AstrBot to MCP servers as a client.

Creating an MCP Client Plugin

main.py
from astrbot.api.event import filter, AstrMessageEvent
from astrbot.api.star import Context, Star, register
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
import asyncio
@register("mcp_integration", "Your Name", "MCP integration for AstrBot", "1.0.0", "https://github.com/your/repo")
class MCPIntegrationPlugin(Star):
def __init__(self, context: Context):
super().__init__(context)
self.mcp_sessions: dict[str, ClientSession] = {}
async def _connect_stdio_server(self, name: str, command: str, args: list[str]) -> ClientSession:
"""Connect to an MCP server via stdio transport."""
server_params = StdioServerParameters(
command=command,
args=args,
env=None
)
read_stream, write_stream = await stdio_client(server_params).__aenter__()
session = ClientSession(read_stream, write_stream)
await session.__aenter__()
await session.initialize()
self.mcp_sessions[name] = session
return session
async def _connect_http_server(self, name: str, url: str) -> ClientSession:
"""Connect to an MCP server via HTTP transport."""
read_stream, write_stream = await streamablehttp_client(url).__aenter__()
session = ClientSession(read_stream, write_stream)
await session.__aenter__()
await session.initialize()
self.mcp_sessions[name] = session
return session
@filter.command("mcp_connect")
async def connect_server(self, event: AstrMessageEvent):
"""Connect to an MCP server.
Usage: /mcp_connect <name> <url_or_command>
"""
parts = event.get_plain_text().split(maxsplit=2)
if len(parts) < 3:
yield event.plain_result("Usage: /mcp_connect <name> <url>")
return
name = parts[1]
url = parts[2]
try:
if url.startswith("http"):
session = await self._connect_http_server(name, url)
else:
# Treat as command for stdio
session = await self._connect_stdio_server(name, url, [])
# List available capabilities
tools = await session.list_tools()
resources = await session.list_resources()
prompts = await session.list_prompts()
result = f"Connected to {name}:\n"
result += f"- Tools: {len(tools.tools)}\n"
result += f"- Resources: {len(resources.resources)}\n"
result += f"- Prompts: {len(prompts.prompts)}"
yield event.plain_result(result)
except Exception as e:
yield event.plain_result(f"Failed to connect: {str(e)}")
@filter.command("mcp_tools")
async def list_tools(self, event: AstrMessageEvent):
"""List available tools from connected MCP servers."""
if not self.mcp_sessions:
yield event.plain_result("No MCP servers connected. Use /mcp_connect first.")
return
result = "Available MCP Tools:\n\n"
for name, session in self.mcp_sessions.items():
tools = await session.list_tools()
for tool in tools.tools:
result += f"[{name}] {tool.name}\n"
result += f" {tool.description}\n\n"
yield event.plain_result(result)
@filter.command("mcp_call")
async def call_tool(self, event: AstrMessageEvent):
"""Call an MCP tool.
Usage: /mcp_call <server> <tool> [args_json]
"""
parts = event.get_plain_text().split(maxsplit=3)
if len(parts) < 3:
yield event.plain_result("Usage: /mcp_call <server> <tool> [args_json]")
return
server_name = parts[1]
tool_name = parts[2]
args_json = parts[3] if len(parts) > 3 else "{}"
if server_name not in self.mcp_sessions:
yield event.plain_result(f"Server '{server_name}' not connected")
return
session = self.mcp_sessions[server_name]
import json
args = json.loads(args_json)
try:
result = await session.call_tool(tool_name, args)
yield event.plain_result(result.content[0].text)
except Exception as e:
yield event.plain_result(f"Tool call failed: {str(e)}")
@filter.command("mcp_resource")
async def read_resource(self, event: AstrMessageEvent):
"""Read an MCP resource.
Usage: /mcp_resource <server> <uri>
"""
parts = event.get_plain_text().split(maxsplit=2)
if len(parts) < 3:
yield event.plain_result("Usage: /mcp_resource <server> <uri>")
return
server_name = parts[1]
uri = parts[2]
if server_name not in self.mcp_sessions:
yield event.plain_result(f"Server '{server_name}' not connected")
return
session = self.mcp_sessions[server_name]
try:
result = await session.read_resource(uri)
yield event.plain_result(result.contents[0].text)
except Exception as e:
yield event.plain_result(f"Resource read failed: {str(e)}")

Configuration File

For production use, I created a configuration file:

config/mcp_servers.yaml
mcp_servers:
- name: "knowledge_base"
command: "python"
args: ["-m", "mcp_kb_server"]
transport: "stdio"
env:
KB_PATH: "/data/knowledge_base"
- name: "api_tools"
url: "http://localhost:3000/mcp"
transport: "http"
auth:
type: "bearer"
token: "${MCP_API_TOKEN}"
- name: "file_system"
command: "python"
args: ["mcp_file_server.py"]
transport: "stdio"
allowed_paths:
- "/data/documents"
- "/data/logs"

Tool Invocation Flow

Here’s how tool invocation works end-to-end:

tool-invocation-flow
User Message AstrBot MCP Server
│ │ │
│ "What's the weather?" │ │
├─────────────────────────────►│ │
│ │ │
│ │ LLM decides to call tool │
│ │ get_weather("Beijing") │
│ ├──────────────────────────►│
│ │ │
│ │ │ Execute tool
│ │ │ Return result
│ │◄──────────────────────────┤
│ │ │
│ │ LLM generates response │
│ │ using tool result │
│◄─────────────────────────────┤ │
│ "Weather in Beijing: 72F" │ │

Practical Example: Knowledge Base Integration

I built a knowledge base MCP server that AstrBot can query:

kb_mcp_server.py
from mcp.server.fastmcp import FastMCP
import os
import json
mcp = FastMCP("Knowledge Base Server", json_response=True)
KB_PATH = os.environ.get("KB_PATH", "/data/knowledge_base")
@mcp.tool()
def search(query: str, limit: int = 5) -> str:
"""Search the knowledge base.
Args:
query: Search query
limit: Maximum results
"""
results = []
# Simple file-based search
for filename in os.listdir(KB_PATH):
if filename.endswith(".md"):
filepath = os.path.join(KB_PATH, filename)
with open(filepath, 'r') as f:
content = f.read()
if query.lower() in content.lower():
results.append({
"file": filename,
"preview": content[:200] + "..."
})
if len(results) >= limit:
break
return json.dumps(results, indent=2)
@mcp.resource("kb://{document_id}")
def get_document(document_id: str) -> str:
"""Retrieve a specific document.
URI: kb://getting-started
"""
filepath = os.path.join(KB_PATH, f"{document_id}.md")
if not os.path.exists(filepath):
return json.dumps({"error": f"Document not found: {document_id}"})
with open(filepath, 'r') as f:
return f.read()
@mcp.resource("kb://list")
def list_documents() -> str:
"""List all available documents."""
documents = []
for filename in os.listdir(KB_PATH):
if filename.endswith(".md"):
documents.append(filename[:-3]) # Remove .md extension
return json.dumps(documents, indent=2)
@mcp.prompt()
def summarize_document(document_id: str) -> str:
"""Generate a prompt to summarize a document.
Args:
document_id: Document to summarize
"""
return f"""Summarize the following document in bullet points.
Focus on key concepts and actionable information.
Document: {document_id}
"""
if __name__ == "__main__":
mcp.run(transport="stdio")

When I tested this:

test-output
user@host:~$ /mcp_connect kb python kb_mcp_server.py
Connected to kb:
- Tools: 1
- Resources: 2
- Prompts: 1
user@host:~$ /mcp_call kb search '{"query": "astrbot"}'
[
{
"file": "getting-started.md",
"preview": "# Getting Started with AstrBot\n\nAstrBot is a..."
}
]
user@host:~$ /mcp_resource kb kb://getting-started
# Getting Started with AstrBot
AstrBot is a Python framework for building chatbots...

Advanced Patterns

Capability Negotiation

MCP servers and clients negotiate capabilities during initialization:

capability-check.py
# After connecting, check server capabilities
async def check_capabilities(session: ClientSession):
# Server announces capabilities during initialization
capabilities = session.server_capabilities
print(f"Supports tools: {capabilities.tools is not None}")
print(f"Supports resources: {capabilities.resources is not None}")
print(f"Supports prompts: {capabilities.prompts is not None}")
# Check specific features
if capabilities.resources:
print(f"Supports subscribe: {capabilities.resources.subscribe}")

Authentication

For HTTP transport, I added authentication:

auth-config.py
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession
async def connect_with_auth(url: str, token: str):
headers = {
"Authorization": f"Bearer {token}"
}
read_stream, write_stream = await streamablehttp_client(
url,
headers=headers
).__aenter__()
session = ClientSession(read_stream, write_stream)
await session.__aenter__()
await session.initialize()
return session

Error Handling

MCP operations can fail. I implemented retry logic:

error-handling.py
import asyncio
from mcp import McpError
async def call_tool_with_retry(
session: ClientSession,
tool_name: str,
args: dict,
max_retries: int = 3
):
for attempt in range(max_retries):
try:
result = await session.call_tool(tool_name, args)
return result
except McpError as e:
if e.error.code == -32603: # Internal error
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
continue
raise
raise Exception(f"Tool call failed after {max_retries} retries")

Troubleshooting Common Issues

Connection Failures

When I first tried to connect, I got this error:

connection-error
Error: Connection refused to MCP server at localhost:3000

The server wasn’t running. I fixed it by starting the server first:

fix-connection
python mcp_server.py &

Tool Invocation Errors

I got an error when calling a tool with wrong parameters:

tool-error
McpError: Tool parameter validation failed
Expected: {"location": "string"}
Received: {"city": "Beijing"}

The parameter name was wrong. I checked the tool schema:

check-schema.py
tools = await session.list_tools()
for tool in tools.tools:
print(f"Tool: {tool.name}")
print(f"Schema: {tool.inputSchema}")

Resource Access Denied

When reading files, I got:

access-denied
Error: Access denied - path outside allowed directories

The file path validation was too strict. I updated the allowed paths in the server configuration.

Summary

In this post, I showed how to integrate Model Context Protocol (MCP) with AstrBot. MCP provides a standardized way to expose tools, resources, and prompts to LLMs through a unified interface.

Key points:

  1. MCP uses three core concepts: tools (functions), resources (data), and prompts (templates)
  2. Client-server architecture: AstrBot connects to MCP servers via JSON-RPC 2.0
  3. Multiple transports: stdio for local, HTTP for network communication
  4. Schema validation: Tools use JSON Schema for parameter validation
  5. URI templates: Resources use URIs like kb://{topic} for identification

The standardization MCP provides means I can write one integration and use it with any MCP-compatible LLM application, not just AstrBot. This reduces vendor lock-in and increases code reusability.

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments