MCP LLM Bridge:连接Model Context Protocol与OpenAI兼容LLM的桥梁

在这里插入图片描述

在人工智能快速发展的今天,大型语言模型(LLM)的应用场景越来越广泛,而不同模型之间的协议差异也带来了集成和使用上的挑战。本文将详细介绍MCP LLM Bridge,这是一个连接Model Context Protocol (MCP)服务器与OpenAI兼容LLM的桥接工具,它使得任何兼容OpenAI API的语言模型都能够通过标准化接口利用MCP兼容工具,无论是使用云端模型还是本地部署的实现(如Ollama)。

什么是MCP LLM Bridge?

MCP LLM Bridge是一个双向协议转换层,它在MCP和OpenAI的函数调用接口之间建立了桥梁。它将MCP工具规范转换为OpenAI函数模式,并处理函数调用回到MCP工具执行的映射。这种设计使得开发者可以在不同的模型和工具生态系统之间无缝切换,极大地提高了开发效率和系统灵活性。

核心功能与架构

MCP LLM Bridge的核心功能包括:

  1. 协议转换:在MCP和OpenAI API之间进行双向协议转换
  2. 工具注册与发现:自动发现MCP服务器提供的工具并注册到LLM客户端
  3. 函数调用处理:处理LLM的函数调用并映射到相应的MCP工具执行
  4. 多模型支持:支持OpenAI官方API以及兼容OpenAI API的本地模型(如Ollama)
  5. 数据库工具集成:内置数据库查询工具,方便进行数据操作

架构概览

MCP LLM Bridge采用模块化设计,主要包含以下组件:

  1. MCPLLMBridge:核心桥接类,负责协议转换和消息处理
  2. MCPClient:与MCP服务器通信的客户端
  3. LLMClient:与OpenAI兼容LLM通信的客户端
  4. DatabaseQueryTool:内置的数据库查询工具
  5. BridgeManager:管理桥接生命周期的管理器类

下面我们通过代码示例来详细了解MCP LLM Bridge的实现和使用方法。

安装与配置

首先,让我们看看如何安装和配置MCP LLM Bridge:

# 安装uv包管理工具
curl -LsSf https://astral.sh/uv/install.sh | sh

# 克隆仓库
git clone https://github.com/bartolli/mcp-llm-bridge.git
cd mcp-llm-bridge

# 创建并激活虚拟环境
uv venv
source .venv/bin/activate

# 安装依赖
uv pip install -e .

# 创建测试数据库
python -m mcp_llm_bridge.create_test_db

接下来,创建.env文件配置API密钥:

OPENAI_API_KEY=your_key
OPENAI_MODEL=gpt-4o  # 或其他支持工具的OpenAI模型

注意:如果需要重新激活环境以使用.env中的密钥,请执行:source .venv/bin/activate

核心组件详解

1. 桥接器(Bridge)

桥接器是MCP LLM Bridge的核心组件,负责在MCP和OpenAI API之间进行协议转换。下面是带有详细中文注释的核心代码:

class MCPLLMBridge:
    """MCP与LLM之间的桥接器"""
    
    def __init__(self, config: BridgeConfig):
        # 初始化配置
        self.config = config
        # 创建MCP客户端,用于与MCP服务器通信
        self.mcp_client = MCPClient(config.mcp_server_params)
        # 创建LLM客户端,用于与OpenAI兼容的LLM通信
        self.llm_client = LLMClient(config.llm_config)
        # 初始化数据库查询工具
        self.query_tool = DatabaseQueryTool("test.db")
        
        # 组合系统提示词与数据库模式信息
        schema_prompt = f"""
        可用的数据库模式:
        {self.query_tool.get_schema_description()}
        
        查询数据库时:
        1. 使用模式中指定的精确列名
        2. 确保查询是有效的SQL
        3. 数据库是SQLite,请使用兼容的语法
        """
        
        # 设置系统提示词
        if config.system_prompt:
            self.llm_client.system_prompt = f"{config.system_prompt}\n\n{schema_prompt}"
        else:
            self.llm_client.system_prompt = schema_prompt
            
        # 初始化可用工具列表和工具名称映射
        self.available_tools: List[Any] = []
        self.tool_name_mapping: Dict[str, str] = {}  # 映射OpenAI工具名到MCP工具名
    
    async def initialize(self):
        """初始化客户端并设置工具"""
        try:
            # 连接MCP客户端
            await self.mcp_client.connect()
            
            # 获取MCP可用工具并添加数据库工具
            mcp_tools = await self.mcp_client.get_available_tools()
            if hasattr(mcp_tools, 'tools'):
                self.available_tools = [*mcp_tools.tools, self.query_tool.get_tool_spec()]
            else:
                self.available_tools = [*mcp_tools, self.query_tool.get_tool_spec()]
                
            # 转换并注册工具到LLM客户端
            converted_tools = self._convert_mcp_tools_to_openai_format(self.available_tools)
            self.llm_client.tools = converted_tools
            return True
        except Exception as e:
            logger.error(f"桥接器初始化失败: {str(e)}", exc_info=True)
            return False
    
    def _convert_mcp_tools_to_openai_format(self, mcp_tools: List[Any]) -> List[Dict[str, Any]]:
        """将MCP工具格式转换为OpenAI工具格式"""
        openai_tools = []
        
        # 提取工具列表
        if hasattr(mcp_tools, 'tools'):
            tools_list = mcp_tools.tools
        elif isinstance(mcp_tools, dict):
            tools_list = mcp_tools.get('tools', [])
        else:
            tools_list = mcp_tools
            
        # 处理每个工具
        if isinstance(tools_list, list):
            for tool in tools_list:
                if hasattr(tool, 'name') and hasattr(tool, 'description'):
                    # 规范化工具名称
                    openai_name = self._sanitize_tool_name(tool.name)
                    self.tool_name_mapping[openai_name] = tool.name
                    
                    # 获取工具输入模式
                    tool_schema = getattr(tool, 'inputSchema', {
                        "type": "object",
                        "properties": {},
                        "required": []
                    })
                    
                    # 创建OpenAI格式的工具定义
                    openai_tool = {
                        "type": "function",
                        "function": {
                            "name": openai_name,
                            "description": tool.description,
                            "parameters": tool_schema
                        }
                    }
                    openai_tools.append(openai_tool)
                    
        return openai_tools
    
    def _sanitize_tool_name(self, name: str) -> str:
        """规范化工具名称以兼容OpenAI"""
        # 替换可能导致问题的字符
        return name.replace("-", "_").replace(" ", "_").lower()
    
    async def process_message(self, message: str) -> str:
        """处理用户消息"""
        try:
            # 发送消息到LLM
            response = await self.llm_client.invoke_with_prompt(message)
            
            # 持续处理工具调用直到获得最终响应
            while response.is_tool_call:
                if not response.tool_calls:
                    break
                    
                # 处理工具调用
                tool_responses = await self._handle_tool_calls(response.tool_calls)
                
                # 继续与工具结果的对话
                response = await self.llm_client.invoke(tool_responses)
                
            return response.content
        except Exception as e:
            logger.error(f"处理消息时出错: {str(e)}", exc_info=True)
            return f"处理消息时出错: {str(e)}"
    
    async def _handle_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """通过MCP处理工具调用"""
        tool_responses = []
        
        for tool_call in tool_calls:
            try:
                # 获取原始MCP工具名称
                openai_name = tool_call.function.name
                mcp_name = self.tool_name_mapping.get(openai_name)
                if not mcp_name:
                    raise ValueError(f"未知工具: {openai_name}")
                
                # 解析参数
                arguments = json.loads(tool_call.function.arguments)
                
                # 通过MCP执行
                result = await self.mcp_client.call_tool(mcp_name, arguments)
                
                # 格式化响应 - 处理字符串和结构化结果
                if isinstance(result, str):
                    output = result
                elif hasattr(result, 'content') and isinstance(result.content, list):
                    # 处理MCP CallToolResult格式
                    output = " ".join(
                        content.text for content in result.content if hasattr(content, 'text')
                    )
                else:
                    output = str(result)
                
                # 格式化响应
                tool_responses.append({
                    "tool_call_id": tool_call.id,
                    "output": output
                })
            except Exception as e:
                logger.error(f"工具执行失败: {str(e)}", exc_info=True)
                tool_responses.append({
                    "tool_call_id": tool_call.id,
                    "output": f"错误: {str(e)}"
                })
                
        return tool_responses
    
    async def close(self):
        """清理资源"""
        await self.mcp_client.__aexit__(None, None, None)

