DeepSeek-R1-Distill-Llama-8B实战教程:用Ollama REST API对接Python FastAPI构建私有推理服务

想不想把DeepSeek-R1-Distill-Llama-8B这个强大的推理模型变成你自己的私有AI服务?不用再每次打开网页界面,不用依赖第三方平台,直接在本地或者自己的服务器上搭建一个专属的推理API,想什么时候调用就什么时候调用。

今天我就带你一步步实现这个目标。我们会用Ollama来部署模型,然后用Python的FastAPI框架构建一个完整的REST API服务。整个过程就像搭积木一样简单,即使你之前没怎么接触过API开发,跟着做也能搞定。

1. 准备工作:了解我们的工具箱

在开始动手之前,我们先简单了解一下要用到的几个工具,这样后面操作起来心里更有底。

1.1 DeepSeek-R1-Distill-Llama-8B:我们的核心模型

DeepSeek-R1-Distill-Llama-8B是DeepSeek团队推出的推理模型,它有个特别厉害的地方——在数学、代码和逻辑推理任务上,表现和OpenAI的o1模型差不多水平。

你可能听说过,很多大模型需要先进行监督微调,然后再用强化学习训练。但这个模型不一样,它直接用大规模强化学习训练,跳过了监督微调这一步。这样做的好处是模型在推理时能展现出很多有趣且强大的行为模式。

不过这种训练方式也有个小问题,就是模型有时候会陷入无限重复、输出可读性不太好,或者中英文混杂的情况。为了解决这些问题,DeepSeek团队又推出了改进版本,在强化学习之前加入了冷启动数据,让模型表现更稳定。

现在开源的版本里,DeepSeek-R1-Distill-Llama-8B在各种基准测试中表现相当不错。比如在AIME 2024测试中能达到50.4%的通过率,在CodeForces编程竞赛中能拿到1205分。对于8B参数量的模型来说,这个成绩已经很能打了。

1.2 Ollama:模型部署的得力助手

Ollama是个特别适合本地部署大模型的工具。它把模型管理、推理服务这些复杂的事情都封装好了,你只需要几条简单的命令就能把模型跑起来。

最方便的是,Ollama自带了一个REST API接口。这意味着你不需要自己写复杂的模型加载和推理代码,直接通过HTTP请求就能调用模型。我们后面要做的FastAPI服务,其实就是在这个API基础上再加一层包装,让它用起来更方便、功能更丰富。

1.3 FastAPI:构建API的快速通道

FastAPI是Python里特别流行的一个Web框架,专门用来构建API服务。它有几个明显的优点:

  • 速度快:基于Starlette和Pydantic,性能接近NodeJS和Go
  • 简单易用:写起来特别直观,几行代码就能搞出一个API
  • 自动文档:自动生成交互式API文档,不用自己写说明
  • 类型安全:利用Python的类型提示,减少错误

我们选择FastAPI,就是看中了它的简单和高效。你不需要是Web开发专家,也能快速搭建出专业的API服务。

2. 第一步:部署DeepSeek-R1-Distill-Llama-8B模型

好了,理论知识了解得差不多了,现在开始动手。第一步是把模型部署起来。

2.1 安装Ollama

如果你还没安装Ollama,先去官网下载对应你操作系统的版本。安装过程很简单,基本上就是一路下一步。

安装完成后,打开终端(Windows用PowerShell或CMD,Mac/Linux用Terminal),输入以下命令检查是否安装成功:

ollama --version

如果能看到版本号,说明安装成功了。

2.2 拉取和运行模型

接下来拉取我们要用的模型。在终端里输入:

ollama pull deepseek-r1:8b

这个命令会从Ollama的模型库下载DeepSeek-R1-Distill-Llama-8B模型。下载时间取决于你的网速,模型大小大概在5GB左右,耐心等待一下。

下载完成后,运行模型:

ollama run deepseek-r1:8b

如果看到模型开始响应,说明运行成功了。你可以试着问它几个问题,比如:

帮我解释一下什么是递归函数

模型会开始思考(你会看到它在"思考"的过程),然后给出回答。按Ctrl+C可以退出交互模式。

2.3 验证Ollama API

模型运行起来后,Ollama会在本地启动一个API服务,默认地址是http://localhost:11434。我们可以用curl命令测试一下:

