mysql_mcp_server quickstart
这个代码实现了一个完整的MySQL MCP服务器,它允许AI模型通过标准化的MCP协议与MySQL数据库交互。列出数据库中的表作为资源读取表的内容执行任意SQL查询。
·
docker启动mysql服务
mysql服务,用于mcp server连接
docker run --name mysql-mcp-new -e MYSQL_ROOT_PASSWORD=123456 -e MYSQL_DATABASE=test_db -p 13306:3306 -d mysql:8.0
环境安装
git clone https://github.com/designcomputer/mysql_mcp_server.git
uv venv
source .venv/bin/activate
uv pip install -r requirements.txt
完整代码
import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")
def get_db_config():
"""Get database configuration from environment variables."""
config = {
"host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", "3306")),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
# Add charset and collation to avoid utf8mb4_0900_ai_ci issues with older MySQL versions
# These can be overridden via environment variables for specific MySQL versions
"charset": os.getenv("MYSQL_CHARSET", "utf8mb4"),
"collation": os.getenv("MYSQL_COLLATION", "utf8mb4_unicode_ci"),
# Disable autocommit for better transaction control
"autocommit": True,
# Set SQL mode for better compatibility - can be overridden
"sql_mode": os.getenv("MYSQL_SQL_MODE", "TRADITIONAL")
}
# Remove None values to let MySQL connector use defaults if not specified
config = {k: v for k, v in config.items() if v is not None}
if not all([config.get("user"), config.get("password"), config.get("database")]):
logger.error("Missing required database configuration. Please check environment variables:")
logger.error("MYSQL_USER, MYSQL_PASSWORD, and MYSQL_DATABASE are required")
raise ValueError("Missing required database configuration")
return config
# Initialize server
app = Server("mysql_mcp_server")
@app.list_resources()
async def list_resources() -> list[Resource]:
"""List MySQL tables as resources."""
config = get_db_config()
try:
logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")
with connect(**config) as conn:
logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")
with conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
logger.info(f"Found tables: {tables}")
resources = []
for table in tables:
resources.append(
Resource(
uri=f"mysql://{table[0]}/data",
name=f"Table: {table[0]}",
mimeType="text/plain",
description=f"Data in table: {table[0]}"
)
)
return resources
except Error as e:
logger.error(f"Failed to list resources: {str(e)}")
logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")
return []
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
"""Read table contents."""
config = get_db_config()
uri_str = str(uri)
logger.info(f"Reading resource: {uri_str}")
if not uri_str.startswith("mysql://"):
raise ValueError(f"Invalid URI scheme: {uri_str}")
parts = uri_str[8:].split('/')
table = parts[0]
try:
logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")
with connect(**config) as conn:
logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table} LIMIT 100")
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
result = [",".join(map(str, row)) for row in rows]
return "\n".join([",".join(columns)] + result)
except Error as e:
logger.error(f"Database error reading resource {uri}: {str(e)}")
logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")
raise RuntimeError(f"Database error: {str(e)}")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available MySQL tools."""
logger.info("Listing tools...")
return [
Tool(
name="execute_sql",
description="Execute an SQL query on the MySQL server",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query to execute"
}
},
"required": ["query"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Execute SQL commands."""
config = get_db_config()
logger.info(f"Calling tool: {name} with arguments: {arguments}")
if name != "execute_sql":
raise ValueError(f"Unknown tool: {name}")
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
try:
logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")
with connect(**config) as conn:
logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")
with conn.cursor() as cursor:
cursor.execute(query)
# Special handling for SHOW TABLES
if query.strip().upper().startswith("SHOW TABLES"):
tables = cursor.fetchall()
result = ["Tables_in_" + config["database"]] # Header
result.extend([table[0] for table in tables])
return [TextContent(type="text", text="\n".join(result))]
# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)
elif cursor.description is not None:
columns = [desc[0] for desc in cursor.description]
try:
rows = cursor.fetchall()
result = [",".join(map(str, row)) for row in rows]
return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
except Error as e:
logger.warning(f"Error fetching results: {str(e)}")
return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]
# Non-SELECT queries
else:
conn.commit()
return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]
except Error as e:
logger.error(f"Error executing SQL '{query}': {e}")
logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")
return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
async def main():
"""Main entry point to run the MCP server."""
from mcp.server.stdio import stdio_server
# Add additional debug output
print("Starting MySQL MCP server with config:", file=sys.stderr)
config = get_db_config()
print(f"Host: {config['host']}", file=sys.stderr)
print(f"Port: {config['port']}", file=sys.stderr)
print(f"User: {config['user']}", file=sys.stderr)
print(f"Database: {config['database']}", file=sys.stderr)
logger.info("Starting MySQL MCP server...")
logger.info(f"Database config: {config['host']}/{config['database']} as {config['user']}")
async with stdio_server() as (read_stream, write_stream):
try:
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
except Exception as e:
logger.error(f"Server error: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
asyncio.run(main())
代码解释
导入模块和配置日志
import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")
这部分导入了必要的模块:
asyncio
:用于异步编程mysql.connector
:MySQL数据库连接器mcp.server
和mcp.types
:MCP协议相关的类和类型- 配置了日志系统,便于调试和监控
数据库配置函数
def get_db_config():
"""从环境变量获取数据库配置。"""
config = {
"host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", "3306")),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
# 添加字符集和排序规则以避免utf8mb4_0900_ai_ci在旧版MySQL中的问题
"charset": os.getenv("MYSQL_CHARSET", "utf8mb4"),
"collation": os.getenv("MYSQL_COLLATION", "utf8mb4_unicode_ci"),
# 启用自动提交以便更好地控制事务
"autocommit": True,
# 设置SQL模式以提高兼容性
"sql_mode": os.getenv("MYSQL_SQL_MODE", "TRADITIONAL")
}
# 移除None值,让MySQL连接器使用默认值
config = {k: v for k, v in config.items() if v is not None}
if not all([config.get("user"), config.get("password"), config.get("database")]):
logger.error("缺少必要的数据库配置。请检查环境变量:")
logger.error("MYSQL_USER, MYSQL_PASSWORD, 和 MYSQL_DATABASE 是必需的")
raise ValueError("缺少必要的数据库配置")
return config
这个函数从环境变量中读取MySQL连接配置:
- 支持配置主机、端口、用户名、密码、数据库名等
- 设置了默认的字符集和排序规则,解决兼容性问题
- 验证必要的配置项(用户名、密码、数据库名)是否存在
初始化MCP服务器
# 初始化服务器
app = Server("mysql_mcp_server")
创建了一个名为"mysql_mcp_server"的MCP服务器实例。
资源列表功能
@app.list_resources()
async def list_resources() -> list[Resource]:
"""列出MySQL表作为资源。"""
# 实现代码...
这个函数实现了MCP的资源列表功能:
- 连接到MySQL数据库
- 执行
SHOW TABLES
命令获取所有表 - 将每个表转换为MCP资源格式返回
- 每个资源包含URI、名称、MIME类型和描述
读取资源功能
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
"""读取表内容。"""
# 实现代码...
这个函数实现了读取资源的功能:
- 解析URI获取表名
- 连接到MySQL数据库
- 执行
SELECT * FROM {table} LIMIT 100
查询 - 将结果格式化为CSV格式的字符串返回
工具列表功能
@app.list_tools()
async def list_tools() -> list[Tool]:
"""列出可用的MySQL工具。"""
# 实现代码...
这个函数定义了MCP服务器提供的工具:
- 返回一个名为
execute_sql
的工具 - 该工具允许执行SQL查询
- 定义了工具的输入模式(需要一个query参数)
工具调用功能
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""执行SQL命令。"""
# 实现代码...
这个函数处理工具调用:
- 验证工具名称是否为
execute_sql
- 从参数中获取SQL查询
- 连接到MySQL数据库并执行查询
- 根据查询类型处理结果:
- 对于
SHOW TABLES
有特殊处理 - 对于返回结果集的查询(如SELECT),格式化为CSV
- 对于非查询操作(如INSERT),返回受影响的行数
- 对于
主函数
async def main():
"""运行MCP服务器的主入口点。"""
# 实现代码...
if __name__ == "__main__":
asyncio.run(main())
主函数负责:
- 输出调试信息,显示数据库配置
- 初始化stdio服务器(标准输入输出通信)
- 启动MCP服务器
- 处理异常并记录错误
总结
这个代码实现了一个完整的MySQL MCP服务器,它允许AI模型通过标准化的MCP协议与MySQL数据库交互。服务器提供了以下功能:
- 列出数据库中的表作为资源
- 读取表的内容
- 执行任意SQL查询
inspector测试
npx @modelcontextprotocol/inspector
inspector配置
参数配置
环境变量配置
基础测试语句
SHOW TABLES;
这个语句会列出数据库中的所有表,是最基本的测试,可以验证连接是否正常。
创建表测试
CREATE TABLE test_users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
插入数据测试
INSERT INTO test_users (username, email) VALUES
('user1', 'user1@example.com'),
('user2', 'user2@example.com'),
('user3', 'user3@example.com');
查询数据测试
SELECT * FROM test_users;
更复杂的查询测试
SELECT
username,
email,
DATE_FORMAT(created_at, '%Y-%m-%d') AS registration_date
FROM
test_users
WHERE
id > 1
ORDER BY
created_at DESC;
表结构查询测试
DESCRIBE test_users;
更多推荐
所有评论(0)