目录

一、引言

 二、准备工作

2.1安装 OpenAI SDK

2.2导入必要的库

 三、具体代码

1 历史记录管理

1.1 获取最新的历史记录文件

1.2 生成下一个历史记录文件名

1.3 保存历史记录

1.4 加载历史记录

1.5 删除历史记录

2.主程序逻辑

2.1 初始化对话历史

2.2 主循环

三、完整代码

四、腾讯云API接口

五、图形化界面GUI


一、引言

在当今的人工智能领域,自然语言处理(NLP)技术已经取得了显著的进展。OpenAI 的 GPT 系列模型是其中的佼佼者,而 DeepSeek 作为一个基于 OpenAI 的 API 服务,提供了强大的对话生成能力。本文将详细介绍如何使用OpenAI SDK 与 DeepSeek 进行对话,并深入解析每一段代码的实现细节。

 二、准备工作

2.1安装 OpenAI SDK

首先,我们需要安装 OpenAI 的 Python SDK。可以通过以下命令进行安装:

pip3 install openai

2.2导入必要的库

在代码的开头,我们导入了几个必要的 Python 库:

from openai import OpenAI
import os
import json
import glob

openai: 这是 OpenAI 的官方 Python SDK,用于与 OpenAI 的 API 进行交互。

os: 用于处理文件和目录路径。

json: 用于处理 JSON 数据的编码和解码。

glob: 用于查找符合特定模式的文件路径。

 三、具体代码

接下来,我们初始化 OpenAI 客户端,并指定 API Key 和基础 URL:

client = OpenAI(
    api_key="sk-xxxx", # 替换为你的API Key
    base_url="https://api.deepseek.com",
)

api_key: 这是你的 OpenAI API Key,用于验证身份。

base_url: 这是 DeepSeek 的 API 地址。

1 历史记录管理

为了保存和加载对话历史,我们定义了一些函数来管理历史记录文件。

1.1 获取最新的历史记录文件

    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return None
    latest_file = max(history_files, key=os.path.getctime)
    return latest_file

 glob.glob: 查找所有符合模式的文件路径。

os.path.getctime: 获取文件的创建时间。

max: 返回创建时间最新的文件。

1.2 生成下一个历史记录文件名

def get_next_history_file_name():
    """生成下一个历史记录文件名"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
    latest_file = max(history_files, key=os.path.getctime)
    latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
    return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")

os.path.basename: 获取文件名。

split: 分割文件名以提取序号。

1.3 保存历史记录

def save_history(history):
    """保存历史记录到文件"""
    file_name = get_next_history_file_name()
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=4)
    print(f"历史记录已保存到 {file_name},并清空缓存。")

json.dump: 将 Python 对象序列化为 JSON 格式并写入文件。

1.4 加载历史记录

def load_history(file_name):
    """从指定文件加载历史记录"""
    with open(file_name, 'r', encoding='utf-8') as f:
        history = json.load(f)
        return history
    return []

json.load: 从文件中读取 JSON 数据并反序列化为 Python 对象。

1.5 删除历史记录

def delete_latest_history():
    """删除最新的历史记录文件"""
    latest_file = get_latest_history_file()
    if latest_file:
        os.remove(latest_file)
        print(f"已删除最新的历史记录文件:{latest_file}")
    else:
        print("没有历史记录文件可删除。")

os.remove: 删除指定文件。

2.主程序逻辑

2.1 初始化对话历史

conversation_history = [
    {"role": "system", "content": "You are a helpful assistant"}
]

conversation_history: 存储对话历史的列表,初始包含系统消息。

2.2 主循环

主循环负责处理用户输入、调用 API 并输出结果。

while True:
    try:
        # 获取用户输入
        user_input = input("请输入你的问题:")

        # 处理特殊命令
        if user_input.lower().strip() == "exit":
            break

        # 将用户输入添加到对话历史中
        conversation_history.append({"role": "user", "content": user_input.strip()})

        # 调用 API
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=conversation_history,
            stream=True
        )

        # 处理流式输出
        for chunk in response:
            if chunk.choices:
                delta = chunk.choices[0].delta
                if hasattr(delta, 'content') and delta.content is not None:
                    print(delta.content, end='', flush=True)

        # 将模型回复添加到对话历史中
        conversation_history.append({"role": "assistant", "content": model_response})

    except KeyboardInterrupt:
        print("\n程序已手动终止,您可以继续提问。")
        continue
    except Exception as e:
        print(f"发生错误:{e}")
        continue

client.chat.completions.create: 调用 DeepSeek API 生成对 话回复。

stream=True: 启用流式输出,逐块返回结果。

三、完整代码

# Please install OpenAI SDK first: `pip3 install openai`

from openai import OpenAI
import os
import json
import glob

# 初始化OpenAI客户端
client = OpenAI(
    api_key="sk-xxxxxx",  # 替换为你的API Key
    base_url="https://api.deepseek.com",
)

# 定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"

# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
    os.makedirs(HISTORY_FILE_DIR)

def get_latest_history_file():
    """获取最新的历史记录文件"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return None
    latest_file = max(history_files, key=os.path.getctime)
    return latest_file