2. LLM客户端

LLM客户端负责与OpenAI兼容的语言模型通信,处理消息发送和工具调用响应:

class LLMResponse:
    """标准化响应格式,专注于工具处理"""
    
    def __init__(self, completion: Any):
        # 保存原始完成对象
        self.completion = completion
        self.choice = completion.choices[0]
        self.message = self.choice.message
        self.stop_reason = self.choice.finish_reason
        # 判断是否为工具调用
        self.is_tool_call = self.stop_reason == "tool_calls"
        
        # 格式化内容以兼容桥接器
        self.content = self.message.content if self.message.content is not None else ""
        self.tool_calls = self.message.tool_calls if hasattr(self.message, "tool_calls") else None
    
    def get_message(self) -> Dict[str, Any]:
        """获取标准化消息格式"""
        return {
            "role": "assistant",
            "content": self.content,
            "tool_calls": self.tool_calls
        }

class LLMClient:
    """与OpenAI兼容LLM交互的客户端"""
    
    def __init__(self, config: LLMConfig):
        # 初始化配置
        self.config = config
        # 创建OpenAI客户端
        self.client = openai.OpenAI(
            api_key=config.api_key,
            base_url=config.base_url
        )
        self.tools = []
        self.messages = []
        self.system_prompt = None
    
    def _prepare_messages(self) -> List[Dict[str, Any]]:
        """准备API调用的消息"""
        formatted_messages = []
        # 添加系统提示(如果有)
        if self.system_prompt:
            formatted_messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        # 添加对话历史
        formatted_messages.extend(self.messages)
        return formatted_messages
    
    async def invoke_with_prompt(self, prompt: str) -> LLMResponse:
        """向LLM发送单个提示"""
        # 添加用户消息
        self.messages.append({
            "role": "user",
            "content": prompt
        })
        # 调用LLM
        return await self.invoke([])
    
    async def invoke(self, tool_results: Optional[List[Dict[str, Any]]] = None) -> LLMResponse:
        """调用LLM,可选附带工具结果"""
        # 处理工具结果(如果有)
        if tool_results:
            for result in tool_results:
                self.messages.append({
                    "role": "tool",
                    "content": str(result.get("output", "")),  # 转换为字符串并提供默认值
                    "tool_call_id": result["tool_call_id"]
                })
        
        # 创建聊天完成
        completion = self.client.chat.completions.create(
            model=self.config.model,
            messages=self._prepare_messages(),
            tools=self.tools if self.tools else None,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        
        # 创建标准化响应
        response = LLMResponse(completion)
        # 将助手响应添加到消息历史
        self.messages.append(response.get_message())
        return response

3. 数据库查询工具

MCP LLM Bridge内置了一个数据库查询工具,用于执行SQL查询:

@dataclass
class DatabaseSchema:
    """表示数据库表的模式"""
    table_name: str
    columns: Dict[str, str]
    description: str

class DatabaseQueryTool:
    """用于执行数据库查询的工具,带有模式验证"""
    
    def __init__(self, db_path: str):
        # 数据库路径
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        # 存储表模式
        self.schemas: Dict[str, DatabaseSchema] = {}
        
        # 注册默认产品模式
        self.register_schema(DatabaseSchema(
            table_name="products",
            columns={
                "id": "INTEGER",
                "title": "TEXT",
                "description": "TEXT",
                "price": "REAL",
                "category": "TEXT",
                "stock": "INTEGER",
                "created_at": "DATETIME"
            },
            description="产品目录,包含销售的商品"
        ))
    
    def register_schema(self, schema: DatabaseSchema):
        """注册数据库模式"""
        self.schemas[schema.table_name] = schema
    
    def get_tool_spec(self) -> Dict[str, Any]:
        """获取MCP格式的工具规范"""
        # 生成模式描述
        schema_desc = "\n".join([
            f"表 {schema.table_name}: {schema.description}\n"
            f"列: {', '.join(f'{name} ({type_})' for name, type_ in schema.columns.items())}"
            for schema in self.schemas.values()
        ])
        
        # 返回工具规范
        return {
            "name": "query_database",
            "description": f"对数据库执行SQL查询。可用模式:\n{schema_desc}",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "要执行的SQL查询"
                    }
                },
                "required": ["query"]
            }
        }
    
    def get_schema_description(self) -> str:
        """获取所有注册模式的格式化描述"""
        schema_parts = []
        for schema in self.schemas.values():
            column_info = []
            for name, type_ in schema.columns.items():
                column_info.append(f" - {name} ({type_})")
            schema_parts.append(f"表 {schema.table_name}: {schema.description}\n" + 
                               "\n".join(column_info))
        return "\n\n".join(schema_parts)
    
    def validate_query(self, query: str) -> bool:
        """根据注册模式验证查询"""
        query = query.lower()
        for schema in self.schemas.values():
            if schema.table_name in query:
                # 检查查询是否引用任何不存在的列
                for word in query.split():
                    if '.' in word:
                        table, column = word.split('.')
                        if table == schema.table_name and column not in schema.columns:
                            return False
                return True
        return True
    
    async def execute(self, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """执行SQL查询并返回结果"""
        query = params.get("query")
        if not query:
            raise ValueError("需要查询参数")
            
        # 验证查询
        if not self.validate_query(query):
            raise ValueError("查询引用了无效的列")
            
        # 连接数据库并执行查询
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            columns = [description[0] for description in cursor.description]
            results = []
            for row in cursor.fetchall():
                results.append(dict(zip(columns, row)))
            return results
        finally:
            conn.close()

4. 主程序

主程序负责配置和启动桥接器:

async def main():
    # 加载环境变量
    load_dotenv()
    
    # 获取项目根目录(test.db所在位置)
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    db_path = os.path.join(project_root, "test.db")
    
    # 配置桥接器
    config = BridgeConfig(
        # MCP服务器参数
        mcp_server_params=StdioServerParameters(
            command="uvx",
            args=["mcp-server-sqlite", "--db-path", db_path],
            env=None
        ),
        
        # 使用OpenAI API的配置
        # llm_config=LLMConfig(
        #     api_key=os.getenv("OPENAI_API_KEY"),
        #     model=os.getenv("OPENAI_MODEL", "gpt-4o"),
        #     base_url=None
        # ),
        
        # 使用本地模型的配置
        llm_config=LLMConfig(
            api_key="ollama",  # 本地测试可以是任何字符串
            model="mistral-nemo:12b-instruct-2407-q8_0",
            base_url="http://localhost:11434/v1"  # 指向本地模型的端点
        ),
        
        # 系统提示词
        system_prompt="您是一个有用的助手,可以使用工具来帮助回答问题。"
    )
    
    logger.info(f"启动桥接器,使用模型: {config.llm_config.model}")
    logger.info(f"使用数据库: {db_path}")
    
    # 使用上下文管理器的桥接器
    async with BridgeManager(config) as bridge:
        while True:
            try:
                user_input = input("\n输入您的提示(或输入'quit'退出): ")
                if user_input.lower() in ['quit', 'exit', 'q']:
                    break
                    
                # 处理用户消息
                response = await bridge.process_message(user_input)
                print(f"\n响应: {response}")
            except KeyboardInterrupt:
                logger.info("\n退出...")
                break
            except Exception as e:
                logger.error(f"\n发生错误: {e}")

if __name__ == "__main__":
    asyncio.run(main())

使用场景与示例

场景1:使用OpenAI API

以下是使用OpenAI API的配置示例:

# 使用OpenAI API的配置
llm_config=LLMConfig(
    api_key=os.getenv("OPENAI_API_KEY"),
    model=os.getenv("OPENAI_MODEL", "gpt-4o"),
    base_url=None
)

场景2:使用本地Ollama模型

以下是使用本地Ollama模型的配置示例:

# 使用本地Ollama模型的配置
llm_config=LLMConfig(
    api_key="not-needed",  # 本地模型不需要API密钥
    model="mistral-nemo:12b-instruct-2407-q8_0",
    base_url="http://localhost:11434/v1"  # Ollama API端点
)

场景3:使用其他本地模型

以下是使用其他本地模型的配置示例:

# 使用其他本地模型的配置
llm_config=LLMConfig(
    api_key="not-needed",
    model="local-model",
    base_url="http://localhost:1234/v1"  # 本地模型API端点
)

示例查询

以下是一些示例查询,展示了MCP LLM Bridge的功能:

  1. 查询数据库中最贵的产品

    "数据库中最贵的产品是什么?"
    
  2. 按类别查询产品

    "列出电子类别中的所有产品"
    
  3. 查询库存情况

    "哪些产品库存不足10件?"
    

测试与验证

MCP LLM Bridge提供了测试功能,可以验证桥接器的正确性:

# 安装测试依赖
uv pip install -e ".[test]"

# 运行测试
python -m pytest -v tests/

高级功能

自定义工具注册

您可以扩展DatabaseQueryTool类,注册自定义的数据库模式:

# 创建自定义数据库工具
db_tool = DatabaseQueryTool("my_database.db")

# 注册用户表模式
db_tool.register_schema(DatabaseSchema(
    table_name="users",
    columns={
        "id": "INTEGER",
        "username": "TEXT",
        "email": "TEXT",
        "created_at": "DATETIME"
    },
    description="用户账户信息表"
))

# 注册订单表模式
db_tool.register_schema(DatabaseSchema(
    table_name="orders",
    columns={
        "id": "INTEGER",
        "user_id": "INTEGER",
        "product_id": "INTEGER",
        "quantity": "INTEGER",
        "total_price": "REAL",
        "order_date": "DATETIME"
    },
    description="用户订单记录表"
))

自定义系统提示词

您可以自定义系统提示词,以便更好地控制LLM的行为:

# 自定义系统提示词
system_prompt = """
您是一个专门的数据分析助手,可以帮助用户查询和分析数据库中的信息。
在回答问题时,请遵循以下准则:
1. 优先使用SQL查询获取数据
2. 提供清晰的数据解释和见解
3. 如果数据不足,建议用户如何获取更多信息
4. 对于复杂查询,解释SQL语句的逻辑
"""

# 在配置中使用自定义提示词
config = BridgeConfig(
    mcp_server_params=...,
    llm_config=...,
    system_prompt=system_prompt
)

结论

MCP LLM Bridge为开发者提供了一个强大的工具,使得在MCP和OpenAI兼容LLM之间建立无缝连接成为可能。通过这个桥接器,开发者可以利用各种LLM的能力,同时保持工具和接口的一致性。无论是使用云端的OpenAI模型,还是本地部署的Ollama模型,MCP LLM Bridge都能提供统一的接口和体验。

这个项目展示了协议转换和互操作性在AI工具生态系统中的重要性,为未来更多模型和工具的集成提供了可能性。随着AI技术的不断发展,像MCP LLM Bridge这样的桥接工具将变得越来越重要,帮助开发者充分利用不同模型和工具的优势。

参考资源

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