curl http://localhost:11434/api/generate -d '{
  "model": "deepseek-r1:8b",
  "prompt": "你好,请介绍一下你自己",
  "stream": false
}'

如果看到返回的JSON数据里有模型的回答,说明API工作正常。这个测试很重要,因为后面我们的FastAPI服务就是通过这个接口和模型通信的。

3. 第二步:搭建FastAPI服务框架

模型部署好了,现在来构建我们的API服务。我建议你创建一个新的项目文件夹,这样文件管理起来更清晰。

3.1 创建项目结构

先创建项目文件夹和必要的文件:

mkdir deepseek-api-service
cd deepseek-api-service

然后创建以下文件结构:

deepseek-api-service/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── api.py
│   ├── models.py
│   └── config.py
├── requirements.txt
└── README.md

3.2 安装依赖包

在项目根目录创建requirements.txt文件,内容如下:

fastapi==0.104.1
uvicorn==0.24.0
httpx==0.25.1
pydantic==2.5.0
python-dotenv==1.0.0

然后安装这些依赖:

pip install -r requirements.txt

如果你习惯用虚拟环境,可以先创建并激活虚拟环境:

python -m venv venv
# Windows
venv\Scripts\activate
# Mac/Linux
source venv/bin/activate

然后再安装依赖。

3.3 编写基础配置

先来写配置文件app/config.py

import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """应用配置类"""
    # Ollama服务地址
    OLLAMA_BASE_URL: str = "http://localhost:11434"
    
    # 使用的模型名称
    MODEL_NAME: str = "deepseek-r1:8b"
    
    # API相关配置
    API_TITLE: str = "DeepSeek-R1 API服务"
    API_DESCRIPTION: str = "基于DeepSeek-R1-Distill-Llama-8B模型的推理API服务"
    API_VERSION: str = "1.0.0"
    
    # 超时设置(秒)
    REQUEST_TIMEOUT: int = 300  # 5分钟,给模型足够的思考时间
    
    # 日志配置
    LOG_LEVEL: str = "INFO"
    
    class Config:
        env_file = ".env"

settings = Settings()

这个配置类用了Pydantic的BaseSettings,好处是可以从环境变量读取配置。我们创建了一个.env文件来管理敏感或可变的配置:

# .env文件
OLLAMA_BASE_URL=http://localhost:11434
MODEL_NAME=deepseek-r1:8b
LOG_LEVEL=INFO

4. 第三步:实现核心API功能

配置准备好了,现在来实现最核心的部分——和Ollama API通信的代码。

4.1 定义数据模型

app/models.py中定义请求和响应的数据结构:

from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime

class ChatMessage(BaseModel):
    """聊天消息"""
    role: str = Field(..., description="消息角色:user或assistant")
    content: str = Field(..., description="消息内容")

class ChatRequest(BaseModel):
    """聊天请求"""
    messages: List[ChatMessage] = Field(..., description="消息历史列表")
    temperature: Optional[float] = Field(0.7, ge=0.0, le=2.0, description="温度参数,控制随机性")
    max_tokens: Optional[int] = Field(2048, ge=1, le=8192, description="最大生成token数")
    stream: Optional[bool] = Field(False, description="是否流式输出")

class CompletionRequest(BaseModel):
    """补全请求"""
    prompt: str = Field(..., description="输入提示词")
    temperature: Optional[float] = Field(0.7, ge=0.0, le=2.0, description="温度参数")
    max_tokens: Optional[int] = Field(2048, ge=1, le=8192, description="最大生成token数")
    stream: Optional[bool] = Field(False, description="是否流式输出")

class APIResponse(BaseModel):
    """API响应"""
    success: bool = Field(..., description="请求是否成功")
    data: Optional[Dict[str, Any]] = Field(None, description="响应数据")
    message: Optional[str] = Field(None, description="提示信息")
    timestamp: datetime = Field(default_factory=datetime.now, description="响应时间")

这些模型定义了API的输入输出格式。用Pydantic的好处是它会自动验证数据,比如temperature必须在0到2之间,max_tokens不能超过8192等。

4.2 实现Ollama客户端

创建app/ollama_client.py,这是和Ollama服务通信的核心:

import httpx
import logging
from typing import Dict, Any, Optional, AsyncIterator
from app.config import settings

logger = logging.getLogger(__name__)

class OllamaClient:
    """Ollama API客户端"""
    
    def __init__(self):
        self.base_url = settings.OLLAMA_BASE_URL
        self.model_name = settings.MODEL_NAME
        self.client = httpx.AsyncClient(timeout=settings.REQUEST_TIMEOUT)
        logger.info(f"Ollama客户端初始化完成,模型:{self.model_name}")
    
    async def generate_completion(self, prompt: str, **kwargs) -> Dict[str, Any]:
        """生成文本补全"""
        try:
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
                **kwargs
            }
            
            logger.debug(f"发送请求到Ollama: {payload}")
            response = await self.client.post(
                f"{self.base_url}/api/generate",
                json=payload
            )
            response.raise_for_status()
            
            result = response.json()
            logger.debug(f"收到Ollama响应")
            return result
            
        except httpx.RequestError as e:
            logger.error(f"请求Ollama失败: {e}")
            raise Exception(f"无法连接到Ollama服务: {e}")
        except httpx.HTTPStatusError as e:
            logger.error(f"Ollama返回错误: {e.response.status_code}")
            raise Exception(f"Ollama服务错误: {e.response.text}")
    
    async def generate_chat(self, messages: list, **kwargs) -> Dict[str, Any]:
        """生成聊天回复"""
        try:
            # 将消息格式转换为Ollama需要的格式
            formatted_messages = []
            for msg in messages:
                formatted_messages.append({
                    "role": msg.role,
                    "content": msg.content
                })
            
            payload = {
                "model": self.model_name,
                "messages": formatted_messages,
                "stream": False,
                **kwargs
            }
            
            logger.debug(f"发送聊天请求到Ollama: {payload}")
            response = await self.client.post(
                f"{self.base_url}/api/chat",
                json=payload
            )
            response.raise_for_status()
            
            result = response.json()
            logger.debug(f"收到Ollama聊天响应")
            return result
            
        except httpx.RequestError as e:
            logger.error(f"请求Ollama聊天失败: {e}")
            raise Exception(f"无法连接到Ollama服务: {e}")
        except httpx.HTTPStatusError as e:
            logger.error(f"Ollama聊天返回错误: {e.response.status_code}")
            raise Exception(f"Ollama服务错误: {e.response.text}")
    
    async def stream_completion(self, prompt: str, **kwargs) -> AsyncIterator[str]:
        """流式生成文本补全"""
        try:
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": True,
                **kwargs
            }
            
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream(
                    "POST",
                    f"{self.base_url}/api/generate",
                    json=payload
                ) as response:
                    response.raise_for_status()
                    
                    async for chunk in response.aiter_lines():
                        if chunk:
                            yield chunk
                            
        except Exception as e:
            logger.error(f"流式生成失败: {e}")
            raise
    
    async def close(self):
        """关闭客户端"""
        await self.client.aclose()
        logger.info("Ollama客户端已关闭")

# 创建全局客户端实例
ollama_client = OllamaClient()

这个客户端类封装了所有和Ollama API的交互。注意几个关键点:

  1. 用了httpx的异步客户端,性能更好
  2. 有完整的错误处理,方便排查问题
  3. 支持流式输出,适合需要实时显示的场景
  4. 日志记录很详细,调试的时候特别有用

4.3 实现API路由

现在创建app/api.py,这里定义具体的API端点:

from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
import json
import logging
from typing import List

from app.models import ChatRequest, CompletionRequest, APIResponse, ChatMessage
from app.ollama_client import ollama_client

router = APIRouter()
logger = logging.getLogger(__name__)

@router.get("/health")
async def health_check():
    """健康检查端点"""
    return APIResponse(
        success=True,
        message="服务运行正常",
        data={"status": "healthy", "model": ollama_client.model_name}
    )