def get_next_history_file_name():
    """生成下一个历史记录文件名"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
    latest_file = max(history_files, key=os.path.getctime)
    latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
    return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")

def save_history(history):
    """保存历史记录到文件"""
    file_name = get_next_history_file_name()
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=4)
    print(f"历史记录已保存到 {file_name},并清空缓存。")

def load_history(file_name):
    """从指定文件加载历史记录"""
    with open(file_name, 'r', encoding='utf-8') as f:
        history = json.load(f)
        return history
    return []

def delete_latest_history():
    """删除最新的历史记录文件"""
    latest_file = get_latest_history_file()
    if latest_file:
        os.remove(latest_file)
        print(f"已删除最新的历史记录文件:{latest_file}")
    else:
        print("没有历史记录文件可删除。")

def delete_all_history():
    """删除所有历史记录文件"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if history_files:
        for file in history_files:
            os.remove(file)
        print("已删除所有历史记录文件。")
    else:
        print("没有历史记录文件可删除。")

def load_specific_history(file_num):
    """加载指定序号的历史记录文件"""
    file_name = os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{file_num}.json")
    if os.path.exists(file_name):
        history = load_history(file_name)
        print(f"加载的历史记录如下(极简版):")
        for entry in history:
            role = entry["role"]
            content = entry["content"]
            print(f"{role.capitalize()}: {content}")
        print("=" * 40)  # 分隔线
        return history
    else:
        print(f"未找到历史记录文件:{file_name}")
        return []

