Python接入deepseek API(官网和腾讯)
在当今的人工智能领域,自然语言处理(NLP)技术已经取得了显著的进展。OpenAI 的 GPT 系列模型是其中的佼佼者,而 DeepSeek 作为一个基于 OpenAI 的 API 服务,提供了强大的对话生成能力。本文将详细介绍如何使用OpenAI SDK 与 DeepSeek 进行对话,并深入解析每一段代码的实现细节。
目录
一、引言
在当今的人工智能领域,自然语言处理(NLP)技术已经取得了显著的进展。OpenAI 的 GPT 系列模型是其中的佼佼者,而 DeepSeek 作为一个基于 OpenAI 的 API 服务,提供了强大的对话生成能力。本文将详细介绍如何使用OpenAI SDK 与 DeepSeek 进行对话,并深入解析每一段代码的实现细节。
二、准备工作
2.1安装 OpenAI SDK
首先,我们需要安装 OpenAI 的 Python SDK。可以通过以下命令进行安装:
pip3 install openai
2.2导入必要的库
在代码的开头,我们导入了几个必要的 Python 库:
from openai import OpenAI
import os
import json
import glob
openai: 这是 OpenAI 的官方 Python SDK,用于与 OpenAI 的 API 进行交互。
os: 用于处理文件和目录路径。
json: 用于处理 JSON 数据的编码和解码。
glob: 用于查找符合特定模式的文件路径。
三、具体代码
接下来,我们初始化 OpenAI 客户端,并指定 API Key 和基础 URL:
client = OpenAI(
api_key="sk-xxxx", # 替换为你的API Key
base_url="https://api.deepseek.com",
)
api_key: 这是你的 OpenAI API Key,用于验证身份。
base_url: 这是 DeepSeek 的 API 地址。
1 历史记录管理
为了保存和加载对话历史,我们定义了一些函数来管理历史记录文件。
1.1 获取最新的历史记录文件
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return None
latest_file = max(history_files, key=os.path.getctime)
return latest_file
glob.glob: 查找所有符合模式的文件路径。
os.path.getctime: 获取文件的创建时间。
max: 返回创建时间最新的文件。
1.2 生成下一个历史记录文件名
def get_next_history_file_name():
"""生成下一个历史记录文件名"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
latest_file = max(history_files, key=os.path.getctime)
latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")
os.path.basename: 获取文件名。
split: 分割文件名以提取序号。
1.3 保存历史记录
def save_history(history):
"""保存历史记录到文件"""
file_name = get_next_history_file_name()
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=4)
print(f"历史记录已保存到 {file_name},并清空缓存。")
json.dump: 将 Python 对象序列化为 JSON 格式并写入文件。
1.4 加载历史记录
def load_history(file_name):
"""从指定文件加载历史记录"""
with open(file_name, 'r', encoding='utf-8') as f:
history = json.load(f)
return history
return []
json.load: 从文件中读取 JSON 数据并反序列化为 Python 对象。
1.5 删除历史记录
def delete_latest_history():
"""删除最新的历史记录文件"""
latest_file = get_latest_history_file()
if latest_file:
os.remove(latest_file)
print(f"已删除最新的历史记录文件:{latest_file}")
else:
print("没有历史记录文件可删除。")
os.remove: 删除指定文件。
2.主程序逻辑
2.1 初始化对话历史
conversation_history = [
{"role": "system", "content": "You are a helpful assistant"}
]
conversation_history: 存储对话历史的列表,初始包含系统消息。
2.2 主循环
主循环负责处理用户输入、调用 API 并输出结果。
while True:
try:
# 获取用户输入
user_input = input("请输入你的问题:")
# 处理特殊命令
if user_input.lower().strip() == "exit":
break
# 将用户输入添加到对话历史中
conversation_history.append({"role": "user", "content": user_input.strip()})
# 调用 API
response = client.chat.completions.create(
model="deepseek-chat",
messages=conversation_history,
stream=True
)
# 处理流式输出
for chunk in response:
if chunk.choices:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content is not None:
print(delta.content, end='', flush=True)
# 将模型回复添加到对话历史中
conversation_history.append({"role": "assistant", "content": model_response})
except KeyboardInterrupt:
print("\n程序已手动终止,您可以继续提问。")
continue
except Exception as e:
print(f"发生错误:{e}")
continue
client.chat.completions.create: 调用 DeepSeek API 生成对 话回复。
stream=True: 启用流式输出,逐块返回结果。
三、完整代码
# Please install OpenAI SDK first: `pip3 install openai`
from openai import OpenAI
import os
import json
import glob
# 初始化OpenAI客户端
client = OpenAI(
api_key="sk-xxxxxx", # 替换为你的API Key
base_url="https://api.deepseek.com",
)
# 定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"
# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
os.makedirs(HISTORY_FILE_DIR)
def get_latest_history_file():
"""获取最新的历史记录文件"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return None
latest_file = max(history_files, key=os.path.getctime)
return latest_file
def get_next_history_file_name():
"""生成下一个历史记录文件名"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
latest_file = max(history_files, key=os.path.getctime)
latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")
def save_history(history):
"""保存历史记录到文件"""
file_name = get_next_history_file_name()
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=4)
print(f"历史记录已保存到 {file_name},并清空缓存。")
def load_history(file_name):
"""从指定文件加载历史记录"""
with open(file_name, 'r', encoding='utf-8') as f:
history = json.load(f)
return history
return []
def delete_latest_history():
"""删除最新的历史记录文件"""
latest_file = get_latest_history_file()
if latest_file:
os.remove(latest_file)
print(f"已删除最新的历史记录文件:{latest_file}")
else:
print("没有历史记录文件可删除。")
def delete_all_history():
"""删除所有历史记录文件"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if history_files:
for file in history_files:
os.remove(file)
print("已删除所有历史记录文件。")
else:
print("没有历史记录文件可删除。")
def load_specific_history(file_num):
"""加载指定序号的历史记录文件"""
file_name = os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{file_num}.json")
if os.path.exists(file_name):
history = load_history(file_name)
print(f"加载的历史记录如下(极简版):")
for entry in history:
role = entry["role"]
content = entry["content"]
print(f"{role.capitalize()}: {content}")
print("=" * 40) # 分隔线
return history
else:
print(f"未找到历史记录文件:{file_name}")
return []
def main():
# 初始化对话历史
conversation_history = [
{"role": "system", "content": "You are a helpful assistant"}
]
model = "deepseek-chat" # 默认模型
long_text_mode = False # 默认不启用长文本模式
skip_processing = False # 新增跳过处理标记
while True:
try:
if skip_processing: # 优先处理跳过标记
skip_processing = False
continue
if long_text_mode:
print("长文本模式已启用(输入 `ds` 提交内容,输入 `es` 返回普通模式):")
else:
print("请输入你的问题(输入 `exit` 退出,输入 `ds` 启用 长文本模式,输入 `r1` 切换至 deepseek-reasoner 模型,输入 `v3` 切换至 deepseek-chat 模型,输入 `coder` 切换至 deepseek-coder 模型,输入 CTRL+C 手动终止):")
user_input = ""
flag = False
# 获取用户输入
if long_text_mode:
while True:
line = input()
if line.strip().lower() == "exit":
print("程序已退出。")
flag = True
break
if line.strip().lower() == "ds": # 检测到 `ds`,提交内容
break
if line.strip().lower() == "es": # 检测到 `es`,返回普通模式
long_text_mode = False
print("已返回普通模式。")
skip_processing = True # 设置跳过标记
break
if line.strip().lower() in ["r1", "v3", "coder"]: # 在长文本模式下切换模型
if line.strip().lower() == "r1":
model = "deepseek-reasoner"
print(f"已切换模型为:{model}")
elif line.strip().lower() == "v3":
model = "deepseek-chat"
print(f"已切换模型为:{model}")
elif line.strip().lower() == "coder":
model = "deepseek-coder"
print(f"已切换模型为:{model}")
continue
user_input += line + "\n" # 将每行输入添加到用户输入中
else:
user_input = input()
if flag:
break
# 跳过处理标记检查
if skip_processing:
skip_processing = False
continue
# 如果用户输入 `uh`,加载最新的历史记录并打印
if user_input.lower().strip() == "uh":
latest_file = get_latest_history_file()
if latest_file:
conversation_history = load_history(latest_file)
print(f"加载的历史记录如下({latest_file}):")
for entry in conversation_history:
role = entry["role"]
content = entry["content"]
print(f"{role.capitalize()}: {content}")
print("=" * 40) # 分隔线
else:
print("没有历史记录文件可加载。")
continue
# 如果用户输入 `dh`,清空历史记录
if user_input.lower().strip() == "dh":
conversation_history = [
{"role": "system", "content": "You are a helpful assistant"}
]
print("历史记录已清空。")
continue
# 如果用户输入 `sh`,保存历史记录并清空缓存
if user_input.lower().strip() == "sh":
save_history(conversation_history)
conversation_history = [
{"role": "system", "content": "You are a helpful assistant"}
]
print("历史记录已清空。")
continue
# 如果用户输入 `dah`,删除最新的历史记录文件
if user_input.lower().strip() == "dah":
delete_latest_history()
continue
# 如果用户输入 `dallh`,删除所有历史记录文件
if user_input.lower().strip() == "dallh":
delete_all_history()
continue
# 如果用户输入 `ch x`,加载指定序号的历史记录并打印
if user_input.lower().strip().startswith("ch "):
try:
file_num = int(user_input.strip().split()[1]) # 提取序号并转换为整数
if file_num > 0:
conversation_history = load_specific_history(file_num)
else:
print("序号必须为正整数。")
except (ValueError, IndexError):
print("请输入正确的格式:`ch x`,其中 x 为正整数。")
continue
# 如果用户输入 `es`,跳过本次循环
if user_input.lower().strip() == "es":
continue
# 如果用户输入为空,跳过本次循环
if not user_input.strip():
print("输入为空,请重新输入。")
continue
# 如果用户输入 `exit`,则退出程序
if user_input.lower().strip() == "exit":
print("程序已退出。")
break
# 如果用户输入 `ds`,启用长文本模式并继续循环
if user_input.lower().strip() == "ds" and not long_text_mode:
long_text_mode = True
print("已启用长文本模式。")
continue
# 如果用户输入 `r1` 或 `v3` 或 `coder`,切换模型并继续循环
if user_input.lower().strip() == "r1":
model = "deepseek-reasoner"
print(f"已切换模型为:{model}")
continue
elif user_input.lower().strip() == "v3":
model = "deepseek-chat"
print(f"已切换模型为:极简版")
continue
elif user_input.lower().strip() == "coder":
model = "deepseek-coder"
print(f"已切换模型为:{model}")
continue
# 将用户输入添加到对话历史中
if user_input.strip(): # 确保输入内容不为空
conversation_history.append({"role": "user", "content": user_input.strip()})
# 创建聊天完成请求,启用流式输出
stream = client.chat.completions.create(
model=model, # 使用当前选择的模型
messages=conversation_history, # 使用对话历史作为上下文
stream=True
)
# 初始化模型回复内容和思考过程
reasoning_content = ""
model_response = ""
is_answering = False # 标记是否开始输出完整回复
is_reasoning_started = False # 标记是否已经开始输出思考过程
# 处理流式输出
for chunk in stream:
if chunk.choices:
delta = chunk.choices[0].delta
# 检查是否有思考过程内容(仅对 deepseek-reasoner 模型)
if model == "deepseek-reasoner" and hasattr(delta, 'reasoning_content') and delta.reasoning_content is not None:
if not is_reasoning_started: # 如果是第一次输出思考过程,添加分隔线
print("=" * 20 + " 思考过程 " + "=" * 20)
is_reasoning_started = True
reasoning = delta.reasoning_content
print(reasoning, end='', flush=True) # 流式输出思考过程
reasoning_content += reasoning
# 检查是否有回复内容
if hasattr(delta, 'content') and delta.content is not None:
if not is_answering: # 如果尚未开始输出完整回复
print("\n" + "=" * 20 + " 完整回复 " + "=" * 20 + "\n")
is_answering = True
content = delta.content
print(content, end='', flush=True) # 流式输出回复内容
model_response += content
# 将模型回复添加到对话历史中
conversation_history.append({"role": "assistant", "content": model_response})
print("\n" + "=" * 40 + "\n") # 分隔线
except KeyboardInterrupt: # 捕获用户按下 Ctrl+C
print("\n程序已手动终止,您可以继续提问。")
continue # 继续主循环
except Exception as e:
print(f"发生错误:{e}")
continue # 继续主循环
if __name__ == "__main__":
main()
四、腾讯云API接口
与上面的类似
完整代码
import os
import json
from openai import OpenAI
import glob
# 初始化OpenAI客户端
client = OpenAI(
api_key="sk-xxxxxxx", # 替换为你的API Key
base_url="https://api.lkeap.cloud.tencent.com/v1",
)
# 定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"
# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
os.makedirs(HISTORY_FILE_DIR)
def get_latest_history_file():
"""获取最新的历史记录文件"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return None
latest_file = max(history_files, key=os.path.getctime)
return latest_file
def get_next_history_file_name():
"""生成下一个历史记录文件名"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
latest_file = max(history_files, key=os.path.getctime)
latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")
def save_history(history):
"""保存历史记录到文件"""
file_name = get_next_history_file_name()
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=4)
print(f"历史记录已保存到 {file_name},并清空缓存。")
def load_history(file_name):
"""从指定文件加载历史记录"""
with open(file_name, 'r', encoding='utf-8') as f:
history = json.load(f)
return history
return []
def delete_latest_history():
"""删除最新的历史记录文件"""
latest_file = get_latest_history_file()
if latest_file:
os.remove(latest_file)
print(f"已删除最新的历史记录文件:{latest_file}")
else:
print("没有历史记录文件可删除。")
def delete_all_history():
"""删除所有历史记录文件"""
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if history_files:
for file in history_files:
os.remove(file)
print("已删除所有历史记录文件。")
else:
print("没有历史记录文件可删除。")
def load_specific_history(file_num):
"""加载指定序号的历史记录文件"""
file_name = os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{file_num}.json")
if os.path.exists(file_name):
history = load_history(file_name)
print(f"加载的历史记录如下({file_name}):")
for entry in history:
role = entry["role"]
content = entry["content"]
print(f"{role.capitalize()}: {content}")
print("=" * 40) # 分隔线
return history
else:
print(f"未找到历史记录文件:{file_name}")
return []
def main():
# 初始化对话历史
conversation_history = []
model = "deepseek-r1" # 默认模型
long_text_mode = False # 默认不启用长文本模式
skip_processing = False # 新增跳过处理标记
while True:
try:
if skip_processing: # 优先处理跳过标记
skip_processing = False
continue
if long_text_mode:
print("长文本模式已启用(输入 `ds` 提交内容,输入 `es` 返回普通模式):")
else:
print("请输入你的问题(输入 `exit` 退出,输入 `ds` 启用 长文本模式,输入v3切换至deepseek-v3模型,r1亦然,输入CTRL+C手动终止):")
user_input = ""
flag = False
# 获取用户输入
if long_text_mode:
while True:
line = input()
if line.strip().lower() == "exit":
print("程序已退出。")
flag = True
break
if line.strip().lower() == "ds": # 检测到 `ds`,提交内容
break
if line.strip().lower() == "es": # 检测到 `es`,返回普通模式
long_text_mode = False
print("已返回普通模式。")
skip_processing = True # 设置跳过标记
break
if line.strip().lower() in ["r1", "v3"]: # 在长文本模式下切换模型
if line.strip().lower() == "r1":
model = "deepseek-r1"
print(f"已切换模型为:{model}")
elif line.strip().lower() == "v3":
model = "deepseek-v3"
print(f"已切换模型为:{model}")
continue
user_input += line + "\n" # 将每行输入添加到用户输入中
else:
user_input = input()
if flag:
break
# 跳过处理标记检查
if skip_processing:
skip_processing = False
continue
# 如果用户输入 `uh`,加载最新的历史记录并打印
if user_input.lower().strip() == "uh":
latest_file = get_latest_history_file()
if latest_file:
conversation_history = load_history(latest_file)
print(f"加载的历史记录如下({latest_file}):")
for entry in conversation_history:
role = entry["role"]
content = entry["content"]
print(f"{role.capitalize()}: {content}")
print("=" * 40) # 分隔线
else:
print("没有历史记录文件可加载。")
continue
# 如果用户输入 `dh`,清空历史记录
if user_input.lower().strip() == "dh":
conversation_history = []
print("历史记录已清空。")
continue
# 如果用户输入 `sh`,保存历史记录并清空缓存
if user_input.lower().strip() == "sh":
save_history(conversation_history)
conversation_history = []
continue
# 如果用户输入 `dah`,删除最新的历史记录文件
if user_input.lower().strip() == "dah":
delete_latest_history()
continue
# 如果用户输入 `dallh`,删除所有历史记录文件
if user_input.lower().strip() == "dallh":
delete_all_history()
continue
# 如果用户输入 `ch x`,加载指定序号的历史记录并打印
if user_input.lower().strip().startswith("ch "):
try:
file_num = int(user_input.strip().split()[1]) # 提取序号并转换为整数
if file_num > 0:
conversation_history = load_specific_history(file_num)
else:
print("序号必须为正整数。")
except (ValueError, IndexError):
print("请输入正确的格式:`ch x`,其中 x 为正整数。")
continue
# 如果用户输入 `es`,跳过本次循环
if user_input.lower().strip() == "es":
continue
# 如果用户输入为空,跳过本次循环
if not user_input.strip():
print("输入为空,请重新输入。")
continue
# 如果用户输入 `exit`,则退出程序
if user_input.lower().strip() == "exit":
print("程序已退出。")
break
# 如果用户输入 `ds`,启用长文本模式并继续循环
if user_input.lower().strip() == "ds" and not long_text_mode:
long_text_mode = True
print("已启用长文本模式。")
continue
# 如果用户输入 `r1` 或 `v3`,切换模型并继续循环
if user_input.lower().strip() == "r1":
model = "deepseek-r1"
print(f"已切换模型为:{model}")
continue
elif user_input.lower().strip() == "v3":
model = "deepseek-v3"
print(f"已切换模型为:{model}")
continue
# 将用户输入添加到对话历史中
if user_input.strip(): # 确保输入内容不为空
conversation_history.append({"role": "user", "content": user_input.strip()})
# 创建聊天完成请求,启用流式输出
stream = client.chat.completions.create(
model=model, # 使用当前选择的模型
messages=conversation_history, # 使用对话历史作为上下文
stream=True
)
# 如果模型是 deepseek-v3,不显示思考过程
if model != "deepseek-v3":
print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n")
# 初始化模型回复内容和思考过程
reasoning_content = ""
model_response = ""
is_answering = False # 标记是否开始输出完整回复
# 处理流式输出
for chunk in stream:
if chunk.choices:
delta = chunk.choices[0].delta
# 检查是否有思考过程内容
if hasattr(delta, 'reasoning_content') and delta.reasoning_content is not None and model != "deepseek-v3":
reasoning = delta.reasoning_content
print(reasoning, end='', flush=True) # 流式输出思考过程
reasoning_content += reasoning
# 检查是否有回复内容
if hasattr(delta, 'content') and delta.content is not None:
if not is_answering: # 如果尚未开始输出完整回复
print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
is_answering = True
content = delta.content
print(content, end='', flush=True) # 流式输出回复内容
model_response += content
# 将模型回复添加到对话历史中
conversation_history.append({"role": "assistant", "content": model_response})
print("\n" + "=" * 40 + "\n") # 分隔线
except KeyboardInterrupt: # 捕获用户按下 Ctrl+C
print("\n程序已手动终止,您可以继续提问。")
continue # 继续主循环
except Exception as e:
print(f"发生错误:{e}")
continue # 继续主循环
if __name__ == "__main__":
main()
五、图形化界面GUI
下面代码仅仅作为参考 完整代码请看:Python调用DeepSeek API实现图形化窗口
from openai import OpenAI
import json
import glob
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
# 初始化OpenAI客户端
client = OpenAI(
api_key="OPENAI_API_KEY",
base_url="https://api.deepseek.com",
)
#定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"
# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
os.makedirs(HISTORY_FILE_DIR)
class ChatGUI:
def __init__(self, master):
self.master = master
self.model = "deepseek-chat"
self.long_text_mode = False
self.conversation_history = [{"role": "system", "content": "You are a helpful assistant"}]
self.is_streaming = False
self.setup_ui()
def setup_ui(self):
self.master.title("DeepSeek Chat GUI")
self.master.geometry("800x600")
# 顶部控制栏
control_frame = ttk.Frame(self.master)
control_frame.pack(fill=tk.X, padx=5, pady=5)
self.model_var = tk.StringVar(value=self.model)
model_combobox = ttk.Combobox(control_frame, textvariable=self.model_var,
values=["deepseek-chat", "deepseek-reasoner", "deepseek-coder"], width=15)
model_combobox.pack(side=tk.LEFT, padx=5)
model_combobox.bind("<<ComboboxSelected>>", self.on_model_change)
ttk.Button(control_frame, text="保存历史", command=self.save_history).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="清空历史", command=self.clear_history).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="加载历史", command=self.load_history_dialog).pack(side=tk.LEFT, padx=5)
# 对话显示区域
self.chat_area = scrolledtext.ScrolledText(self.master, wrap=tk.WORD, state=tk.DISABLED)
self.chat_area.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 输入区域
input_frame = ttk.Frame(self.master)
input_frame.pack(fill=tk.X, padx=5, pady=5)
self.input_text = tk.Text(input_frame, height=4)
self.input_text.pack(fill=tk.X, pady=5)
self.input_text.bind("<Return>", self.on_enter_pressed)
# 底部按钮
btn_frame = ttk.Frame(input_frame)
btn_frame.pack(fill=tk.X)
ttk.Button(btn_frame, text="发送", command=self.send_message).pack(side=tk.RIGHT, padx=5)
ttk.Checkbutton(btn_frame, text="长文本模式", command=self.toggle_long_text).pack(side=tk.LEFT, padx=5)
def on_model_change(self, event):
self.model = self.model_var.get()
self.append_to_chat("系统", f"模型已切换为: {self.model}")
def toggle_long_text(self):
self.long_text_mode = not self.long_text_mode
status = "启用" if self.long_text_mode else "禁用"
self.append_to_chat("系统", f"长文本模式已{status}")
def on_enter_pressed(self, event):
if not event.state & 0x1: # 检查是否按下Control键
if not self.long_text_mode:
self.send_message()
return "break"
# 允许换行输入
return None
def append_to_chat(self, role, content):
self.chat_area.configure(state=tk.NORMAL)
self.chat_area.insert(tk.END, f"\n{role}: {content}\n")
self.chat_area.configure(state=tk.DISABLED)
self.chat_area.see(tk.END)
def save_history(self):
file_name = self.get_next_history_file_name()
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(self.conversation_history, f, ensure_ascii=False, indent=4)
self.append_to_chat("系统", "历史记录已保存")
def clear_history(self):
self.conversation_history = [{"role": "system", "content": "You are a helpful assistant"}]
self.append_to_chat("系统", "对话历史已清空")
def load_history_dialog(self):
files = [f for f in os.listdir(HISTORY_FILE_DIR) if f.startswith(HISTORY_FILE_PREFIX)]
if not files:
messagebox.showinfo("提示", "没有历史记录文件")
return
dialog = tk.Toplevel(self.master)
dialog.title("选择历史记录")
lb = tk.Listbox(dialog, width=50, height=15)
lb.pack(padx=10, pady=10)
for f in files:
lb.insert(tk.END, f)
def on_select():
selected = lb.curselection()
if selected:
file_name = os.path.join(HISTORY_FILE_DIR, lb.get(selected[0]))
with open(file_name, 'r', encoding='utf-8') as f:
self.conversation_history = json.load(f)
self.refresh_chat_display()
dialog.destroy()
ttk.Button(dialog, text="加载", command=on_select).pack(pady=5)
def refresh_chat_display(self):
self.chat_area.configure(state=tk.NORMAL)
self.chat_area.delete(1.0, tk.END)
for msg in self.conversation_history:
if msg["role"] == "user":
self.chat_area.insert(tk.END, f"\n用户: {msg['content']}\n")
elif msg["role"] == "assistant":
self.chat_area.insert(tk.END, f"\n助手: {msg['content']}\n")
self.chat_area.configure(state=tk.DISABLED)
def send_message(self):
if self.is_streaming:
return
user_input = self.input_text.get("1.0", tk.END).strip()
if not user_input:
return
self.input_text.delete("1.0", tk.END)
self.conversation_history.append({"role": "user", "content": user_input})
self.append_to_chat("用户", user_input)
# 在新线程中处理API请求
self.is_streaming = True
threading.Thread(target=self.process_stream, args=(user_input,)).start()
def process_stream(self, user_input):
try:
stream = client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
stream=True
)
response = []
reasoning = []
for chunk in stream:
if chunk.choices:
delta = chunk.choices[0].delta
if self.model == "deepseek-reasoner" and hasattr(delta, 'reasoning_content') and delta.reasoning_content:
reasoning.append(delta.reasoning_content)
self.update_reasoning(delta.reasoning_content)
if hasattr(delta, 'content') and delta.content:
response.append(delta.content)
self.update_response(delta.content)
full_response = ''.join(response)
self.conversation_history.append({"role": "assistant", "content": full_response})
except Exception as e:
self.append_to_chat("系统", f"错误: {str(e)}")
finally:
self.is_streaming = False
def update_reasoning(self, content):
self.master.after(0, lambda: self.append_to_chat("思考", content))
def update_response(self, content):
self.master.after(0, lambda: self.append_to_chat("助手", content))
# 历史文件管理相关方法
def get_next_history_file_name(self):
history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
if not history_files:
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
latest_file = max(history_files, key=os.path.getctime)
latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")
if __name__ == "__main__":
root = tk.Tk()
app = ChatGUI(root)
root.mainloop()
更多推荐
所有评论(0)