@router.post("/completions")
async def create_completion(request: CompletionRequest):
    """文本补全接口"""
    try:
        logger.info(f"收到补全请求,prompt长度: {len(request.prompt)}")
        
        if request.stream:
            # 流式响应
            async def generate():
                try:
                    async for chunk in ollama_client.stream_completion(
                        prompt=request.prompt,
                        temperature=request.temperature,
                        max_tokens=request.max_tokens
                    ):
                        yield f"data: {chunk}\n\n"
                except Exception as e:
                    logger.error(f"流式生成错误: {e}")
                    yield f"data: {json.dumps({'error': str(e)})}\n\n"
            
            return StreamingResponse(
                generate(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )
        else:
            # 非流式响应
            result = await ollama_client.generate_completion(
                prompt=request.prompt,
                temperature=request.temperature,
                max_tokens=request.max_tokens
            )
            
            return APIResponse(
                success=True,
                data={
                    "text": result.get("response", ""),
                    "model": result.get("model", ""),
                    "usage": {
                        "prompt_tokens": len(request.prompt) // 4,  # 粗略估算
                        "completion_tokens": len(result.get("response", "")) // 4,
                        "total_tokens": (len(request.prompt) + len(result.get("response", ""))) // 4
                    }
                }
            )
            
    except Exception as e:
        logger.error(f"补全请求处理失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/chat/completions")
async def create_chat_completion(request: ChatRequest):
    """聊天补全接口(支持多轮对话)"""
    try:
        logger.info(f"收到聊天请求,消息数: {len(request.messages)}")
        
        # 验证消息格式
        if not request.messages:
            raise HTTPException(status_code=400, detail="消息列表不能为空")
        
        # 确保最后一条消息是用户消息
        if request.messages[-1].role != "user":
            raise HTTPException(
                status_code=400, 
                detail="最后一条消息必须是用户消息"
            )
        
        if request.stream:
            # 流式响应(简化版,实际需要适配Ollama的流式聊天API)
            async def generate():
                # 这里先返回非流式结果,实际可以根据Ollama API调整
                result = await ollama_client.generate_chat(
                    messages=request.messages,
                    temperature=request.temperature,
                    max_tokens=request.max_tokens
                )
                yield f"data: {json.dumps(result)}\n\n"
            
            return StreamingResponse(
                generate(),
                media_type="text/event-stream"
            )
        else:
            # 非流式响应
            result = await ollama_client.generate_chat(
                messages=request.messages,
                temperature=request.temperature,
                max_tokens=request.max_tokens
            )
            
            # 构建响应消息
            assistant_message = ChatMessage(
                role="assistant",
                content=result.get("message", {}).get("content", "")
            )
            
            return APIResponse(
                success=True,
                data={
                    "choices": [{
                        "message": assistant_message.dict(),
                        "finish_reason": "stop"
                    }],
                    "model": result.get("model", ""),
                    "usage": {
                        "prompt_tokens": sum(len(msg.content) for msg in request.messages) // 4,
                        "completion_tokens": len(assistant_message.content) // 4,
                        "total_tokens": sum(len(msg.content) for msg in request.messages + [assistant_message]) // 4
                    }
                }
            )
            
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"聊天请求处理失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/models")
async def list_models():
    """获取可用模型列表"""
    try:
        # 这里可以扩展为从配置或数据库读取
        return APIResponse(
            success=True,
            data={
                "models": [
                    {
                        "id": "deepseek-r1:8b",
                        "name": "DeepSeek-R1-Distill-Llama-8B",
                        "description": "DeepSeek R1蒸馏版Llama-8B模型",
                        "max_tokens": 8192
                    }
                ]
            }
        )
    except Exception as e:
        logger.error(f"获取模型列表失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))

这个文件定义了三个主要的API端点:

  1. /health - 健康检查,用来测试服务是否正常
  2. /completions - 文本补全,适合单轮问答
  3. /chat/completions - 聊天补全,支持多轮对话
  4. /models - 获取模型信息

注意聊天接口里对消息格式的验证,确保最后一条消息是用户的,这样模型才知道要回复什么。

5. 第四步:整合和运行服务

所有组件都准备好了,现在把它们整合起来。

5.1 创建主应用文件

app/main.py中:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import logging
import uvicorn

from app.config import settings
from app.api import router

# 配置日志
logging.basicConfig(
    level=settings.LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 创建FastAPI应用
app = FastAPI(
    title=settings.API_TITLE,
    description=settings.API_DESCRIPTION,
    version=settings.API_VERSION,
    docs_url="/docs",
    redoc_url="/redoc"
)

# 添加CORS中间件(允许跨域请求)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境应该限制具体的域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 包含路由
app.include_router(router, prefix="/api/v1")

@app.on_event("startup")
async def startup_event():
    """应用启动时执行"""
    logger.info(f"启动{settings.API_TITLE}...")
    logger.info(f"使用模型: {settings.MODEL_NAME}")
    logger.info(f"Ollama服务地址: {settings.OLLAMA_BASE_URL}")

@app.on_event("shutdown")
async def shutdown_event():
    """应用关闭时执行"""
    from app.ollama_client import ollama_client
    await ollama_client.close()
    logger.info("应用已关闭")

@app.get("/")
async def root():
    """根路径"""
    return {
        "message": f"欢迎使用{settings.API_TITLE}",
        "docs": "/docs",
        "health_check": "/api/v1/health"
    }

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,  # 开发模式启用热重载
        log_level=settings.LOG_LEVEL.lower()
    )

5.2 运行服务

现在可以运行我们的服务了。在项目根目录执行:

python -m app.main

你会看到类似这样的输出:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

服务启动后,打开浏览器访问 http://localhost:8000/docs,你会看到自动生成的API文档。这个文档是交互式的,可以直接在页面上测试API。

6. 第五步:测试和使用API

服务跑起来了,我们来测试一下各个功能是否正常。

6.1 健康检查测试

用curl或者直接在浏览器访问:

curl http://localhost:8000/api/v1/health

应该返回类似这样的响应:

{
  "success": true,
  "message": "服务运行正常",
  "data": {
    "status": "healthy",
    "model": "deepseek-r1:8b"
  },
  "timestamp": "2024-01-15T10:30:00.123456"
}

6.2 文本补全测试

测试单轮问答:

curl -X POST "http://localhost:8000/api/v1/completions" \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "用Python写一个快速排序算法",
    "temperature": 0.7,
    "max_tokens": 500
  }'

你会收到模型生成的Python代码。注意观察响应时间,DeepSeek-R1模型会有一个"思考"过程,所以响应可能比普通模型稍慢一些,但生成的质量会更高。

6.3 聊天补全测试

测试多轮对话:

curl -X POST "http://localhost:8000/api/v1/chat/completions" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {
        "role": "user",
        "content": "什么是递归函数?"
      },
      {
        "role": "assistant", 
        "content": "递归函数是一种在函数内部调用自身的函数。它通常用于解决可以分解为相同问题的子问题的情况,比如计算阶乘、遍历树结构等。"
      },
      {
        "role": "user",
        "content": "能给我一个Python的例子吗?"
      }
    ],
    "temperature": 0.7,
    "max_tokens": 300
  }'

模型会根据对话历史来回答,给出Python的递归函数示例。

6.4 使用Python客户端调用

你也可以用Python代码来调用这个API服务:

import requests
import json

# 文本补全
def test_completion():
    url = "http://localhost:8000/api/v1/completions"
    payload = {
        "prompt": "解释一下机器学习中的过拟合现象",
        "temperature": 0.7,
        "max_tokens": 300
    }
    
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        result = response.json()
        if result["success"]:
            print("回答:", result["data"]["text"])
        else:
            print("请求失败:", result["message"])
    else:
        print(f"HTTP错误: {response.status_code}")

# 聊天补全  
def test_chat():
    url = "http://localhost:8000/api/v1/chat/completions"
    payload = {
        "messages": [
            {"role": "user", "content": "帮我写一个Python函数,计算斐波那契数列"},
            {"role": "assistant", "content": "好的,这是一个计算斐波那契数列的Python函数:\n\n```python\ndef fibonacci(n):\n    if n <= 0:\n        return []\n    elif n == 1:\n        return [0]\n    elif n == 2:\n        return [0, 1]\n    \n    fib_sequence = [0, 1]\n    for i in range(2, n):\n        fib_sequence.append(fib_sequence[-1] + fib_sequence[-2])\n    return fib_sequence\n```"},
            {"role": "user", "content": "这个函数的时间复杂度是多少?能优化吗?"}
        ],
        "temperature": 0.7,
        "max_tokens": 400
    }
    
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        result = response.json()
        if result["success"]:
            print("回答:", result["data"]["choices"][0]["message"]["content"])
        else:
            print("请求失败:", result["message"])
    else:
        print(f"HTTP错误: {response.status_code}")