def main():
    # 初始化对话历史
    conversation_history = [
        {"role": "system", "content": "You are a helpful assistant"}
    ]
    model = "deepseek-chat"  # 默认模型
    long_text_mode = False  # 默认不启用长文本模式
    skip_processing = False  # 新增跳过处理标记

    while True:
        try:
            if skip_processing:  # 优先处理跳过标记
                skip_processing = False
                continue

            if long_text_mode:
                print("长文本模式已启用(输入 `ds` 提交内容,输入 `es` 返回普通模式):")
            else:
                print("请输入你的问题(输入 `exit` 退出,输入 `ds` 启用 长文本模式,输入 `r1` 切换至 deepseek-reasoner 模型,输入 `v3` 切换至 deepseek-chat 模型,输入 `coder` 切换至 deepseek-coder 模型,输入 CTRL+C 手动终止):")

            user_input = ""
            flag = False

            # 获取用户输入
            if long_text_mode:
                while True:
                    line = input()
                    if line.strip().lower() == "exit":
                        print("程序已退出。")
                        flag = True
                        break
                    if line.strip().lower() == "ds":  # 检测到 `ds`,提交内容
                        break
                    if line.strip().lower() == "es":  # 检测到 `es`,返回普通模式
                        long_text_mode = False
                        print("已返回普通模式。")
                        skip_processing = True  # 设置跳过标记
                        break
                    if line.strip().lower() in ["r1", "v3", "coder"]:  # 在长文本模式下切换模型
                        if line.strip().lower() == "r1":
                            model = "deepseek-reasoner"
                            print(f"已切换模型为:{model}")
                        elif line.strip().lower() == "v3":
                            model = "deepseek-chat"
                            print(f"已切换模型为:{model}")
                        elif line.strip().lower() == "coder":
                            model = "deepseek-coder"
                            print(f"已切换模型为:{model}")
                        continue
                    user_input += line + "\n"  # 将每行输入添加到用户输入中
            else:
                user_input = input()

            if flag:
                break

            # 跳过处理标记检查
            if skip_processing:
                skip_processing = False
                continue

            # 如果用户输入 `uh`,加载最新的历史记录并打印
            if user_input.lower().strip() == "uh":
                latest_file = get_latest_history_file()
                if latest_file:
                    conversation_history = load_history(latest_file)
                    print(f"加载的历史记录如下({latest_file}):")
                    for entry in conversation_history:
                        role = entry["role"]
                        content = entry["content"]
                        print(f"{role.capitalize()}: {content}")
                    print("=" * 40)  # 分隔线
                else:
                    print("没有历史记录文件可加载。")
                continue

            # 如果用户输入 `dh`,清空历史记录
            if user_input.lower().strip() == "dh":
                conversation_history = [
                    {"role": "system", "content": "You are a helpful assistant"}
                ]
                print("历史记录已清空。")
                continue

            # 如果用户输入 `sh`,保存历史记录并清空缓存
            if user_input.lower().strip() == "sh":
                save_history(conversation_history)
                conversation_history = [
                    {"role": "system", "content": "You are a helpful assistant"}
                ]
                print("历史记录已清空。")
                continue

            # 如果用户输入 `dah`,删除最新的历史记录文件
            if user_input.lower().strip() == "dah":
                delete_latest_history()
                continue

            # 如果用户输入 `dallh`,删除所有历史记录文件
            if user_input.lower().strip() == "dallh":
                delete_all_history()
                continue

            # 如果用户输入 `ch x`,加载指定序号的历史记录并打印
            if user_input.lower().strip().startswith("ch "):
                try:
                    file_num = int(user_input.strip().split()[1])  # 提取序号并转换为整数
                    if file_num > 0:
                        conversation_history = load_specific_history(file_num)
                    else:
                        print("序号必须为正整数。")
                except (ValueError, IndexError):
                    print("请输入正确的格式:`ch x`,其中 x 为正整数。")
                continue

            # 如果用户输入 `es`,跳过本次循环
            if user_input.lower().strip() == "es":
                continue

            # 如果用户输入为空,跳过本次循环
            if not user_input.strip():
                print("输入为空,请重新输入。")
                continue

            # 如果用户输入 `exit`,则退出程序
            if user_input.lower().strip() == "exit":
                print("程序已退出。")
                break

            # 如果用户输入 `ds`,启用长文本模式并继续循环
            if user_input.lower().strip() == "ds" and not long_text_mode:
                long_text_mode = True
                print("已启用长文本模式。")
                continue

            # 如果用户输入 `r1` 或 `v3` 或 `coder`,切换模型并继续循环
            if user_input.lower().strip() == "r1":
                model = "deepseek-reasoner"
                print(f"已切换模型为:{model}")
                continue
            elif user_input.lower().strip() == "v3":
                model = "deepseek-chat"
                print(f"已切换模型为:极简版")
                continue
            elif user_input.lower().strip() == "coder":
                model = "deepseek-coder"
                print(f"已切换模型为:{model}")
                continue

            # 将用户输入添加到对话历史中
            if user_input.strip():  # 确保输入内容不为空
                conversation_history.append({"role": "user", "content": user_input.strip()})

            # 创建聊天完成请求,启用流式输出
            stream = client.chat.completions.create(
                model=model,  # 使用当前选择的模型
                messages=conversation_history,  # 使用对话历史作为上下文
                stream=True
            )

            # 初始化模型回复内容和思考过程
            reasoning_content = ""
            model_response = ""
            is_answering = False  # 标记是否开始输出完整回复
            is_reasoning_started = False  # 标记是否已经开始输出思考过程

            # 处理流式输出
            for chunk in stream:
                if chunk.choices:
                    delta = chunk.choices[0].delta

                    # 检查是否有思考过程内容(仅对 deepseek-reasoner 模型)
                    if model == "deepseek-reasoner" and hasattr(delta, 'reasoning_content') and delta.reasoning_content is not None:
                        if not is_reasoning_started:  # 如果是第一次输出思考过程,添加分隔线
                            print("=" * 20 + " 思考过程 " + "=" * 20)
                            is_reasoning_started = True
                        reasoning = delta.reasoning_content
                        print(reasoning, end='', flush=True)  # 流式输出思考过程
                        reasoning_content += reasoning

                    # 检查是否有回复内容
                    if hasattr(delta, 'content') and delta.content is not None:
                        if not is_answering:  # 如果尚未开始输出完整回复
                            print("\n" + "=" * 20 + " 完整回复 " + "=" * 20 + "\n")
                            is_answering = True
                        content = delta.content
                        print(content, end='', flush=True)  # 流式输出回复内容
                        model_response += content

            # 将模型回复添加到对话历史中
            conversation_history.append({"role": "assistant", "content": model_response})

            print("\n" + "=" * 40 + "\n")  # 分隔线

        except KeyboardInterrupt:  # 捕获用户按下 Ctrl+C
            print("\n程序已手动终止,您可以继续提问。")
            continue  # 继续主循环
        except Exception as e:
            print(f"发生错误:{e}")
            continue  # 继续主循环

