引言

目前大火的Deepseek-R1由于真正的开源,这使得许多公司可以免费的进行部署,但是部署后的推理仍然需要成本。Deepseek官网本身提供的API有限,目前再想上车已经不可能了。但是好在有很多第三方的API可以使用。早期有说硅基流动的API还可以,但是我试了一下,整体推理还是太慢了。

也看到抖音上说,一个10岁的孩子都可以10分钟部署Deepseek-R1了,实际结果就是调用已经封装好的一键安装程序,有点像10分钟安装一个app,这样说的话,是不是就没那么神奇了。

那么,我们该怎么像10岁的孩子一样,能安装呢?我看淘宝上还有59米的教程,结果就是一个说明文档,和相应的下载链接,而且里面还有二次付费,这不相当于“智商税”了么。

下面,我们以火山引擎为例,说明我们如何自己调用Deepseek-R1模型,其实也没什么神秘的,跟原来的调用一样的。

该用哪个API?

我也去看了很多第三方的测评,例如www.SuperCLUEai.com的,它给出的评估结果如下:
在这里插入图片描述
可以看到火山引擎是比较完整的,为此,我特地自己尝试了一下整个调用过程利用火山引擎。

如何使用火山引擎的API?

首先,大家可以注册一下火山引擎:火山方舟大模型体验中心-火山引擎,首次使用可以领取15米的体验券。

然后,可以登录到火山引擎的开通管理(https://console.volcengine.com/ark/region:ark+cn-beijing/openManagement?LLM=%7B%7D&OpenTokenDrawer=false),将DeepSeek-R1DeepSeek-V3的模型都开通了。

在这里插入图片描述接着,进入在线推理页面(https://console.volcengine.com/ark/region:ark+cn-beijing/endpoint
)就可以获得api-key模型名了。这里要注意一点的是,Deepseek-R1在这里的模型名是deepseek-r1-250120
在这里插入图片描述
这里最困难的地方在于火山引擎的内部管理系统稍微有些混乱,很难很快的完成跳转,也有很多其他的页面充满了迷惑感,都不知道具体要去哪个页面才能调用。

ChatBox调用

作为目前最流行的桌面端的大模型对话UI,可以从官网(https://chatboxai.app/zh)获取。下载以后,可以点击左下角的设置,然后选择自定义API即可,然后选择OpenAI AP兼容,并输入相应的api域名即可。
在这里插入图片描述
当全部填完以后,就可以进行一个愉快的体验了。

在这里插入图片描述

python代码调用

如果想用代码调用,也非常容易,依然走OpenAPI的兼容形式即可:

import requests
import json


def deepseek_chat(api_url, api_key, model, messages, timeout=360):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    payload = {
        "model": model,
        "messages": messages
    }

    response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=timeout)

    if response.status_code == 200:
        return response.json()  # 返回 JSON 格式的响应
    else:
        return f"Error: {response.status_code} - {response.text}"


if __name__ == '__main__':
    # 配置部分
    api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    api_key = "<your api-key>"
    model = "deepseek-r1-250120"
    messages = [
        {"role": "system", "content": "你是人工智能助手."},
        {"role": "user", "content": "常见的十字花科植物有哪些?"}
    ]

    # 调用 Deep-seek API
    response = deepseek_chat(api_url, api_key, model, messages)

    # 打印返回的结果
    print(json.dumps(response, indent=4, ensure_ascii=False))

这里需要注意的是,不同于一般的模型输出,这里的message里面有两个输出,分别是content以及reasoning_content,表示的是说话和推理的内容。
在这里插入图片描述
如果想批量调用的话,也可以使用下面的代码:

import requests
import json
import concurrent.futures
import os
import threading
from tqdm import tqdm  # 导入 tqdm

# 创建一个锁对象
lock = threading.Lock()

def deepseek_chat(api_url, api_key, model, question, timeout=360):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    messages = [
        {"role": "system", "content": "你是一个极其聪明的人工智能助手。"},
        {"role": "user", "content": question}
    ]

    payload = {
        "model": model,
        "messages": messages
    }

    try:
        response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=timeout)

        if response.status_code == 200:
            return response.json()  # 返回 JSON 格式的响应和请求的完整消息
        else:
            return f"Error: {response.status_code} - {response.text}"
    except requests.exceptions.Timeout:
        return f"Error: The request for '{question}' timed out."
    except requests.exceptions.RequestException as e:
        return f"Error: An unexpected error occurred for '{question}': {e}"

def process_questions_from_file(file_path):
    """从文件读取问题列表"""
    with open(file_path, 'r', encoding='utf-8') as file:
        questions = file.readlines()
    return [q.strip() for q in questions]

def load_progress(output_path):
    """从输出文件读取已经处理的进度"""
    if os.path.exists(output_path):
        with open(output_path, 'r', encoding='utf-8') as file:
            try:
                data = json.load(file)
                return data.get('processed_indexes', [])
            except json.JSONDecodeError:
                return []
    return []

def save_progress(output_path, processed_indexes, response):
    """保存进度和每个问题的回答到文件"""
    result_data = {}
    if os.path.exists(output_path):
        with open(output_path, 'r', encoding='utf-8') as file:
            try:
                result_data = json.load(file)
            except json.JSONDecodeError:
                pass

    # 更新进度和返回的结果
    result_data['processed_indexes'] = processed_indexes
    if 'responses' not in result_data:
        result_data['responses'] = []

    result_data['responses'].append({"response": response})

    # 使用锁确保文件写入是线程安全的
    with lock:
        with open(output_path, 'w', encoding='utf-8') as file:
            json.dump(result_data, file, ensure_ascii=False, indent=4)

def batch_process(api_url, api_key, model, questions, num_threads=10, output_path='output.json'):
    """批量处理问题,并保存进度和返回结果"""
    processed_indexes = load_progress(output_path)  # 获取已经处理的索引
    remaining_questions = [q for idx, q in enumerate(questions) if idx not in processed_indexes]

    # 使用 tqdm 显示进度条
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        # 将 tqdm 包装到循环中,显示进度
        with tqdm(total=len(remaining_questions), desc="Processing Questions", ncols=100) as pbar:
            for idx, question in enumerate(remaining_questions):
                futures.append(executor.submit(process_and_save, idx, question, api_url, api_key, model, output_path, processed_indexes, pbar))

            # 等待所有任务完成
            concurrent.futures.wait(futures)

def process_and_save(idx, question, api_url, api_key, model, output_path, processed_indexes, pbar):
    """处理每个问题并保存进度"""
    response = deepseek_chat(api_url, api_key, model, question)

    # 更新进度和保存结果
    processed_indexes.append(idx)
    save_progress(output_path, processed_indexes, response)

    # 更新进度条
    pbar.update(1)
    print(f"Processed {idx + 1} questions.")

if __name__ == '__main__':
    # 配置部分
    api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    api_key = "<your api-key>"
    model = "deepseek-r1-250120"
    output_path = "output.json"

    # 从文件中读取问题列表
    file_path = "Input.txt"  # 输入问题的文本文件
    questions = process_questions_from_file(file_path)

    # 批量处理问题并获取返回结果
    batch_process(api_url, api_key, model, questions, num_threads=10, output_path=output_path)

这样就可以有一个批量调用的结果。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