if __name__ == "__main__":
    print("测试文本补全...")
    test_completion()
    
    print("\n测试聊天补全...")
    test_chat()

7. 进阶功能扩展

基础功能已经实现了,但一个完整的生产级服务还需要更多功能。下面我介绍几个常见的扩展方向。

7.1 添加身份验证

生产环境中,API通常需要身份验证。我们可以用FastAPI的依赖注入系统来实现:

# app/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import secrets

security = HTTPBearer()

# 简单的API密钥验证(生产环境应该用更安全的方式)
API_KEYS = {
    "your-secret-api-key-here": "admin",
    "another-api-key": "user"
}

async def verify_api_key(
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """验证API密钥"""
    if credentials.scheme != "Bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication scheme"
        )
    
    if credentials.credentials not in API_KEYS:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    
    return API_KEYS[credentials.credentials]

# 在路由中使用
@router.post("/completions")
async def create_completion(
    request: CompletionRequest,
    user_role: str = Depends(verify_api_key)  # 添加依赖
):
    # ... 原有代码

7.2 添加速率限制

防止API被滥用,可以添加速率限制:

# app/rate_limit.py
from fastapi import HTTPException, Request
import time
from collections import defaultdict

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)
    
    async def __call__(self, request: Request):
        client_ip = request.client.host
        current_time = time.time()
        
        # 清理一分钟前的记录
        self.requests[client_ip] = [
            req_time for req_time in self.requests[client_ip]
            if current_time - req_time < 60
        ]
        
        # 检查是否超限
        if len(self.requests[client_ip]) >= self.requests_per_minute:
            raise HTTPException(
                status_code=429,
                detail="请求过于频繁,请稍后再试"
            )
        
        # 记录本次请求
        self.requests[client_ip].append(current_time)

# 创建限流器实例
rate_limiter = RateLimiter(requests_per_minute=30)

# 在路由中使用
@router.post("/completions", dependencies=[Depends(rate_limiter)])
async def create_completion(request: CompletionRequest):
    # ... 原有代码

7.3 添加请求日志

为了更好地监控和分析,可以添加详细的请求日志:

# app/middleware.py
from fastapi import Request
import time
import logging

logger = logging.getLogger(__name__)

async def log_requests(request: Request, call_next):
    """记录请求日志的中间件"""
    start_time = time.time()
    
    # 记录请求信息
    logger.info(f"收到请求: {request.method} {request.url.path}")
    
    # 处理请求
    response = await call_next(request)
    
    # 计算处理时间
    process_time = time.time() - start_time
    
    # 记录响应信息
    logger.info(
        f"请求完成: {request.method} {request.url.path} "
        f"状态码: {response.status_code} "
        f"耗时: {process_time:.3f}s"
    )
    
    # 添加处理时间到响应头
    response.headers["X-Process-Time"] = str(process_time)
    
    return response

# 在主应用中添加中间件
app.middleware("http")(log_requests)

7.4 支持流式响应

对于需要实时显示生成结果的场景,流式响应特别有用。我们已经在前面的代码中实现了基础版本,这里可以进一步完善:

@router.post("/completions/stream")
async def create_completion_stream(request: CompletionRequest):
    """流式文本补全接口"""
    async def event_generator():
        try:
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream(
                    "POST",
                    f"{ollama_client.base_url}/api/generate",
                    json={
                        "model": ollama_client.model_name,
                        "prompt": request.prompt,
                        "stream": True,
                        "temperature": request.temperature,
                        "max_tokens": request.max_tokens
                    }
                ) as response:
                    async for chunk in response.aiter_bytes():
                        if chunk:
                            # 解析Ollama的流式响应
                            chunk_str = chunk.decode('utf-8')
                            if chunk_str.strip():
                                yield f"data: {chunk_str}\n\n"
        
        except Exception as e:
            logger.error(f"流式生成错误: {e}")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # 禁用Nginx缓冲
        }
    )

8. 部署到生产环境

开发完成的服务,最终要部署到生产环境。这里有几个建议:

8.1 使用Gunicorn运行(Linux/Mac)

对于生产环境,建议用Gunicorn来运行FastAPI应用:

pip install gunicorn
gunicorn app.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

8.2 使用Docker容器化

创建Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 运行应用
CMD ["python", "-m", "app.main"]

构建和运行:

# 构建镜像
docker build -t deepseek-api .

# 运行容器
docker run -d \
  -p 8000:8000 \
  --name deepseek-api \
  -e OLLAMA_BASE_URL=http://host.docker.internal:11434 \
  deepseek-api

8.3 使用Nginx反向代理

在生产环境中,通常会用Nginx做反向代理:

# /etc/nginx/sites-available/deepseek-api
server {
    listen 80;
    server_name your-domain.com;
    
    location / {
        proxy_pass http://localhost:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # WebSocket支持(如果需要)
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
    
    # 限制请求体大小
    client_max_body_size 10M;
    
    # 超时设置
    proxy_read_timeout 300s;
    proxy_connect_timeout 75s;
}

8.4 使用Supervisor管理进程

确保服务在崩溃后自动重启:

# /etc/supervisor/conf.d/deepseek-api.conf
[program:deepseek-api]
command=/path/to/venv/bin/gunicorn app.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
directory=/path/to/your/app
user=www-data
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/deepseek-api/error.log
stdout_logfile=/var/log/deepseek-api/access.log

9. 总结

通过这个教程,我们完成了一个完整的DeepSeek-R1-Distill-Llama-8B私有推理服务的搭建。整个过程可以总结为几个关键步骤:

9.1 核心收获

  1. 模型部署变得简单:用Ollama部署大模型,不再需要复杂的配置和依赖管理
  2. API服务标准化:通过FastAPI构建了符合REST规范的API接口,方便其他系统集成
  3. 功能完整实用:实现了文本补全、聊天对话、健康检查等核心功能
  4. 扩展性强:代码结构清晰,容易添加身份验证、速率限制、日志监控等生产级功能
  5. 部署灵活:支持本地开发、Docker容器化、生产环境部署等多种场景

9.2 实际应用建议

在实际使用中,我有几个建议:

对于个人开发者

  • 可以直接在本地运行,用于学习和开发测试
  • 可以结合其他工具,比如做成VS Code插件、命令行工具等
  • 注意模型对硬件的要求,8B模型至少需要16GB内存才能流畅运行

对于团队使用

  • 一定要添加身份验证和速率限制
  • 考虑使用数据库记录请求日志,方便分析和计费
  • 可以部署在多台服务器上,用负载均衡分发请求
  • 监控服务的响应时间和资源使用情况

性能优化方向

  1. 如果请求量大,可以考虑用Redis缓存一些常见问题的回答
  2. 调整Ollama的并行处理参数,提高吞吐量
  3. 对于长时间运行的推理任务,可以考虑用消息队列异步处理

9.3 遇到的坑和解决方案

在开发过程中,你可能会遇到一些问题,这里提前给你一些解决方案:

问题1:Ollama服务连接不上

  • 检查Ollama是否在运行:ollama list
  • 检查端口是否正确:默认是11434
  • 如果是Docker部署,注意网络配置

问题2:模型响应太慢

  • DeepSeek-R1是推理模型,需要"思考"时间,这是正常的
  • 可以调整temperature参数降低随机性,可能加快速度
  • 确保有足够的内存,避免频繁交换

问题3:API超时

  • 调整FastAPI和Ollama的超时设置
  • 对于长文本生成,适当增加max_tokens限制
  • 考虑实现异步任务和轮询机制

9.4 下一步学习方向

如果你对这个项目感兴趣,可以继续深入:

  1. 前端界面开发:用Vue或React开发一个聊天界面
  2. 多模型支持:扩展支持其他Ollama模型
  3. 微调集成:集成模型微调功能
  4. 知识库增强:结合向量数据库实现RAG功能
  5. 监控告警:添加Prometheus和Grafana监控

这个项目最大的价值在于,它给了你一个完全可控的AI推理服务。你可以根据自己的需求定制功能,不用担心API调用限制,也不用担心数据隐私问题。而且整个架构是模块化的,想加什么功能就加什么功能,非常灵活。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