if __name__ == "__main__":
    main()

四、腾讯云API接口

与上面的类似

完整代码

import os
import json
from openai import OpenAI
import glob

# 初始化OpenAI客户端
client = OpenAI(
    api_key="sk-xxxxxxx",  # 替换为你的API Key
    base_url="https://api.lkeap.cloud.tencent.com/v1",
)

# 定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"

# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
    os.makedirs(HISTORY_FILE_DIR)

def get_latest_history_file():
    """获取最新的历史记录文件"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return None
    latest_file = max(history_files, key=os.path.getctime)
    return latest_file

def get_next_history_file_name():
    """生成下一个历史记录文件名"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if not history_files:
        return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
    latest_file = max(history_files, key=os.path.getctime)
    latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
    return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")

def save_history(history):
    """保存历史记录到文件"""
    file_name = get_next_history_file_name()
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=4)
    print(f"历史记录已保存到 {file_name},并清空缓存。")

def load_history(file_name):
    """从指定文件加载历史记录"""
    with open(file_name, 'r', encoding='utf-8') as f:
        history = json.load(f)
        return history
    return []

def delete_latest_history():
    """删除最新的历史记录文件"""
    latest_file = get_latest_history_file()
    if latest_file:
        os.remove(latest_file)
        print(f"已删除最新的历史记录文件:{latest_file}")
    else:
        print("没有历史记录文件可删除。")

def delete_all_history():
    """删除所有历史记录文件"""
    history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
    if history_files:
        for file in history_files:
            os.remove(file)
        print("已删除所有历史记录文件。")
    else:
        print("没有历史记录文件可删除。")

def load_specific_history(file_num):
    """加载指定序号的历史记录文件"""
    file_name = os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{file_num}.json")
    if os.path.exists(file_name):
        history = load_history(file_name)
        print(f"加载的历史记录如下({file_name}):")
        for entry in history:
            role = entry["role"]
            content = entry["content"]
            print(f"{role.capitalize()}: {content}")
        print("=" * 40)  # 分隔线
        return history
    else:
        print(f"未找到历史记录文件:{file_name}")
        return []

