大家好呀,上文我们讨论了MCP是什么及好处,本文动手实现一个简单的MCP客户端和服务端的编写,并通过MCP Server 实现天气查询工具的调用。

一、创建 MCP 项目

1、安装UV

#使用 pip 安装 
 pip install uv
#创建项目目录
 uv init mcp-client
 cd mcp-client

在这里插入图片描述
在这里插入图片描述

# 创建虚拟环境
uv venv
# 激活虚拟环境
source .venv/bin/activate

在这里插入图片描述

二、搭建 MCP 客户端

1、安装 MCP SDK

uv add mcp

在这里插入图片描述

2、创建 MCPClient.py,代码如下:

import asyncio
import os
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# 加载 .env 文件,确保 API Key 受到保护
load_dotenv()


class MCPClient:
    def __init__(self):
        """初始化 MCP 客户端"""
        self.exit_stack = AsyncExitStack()
        # 从环境变量读取配置
        self.openai_api_key = os.getenv("OPENAI_API_KEY")  # 读取 OpenAI API Key
        self.base_url = os.getenv("BASE_URL")  # 读取 BASE URL
        self.model = os.getenv("MODEL")  # 读取模型名称

        if not self.openai_api_key:
            raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 OPENAI_API_KEY")

        # 初始化 OpenAI 客户端
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        self.session: Optional[ClientSession] = None

    async def connect_to_server(self, server_script_path: str):
        """连接到 MCP 服务器并列出可用工具"""
        # 检查脚本类型
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("服务器脚本必须是 .py 或 .js 文件")

        # 根据脚本类型选择执行命令
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        # 启动 MCP 服务器并建立通信
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()

        # 列出 MCP 服务器上的工具
        response = await self.session.list_tools()
        tools = response.tools
        print("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """
        使用大模型处理查询并调用可用的 MCP 工具 (Function Calling)
        """
        messages = [{"role": "user", "content": query}]
        
        # 获取可用工具列表
        response = await self.session.list_tools()
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            }
        } for tool in response.tools]

        # 调用 OpenAI API
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )

        # 处理返回的内容
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # 如果需要使用工具,解析工具调用
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)

            # 执行工具
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")

            # 将结果存入消息历史
            messages.append(content.message.model_dump())
            messages.append({
                "role": "tool",
                "content": result.content[0].text,
                "tool_call_id": tool_call.id,
            })

            # 将结果返回给大模型生成最终响应
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
            )
            return response.choices[0].message.content

        return content.message.content

    async def chat_loop(self):
        """运行交互式聊天循环"""
        print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")
        while True:
            try:
                query = input("\n你: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)  # 发送用户输入到 OpenAI API
                print(f"\n🤖 OpenAI: {response}")
            except Exception as e:
                print(f"\n⚠️ 发生错误: {str(e)}")

    async def cleanup(self):
        """清理资源"""
        await self.exit_stack.aclose()


async def main():
    """主函数"""
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

三、天气查询服务器Server搭建

1、. MCP服务器概念介绍

根据MCP协议定义,Server可以提供三种类型的标准能力,Resources、Tools、Prompts,每个Server可同时提供者三种类型能力或其中一种。
Resources:资源,类似于文件数据读取,可以是文件资源或是API响应返回的内容。
Tools:工具,第三方服务、功能函数,通过此可控制LLM可调用哪些函数。
Prompts:提示词,为用户预先定义好的完成特定任务的模板。

2. MCP服务器通讯机制

Model Context Protocol(MCP)是一种由 Anthropic 开源的协议,旨在将大型语言模型直接连接至数据源,实现无缝集成。根据 MCP 的规范,当前支持两种传输方式:标准输入输出(stdio)和基于HTTP 的服务器推送事件(SSE)。

标准输入输出(stdio)模式是一种用于本地通信的传输方式。在这种模式下,MCP 客户端会将服务器程序作为子进程启动,双方通过标准输入(stdin)和标准输出(stdout)进行数据交换,消息格式为JSON-RPC 2.0。这种方式适用于客户端和服务器在同一台机器上运行的场景,确保了高效、低延迟的通信。

基于 HTTP 和服务器推送事件(SSE)的传输方式,适用于客户端和服务器位于不同物理位置的场景。在这种模式下,客户端和服务器通过 HTTP 协议进行通信,利用 SSE 实现服务器向客户端的实时数据推送,消息的格式为JSON-RPC 2.0,Server定义了/see与/messages接口用于推送与接收数据。

3、实现MCP服务端,

服务端通过使用OpenWeather API,创建一个能够实时查询天气的服务器(server),并使用stdio方式进行通信

申请一个OpenWeather API的密钥
# 测试查询结果
curl -s "https://api.openweathermap.org/data/2.5/weatherq=Beijing&appid='YOUR_API_KEY'&units=metric&lang=zh_cn"

在这里插入图片描述

服务器依赖安装
uv add mcp httpx

在这里插入图片描述

编写MCP server 内容
import json
import os
import httpx
from dotenv import load_dotenv
from typing import Any
from mcp.server.fastmcp import FastMCP

load_dotenv()

# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer")

# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = os.getenv("API_KEY")  # 从环境变量中读取API_KEY
USER_AGENT = "weather-app/1.0"


async def fetch_weather(city: str) -> dict[str, Any] | None:
    """
    从 OpenWeather API 获取天气信息。
    :param city: 城市名称(需使用英文,如 Beijing)
    :return: 天气数据字典;若出错返回包含 error 信息的字典
    """
    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn"
    }
    headers = {"User-Agent": USER_AGENT}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                OPENWEATHER_API_BASE,
                params=params,
                headers=headers,
                timeout=30.0
            )
            response.raise_for_status()
            return response.json()  # 返回字典类型
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP 错误: {e.response.status_code}"}
        except Exception as e:
            return {"error": f"请求失败: {str(e)}"}


def format_weather(data: dict[str, Any] | str) -> str:
    """
    将天气数据格式化为易读文本。
    :param data: 天气数据(可以是字典或 JSON 字符串)
    :return: 格式化后的天气信息字符串
    """
    # 如果传入的是字符串,则先转换为字典
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            return f"无法解析天气数据: {e}"
    
    # 如果数据中包含错误信息,直接返回错误提示
    if "error" in data:
        return f"⚠️ {data['error']}"
    
    # 提取数据时做容错处理
    city = data.get("name", "未知")
    country = data.get("sys", {}).get("country", "未知")
    temp = data.get("main", {}).get("temp", "N/A")
    humidity = data.get("main", {}).get("humidity", "N/A")
    wind_speed = data.get("wind", {}).get("speed", "N/A")
    # weather 可能为空列表,因此用 [0] 前先提供默认字典
    weather_list = data.get("weather", [{}])
    description = weather_list[0].get("description", "未知")
    
    return (
        f"🌍 {city}, {country}\n"
        f"🌡 温度: {temp}°C\n"
        f"💧 湿度: {humidity}%\n"
        f"🌬 风速: {wind_speed} m/s\n"
        f"🌤 天气: {description}\n"
    )


@mcp.tool()
async def query_weather(city: str) -> str:
    """
    输入指定城市的英文名称,返回今日天气查询结果。
    :param city: 城市名称(需使用英文)
    :return: 格式化后的天气信息
    """
    data = await fetch_weather(city)
    return format_weather(data)


if __name__ == "__main__":
    # 以标准 I/O 方式运行 MCP 服务器
    mcp.run(transport='stdio')
测试运行
# 确认激活虚拟环境
source .venv/bin/activate
# 运行
uv run MCPClient.py server.py

在这里插入图片描述

这里调用的是deepseek-v3模型,调用工具中的天气查询进行查询对应城市天气情况

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