
10岁孩子可以的,我也可以!Deepseek-R1调用的N种方法
目前大火的Deepseek-R1由于真正的开源,这使得许多公司可以免费的进行部署,但是部署后的推理仍然需要成本。Deepseek官网本身提供的API有限,目前再想上车已经不可能了。但是好在有很多第三方的API可以使用。早期有说硅基流动的API还可以,但是我试了一下,整体推理还是太慢了。也看到抖音上说,一个10岁的孩子都可以10分钟部署Deepseek-R1了,实际结果就是调用已经封装好的一键安装程
引言
目前大火的Deepseek-R1由于真正的开源,这使得许多公司可以免费的进行部署,但是部署后的推理仍然需要成本。Deepseek官网本身提供的API有限,目前再想上车已经不可能了。但是好在有很多第三方的API可以使用。早期有说硅基流动的API还可以,但是我试了一下,整体推理还是太慢了。
也看到抖音上说,一个10岁的孩子都可以10分钟部署Deepseek-R1了,实际结果就是调用已经封装好的一键安装程序,有点像10分钟安装一个app,这样说的话,是不是就没那么神奇了。
那么,我们该怎么像10岁的孩子一样,能安装呢?我看淘宝上还有59米的教程,结果就是一个说明文档,和相应的下载链接,而且里面还有二次付费,这不相当于“智商税”了么。
下面,我们以火山引擎为例,说明我们如何自己调用Deepseek-R1模型,其实也没什么神秘的,跟原来的调用一样的。
该用哪个API?
我也去看了很多第三方的测评,例如www.SuperCLUEai.com的,它给出的评估结果如下:
可以看到火山引擎是比较完整的,为此,我特地自己尝试了一下整个调用过程利用火山引擎。
如何使用火山引擎的API?
首先,大家可以注册一下火山引擎:火山方舟大模型体验中心-火山引擎,首次使用可以领取15米的体验券。
然后,可以登录到火山引擎的开通管理(https://console.volcengine.com/ark/region:ark+cn-beijing/openManagement?LLM=%7B%7D&OpenTokenDrawer=false),将DeepSeek-R1
和DeepSeek-V3
的模型都开通了。
接着,进入在线推理页面(https://console.volcengine.com/ark/region:ark+cn-beijing/endpoint
)就可以获得api-key和模型名了。这里要注意一点的是,Deepseek-R1在这里的模型名是deepseek-r1-250120
。
这里最困难的地方在于火山引擎的内部管理系统稍微有些混乱,很难很快的完成跳转,也有很多其他的页面充满了迷惑感,都不知道具体要去哪个页面才能调用。
ChatBox调用
作为目前最流行的桌面端的大模型对话UI,可以从官网(https://chatboxai.app/zh)获取。下载以后,可以点击左下角的设置,然后选择自定义API即可,然后选择OpenAI AP兼容,并输入相应的api域名即可。
当全部填完以后,就可以进行一个愉快的体验了。
python代码调用
如果想用代码调用,也非常容易,依然走OpenAPI的兼容形式即可:
import requests
import json
def deepseek_chat(api_url, api_key, model, messages, timeout=360):
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": model,
"messages": messages
}
response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=timeout)
if response.status_code == 200:
return response.json() # 返回 JSON 格式的响应
else:
return f"Error: {response.status_code} - {response.text}"
if __name__ == '__main__':
# 配置部分
api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
api_key = "<your api-key>"
model = "deepseek-r1-250120"
messages = [
{"role": "system", "content": "你是人工智能助手."},
{"role": "user", "content": "常见的十字花科植物有哪些?"}
]
# 调用 Deep-seek API
response = deepseek_chat(api_url, api_key, model, messages)
# 打印返回的结果
print(json.dumps(response, indent=4, ensure_ascii=False))
这里需要注意的是,不同于一般的模型输出,这里的message里面有两个输出,分别是content以及reasoning_content,表示的是说话和推理的内容。
如果想批量调用的话,也可以使用下面的代码:
import requests
import json
import concurrent.futures
import os
import threading
from tqdm import tqdm # 导入 tqdm
# 创建一个锁对象
lock = threading.Lock()
def deepseek_chat(api_url, api_key, model, question, timeout=360):
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
messages = [
{"role": "system", "content": "你是一个极其聪明的人工智能助手。"},
{"role": "user", "content": question}
]
payload = {
"model": model,
"messages": messages
}
try:
response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=timeout)
if response.status_code == 200:
return response.json() # 返回 JSON 格式的响应和请求的完整消息
else:
return f"Error: {response.status_code} - {response.text}"
except requests.exceptions.Timeout:
return f"Error: The request for '{question}' timed out."
except requests.exceptions.RequestException as e:
return f"Error: An unexpected error occurred for '{question}': {e}"
def process_questions_from_file(file_path):
"""从文件读取问题列表"""
with open(file_path, 'r', encoding='utf-8') as file:
questions = file.readlines()
return [q.strip() for q in questions]
def load_progress(output_path):
"""从输出文件读取已经处理的进度"""
if os.path.exists(output_path):
with open(output_path, 'r', encoding='utf-8') as file:
try:
data = json.load(file)
return data.get('processed_indexes', [])
except json.JSONDecodeError:
return []
return []
def save_progress(output_path, processed_indexes, response):
"""保存进度和每个问题的回答到文件"""
result_data = {}
if os.path.exists(output_path):
with open(output_path, 'r', encoding='utf-8') as file:
try:
result_data = json.load(file)
except json.JSONDecodeError:
pass
# 更新进度和返回的结果
result_data['processed_indexes'] = processed_indexes
if 'responses' not in result_data:
result_data['responses'] = []
result_data['responses'].append({"response": response})
# 使用锁确保文件写入是线程安全的
with lock:
with open(output_path, 'w', encoding='utf-8') as file:
json.dump(result_data, file, ensure_ascii=False, indent=4)
def batch_process(api_url, api_key, model, questions, num_threads=10, output_path='output.json'):
"""批量处理问题,并保存进度和返回结果"""
processed_indexes = load_progress(output_path) # 获取已经处理的索引
remaining_questions = [q for idx, q in enumerate(questions) if idx not in processed_indexes]
# 使用 tqdm 显示进度条
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
# 将 tqdm 包装到循环中,显示进度
with tqdm(total=len(remaining_questions), desc="Processing Questions", ncols=100) as pbar:
for idx, question in enumerate(remaining_questions):
futures.append(executor.submit(process_and_save, idx, question, api_url, api_key, model, output_path, processed_indexes, pbar))
# 等待所有任务完成
concurrent.futures.wait(futures)
def process_and_save(idx, question, api_url, api_key, model, output_path, processed_indexes, pbar):
"""处理每个问题并保存进度"""
response = deepseek_chat(api_url, api_key, model, question)
# 更新进度和保存结果
processed_indexes.append(idx)
save_progress(output_path, processed_indexes, response)
# 更新进度条
pbar.update(1)
print(f"Processed {idx + 1} questions.")
if __name__ == '__main__':
# 配置部分
api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
api_key = "<your api-key>"
model = "deepseek-r1-250120"
output_path = "output.json"
# 从文件中读取问题列表
file_path = "Input.txt" # 输入问题的文本文件
questions = process_questions_from_file(file_path)
# 批量处理问题并获取返回结果
batch_process(api_url, api_key, model, questions, num_threads=10, output_path=output_path)
这样就可以有一个批量调用的结果。
更多推荐
所有评论(0)