def main():
    # 初始化对话历史
    conversation_history = []
    model = "deepseek-r1"  # 默认模型
    long_text_mode = False  # 默认不启用长文本模式
    skip_processing = False  # 新增跳过处理标记

    while True:
        try:
            if skip_processing:  # 优先处理跳过标记
                skip_processing = False
                continue

            if long_text_mode:
                print("长文本模式已启用(输入 `ds` 提交内容,输入 `es` 返回普通模式):")
            else:
                print("请输入你的问题(输入 `exit` 退出,输入 `ds` 启用 长文本模式,输入v3切换至deepseek-v3模型,r1亦然,输入CTRL+C手动终止):")

            user_input = ""
            flag = False

            # 获取用户输入
            if long_text_mode:
                while True:
                    line = input()
                    if line.strip().lower() == "exit":
                        print("程序已退出。")
                        flag = True
                        break
                    if line.strip().lower() == "ds":  # 检测到 `ds`,提交内容
                        break
                    if line.strip().lower() == "es":  # 检测到 `es`,返回普通模式
                        long_text_mode = False
                        print("已返回普通模式。")
                        skip_processing = True  # 设置跳过标记
                        break
                    if line.strip().lower() in ["r1", "v3"]:  # 在长文本模式下切换模型
                        if line.strip().lower() == "r1":
                            model = "deepseek-r1"
                            print(f"已切换模型为:{model}")
                        elif line.strip().lower() == "v3":
                            model = "deepseek-v3"
                            print(f"已切换模型为:{model}")
                        continue
                    user_input += line + "\n"  # 将每行输入添加到用户输入中
            else:
                user_input = input()

            if flag:
                break

            # 跳过处理标记检查
            if skip_processing:
                skip_processing = False
                continue

            # 如果用户输入 `uh`,加载最新的历史记录并打印
            if user_input.lower().strip() == "uh":
                latest_file = get_latest_history_file()
                if latest_file:
                    conversation_history = load_history(latest_file)
                    print(f"加载的历史记录如下({latest_file}):")
                    for entry in conversation_history:
                        role = entry["role"]
                        content = entry["content"]
                        print(f"{role.capitalize()}: {content}")
                    print("=" * 40)  # 分隔线
                else:
                    print("没有历史记录文件可加载。")
                continue

            # 如果用户输入 `dh`,清空历史记录
            if user_input.lower().strip() == "dh":
                conversation_history = []
                print("历史记录已清空。")
                continue

            # 如果用户输入 `sh`,保存历史记录并清空缓存
            if user_input.lower().strip() == "sh":
                save_history(conversation_history)
                conversation_history = []
                continue

            # 如果用户输入 `dah`,删除最新的历史记录文件
            if user_input.lower().strip() == "dah":
                delete_latest_history()
                continue

            # 如果用户输入 `dallh`,删除所有历史记录文件
            if user_input.lower().strip() == "dallh":
                delete_all_history()
                continue

            # 如果用户输入 `ch x`,加载指定序号的历史记录并打印
            if user_input.lower().strip().startswith("ch "):
                try:
                    file_num = int(user_input.strip().split()[1])  # 提取序号并转换为整数
                    if file_num > 0:
                        conversation_history = load_specific_history(file_num)
                    else:
                        print("序号必须为正整数。")
                except (ValueError, IndexError):
                    print("请输入正确的格式:`ch x`,其中 x 为正整数。")
                continue

            # 如果用户输入 `es`,跳过本次循环
            if user_input.lower().strip() == "es":
                continue

            # 如果用户输入为空,跳过本次循环
            if not user_input.strip():
                print("输入为空,请重新输入。")
                continue

            # 如果用户输入 `exit`,则退出程序
            if user_input.lower().strip() == "exit":
                print("程序已退出。")
                break

            # 如果用户输入 `ds`,启用长文本模式并继续循环
            if user_input.lower().strip() == "ds" and not long_text_mode:
                long_text_mode = True
                print("已启用长文本模式。")
                continue

            # 如果用户输入 `r1` 或 `v3`,切换模型并继续循环
            if user_input.lower().strip() == "r1":
                model = "deepseek-r1"
                print(f"已切换模型为:{model}")
                continue
            elif user_input.lower().strip() == "v3":
                model = "deepseek-v3"
                print(f"已切换模型为:{model}")
                continue

            # 将用户输入添加到对话历史中
            if user_input.strip():  # 确保输入内容不为空
                conversation_history.append({"role": "user", "content": user_input.strip()})

            # 创建聊天完成请求,启用流式输出
            stream = client.chat.completions.create(
                model=model,  # 使用当前选择的模型
                messages=conversation_history,  # 使用对话历史作为上下文
                stream=True
            )

            # 如果模型是 deepseek-v3,不显示思考过程
            if model != "deepseek-v3":
                print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n")

            # 初始化模型回复内容和思考过程
            reasoning_content = ""
            model_response = ""
            is_answering = False  # 标记是否开始输出完整回复

            # 处理流式输出
            for chunk in stream:
                if chunk.choices:
                    delta = chunk.choices[0].delta

                    # 检查是否有思考过程内容
                    if hasattr(delta, 'reasoning_content') and delta.reasoning_content is not None and model != "deepseek-v3":
                        reasoning = delta.reasoning_content
                        print(reasoning, end='', flush=True)  # 流式输出思考过程
                        reasoning_content += reasoning

                    # 检查是否有回复内容
                    if hasattr(delta, 'content') and delta.content is not None:
                        if not is_answering:  # 如果尚未开始输出完整回复
                            print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
                            is_answering = True
                        content = delta.content
                        print(content, end='', flush=True)  # 流式输出回复内容
                        model_response += content

            # 将模型回复添加到对话历史中
            conversation_history.append({"role": "assistant", "content": model_response})

            print("\n" + "=" * 40 + "\n")  # 分隔线

        except KeyboardInterrupt:  # 捕获用户按下 Ctrl+C
            print("\n程序已手动终止,您可以继续提问。")
            continue  # 继续主循环
        except Exception as e:
            print(f"发生错误:{e}")
            continue  # 继续主循环

if __name__ == "__main__":
    main()

五、图形化界面GUI

下面代码仅仅作为参考 完整代码请看:Python调用DeepSeek API实现图形化窗口

from openai import OpenAI
import json
import glob
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog

# 初始化OpenAI客户端
client = OpenAI(
    api_key="OPENAI_API_KEY",  
    base_url="https://api.deepseek.com",
)

#定义历史记录文件前缀和目录
HISTORY_FILE_PREFIX = "history_"
HISTORY_FILE_DIR = "history_records"

# 确保历史记录目录存在
if not os.path.exists(HISTORY_FILE_DIR):
    os.makedirs(HISTORY_FILE_DIR)

class ChatGUI:
    def __init__(self, master):
        self.master = master
        self.model = "deepseek-chat"
        self.long_text_mode = False
        self.conversation_history = [{"role": "system", "content": "You are a helpful assistant"}]
        self.is_streaming = False
        self.setup_ui()

    def setup_ui(self):
        self.master.title("DeepSeek Chat GUI")
        self.master.geometry("800x600")

        # 顶部控制栏
        control_frame = ttk.Frame(self.master)
        control_frame.pack(fill=tk.X, padx=5, pady=5)

        self.model_var = tk.StringVar(value=self.model)
        model_combobox = ttk.Combobox(control_frame, textvariable=self.model_var,
                                    values=["deepseek-chat", "deepseek-reasoner", "deepseek-coder"], width=15)
        model_combobox.pack(side=tk.LEFT, padx=5)
        model_combobox.bind("<<ComboboxSelected>>", self.on_model_change)

        ttk.Button(control_frame, text="保存历史", command=self.save_history).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="清空历史", command=self.clear_history).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="加载历史", command=self.load_history_dialog).pack(side=tk.LEFT, padx=5)

        # 对话显示区域
        self.chat_area = scrolledtext.ScrolledText(self.master, wrap=tk.WORD, state=tk.DISABLED)
        self.chat_area.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # 输入区域
        input_frame = ttk.Frame(self.master)
        input_frame.pack(fill=tk.X, padx=5, pady=5)

        self.input_text = tk.Text(input_frame, height=4)
        self.input_text.pack(fill=tk.X, pady=5)
        self.input_text.bind("<Return>", self.on_enter_pressed)

        # 底部按钮
        btn_frame = ttk.Frame(input_frame)
        btn_frame.pack(fill=tk.X)

        ttk.Button(btn_frame, text="发送", command=self.send_message).pack(side=tk.RIGHT, padx=5)
        ttk.Checkbutton(btn_frame, text="长文本模式", command=self.toggle_long_text).pack(side=tk.LEFT, padx=5)

    def on_model_change(self, event):
        self.model = self.model_var.get()
        self.append_to_chat("系统", f"模型已切换为: {self.model}")

    def toggle_long_text(self):
        self.long_text_mode = not self.long_text_mode
        status = "启用" if self.long_text_mode else "禁用"
        self.append_to_chat("系统", f"长文本模式已{status}")

    def on_enter_pressed(self, event):
        if not event.state & 0x1:  # 检查是否按下Control键
            if not self.long_text_mode:
                self.send_message()
                return "break"
        # 允许换行输入
        return None

    def append_to_chat(self, role, content):
        self.chat_area.configure(state=tk.NORMAL)
        self.chat_area.insert(tk.END, f"\n{role}: {content}\n")
        self.chat_area.configure(state=tk.DISABLED)
        self.chat_area.see(tk.END)

    def save_history(self):
        file_name = self.get_next_history_file_name()
        with open(file_name, 'w', encoding='utf-8') as f:
            json.dump(self.conversation_history, f, ensure_ascii=False, indent=4)
        self.append_to_chat("系统", "历史记录已保存")

    def clear_history(self):
        self.conversation_history = [{"role": "system", "content": "You are a helpful assistant"}]
        self.append_to_chat("系统", "对话历史已清空")

    def load_history_dialog(self):
        files = [f for f in os.listdir(HISTORY_FILE_DIR) if f.startswith(HISTORY_FILE_PREFIX)]
        if not files:
            messagebox.showinfo("提示", "没有历史记录文件")
            return

        dialog = tk.Toplevel(self.master)
        dialog.title("选择历史记录")

        lb = tk.Listbox(dialog, width=50, height=15)
        lb.pack(padx=10, pady=10)

        for f in files:
            lb.insert(tk.END, f)

        def on_select():
            selected = lb.curselection()
            if selected:
                file_name = os.path.join(HISTORY_FILE_DIR, lb.get(selected[0]))
                with open(file_name, 'r', encoding='utf-8') as f:
                    self.conversation_history = json.load(f)
                self.refresh_chat_display()
                dialog.destroy()

        ttk.Button(dialog, text="加载", command=on_select).pack(pady=5)

    def refresh_chat_display(self):
        self.chat_area.configure(state=tk.NORMAL)
        self.chat_area.delete(1.0, tk.END)
        for msg in self.conversation_history:
            if msg["role"] == "user":
                self.chat_area.insert(tk.END, f"\n用户: {msg['content']}\n")
            elif msg["role"] == "assistant":
                self.chat_area.insert(tk.END, f"\n助手: {msg['content']}\n")
        self.chat_area.configure(state=tk.DISABLED)

    def send_message(self):
        if self.is_streaming:
            return

        user_input = self.input_text.get("1.0", tk.END).strip()  
        if not user_input:
            return

        self.input_text.delete("1.0", tk.END)
        self.conversation_history.append({"role": "user", "content": user_input})
        self.append_to_chat("用户", user_input)

        # 在新线程中处理API请求
        self.is_streaming = True
        threading.Thread(target=self.process_stream, args=(user_input,)).start()

    def process_stream(self, user_input):
        try:
            stream = client.chat.completions.create(
                model=self.model,
                messages=self.conversation_history,
                stream=True
            )

            response = []
            reasoning = []
            for chunk in stream:
                if chunk.choices:
                    delta = chunk.choices[0].delta

                    if self.model == "deepseek-reasoner" and hasattr(delta, 'reasoning_content') and delta.reasoning_content:  
                        reasoning.append(delta.reasoning_content)
                        self.update_reasoning(delta.reasoning_content)

                    if hasattr(delta, 'content') and delta.content:
                        response.append(delta.content)
                        self.update_response(delta.content)

            full_response = ''.join(response)
            self.conversation_history.append({"role": "assistant", "content": full_response})

        except Exception as e:
            self.append_to_chat("系统", f"错误: {str(e)}")
        finally:
            self.is_streaming = False 

    def update_reasoning(self, content):
        self.master.after(0, lambda: self.append_to_chat("思考", content))

    def update_response(self, content):
        self.master.after(0, lambda: self.append_to_chat("助手", content))

    # 历史文件管理相关方法
    def get_next_history_file_name(self):
        history_files = glob.glob(os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}*.json"))
        if not history_files:
            return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}1.json")
        latest_file = max(history_files, key=os.path.getctime)
        latest_num = int(os.path.basename(latest_file).split("_")[1].split(".")[0])
        return os.path.join(HISTORY_FILE_DIR, f"{HISTORY_FILE_PREFIX}{latest_num + 1}.json")

if __name__ == "__main__":
    root = tk.Tk()
    app = ChatGUI(root)
    root.mainloop()

 

 

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