获取Siliconflow API Key

近期Deepseek官网频繁出现“服务器繁忙,请稍后再试”,而近期各大厂商都纷纷推出Deepseek满血版API接口,实测可用。
Siliconflow作为集合顶尖大模型的一站式云服务平台,致力于为开发者提供更快、更全面、体验更丝滑的模型 API,助力开发者和企业聚焦产品创新,无须担心产品大规模推广所带来的高昂算力成本,注册链接: https://cloud.siliconflow.cn/i/rIJNnSJP 新注册用户免费送14元,几乎用不完。

在这里插入图片描述

创建API Key

在这里插入图片描述
创建完成,后续需要使用,直接复制即可

在这里插入图片描述
更多详细教程,可以在用户手册中学习 https://docs.siliconflow.cn/cn/userguide/introduction

在这里插入图片描述

Streamlit搭建DeepSeek应用

实现效果如下:

在这里插入图片描述

自定义CSS样式

# 自定义CSS样式
def inject_custom_css():
    st.markdown("""
    <style>
        /* 主界面样式 */
        .stApp {
            background: #f8f9fa;
        }
        
        /* 模式选择器 */
        .mode-selector {
            background: #ffffff;
            border-radius: 12px;
            padding: 1rem;
            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
        }
        
        /* 文件上传区域 */
        .uploader-box {
            border: 2px dashed #6366f1;
            border-radius: 12px;
            padding: 2rem;
            text-align: center;
        }
        
        /* 增强型消息气泡 */
        .enhanced-message {
            border-radius: 1rem;
            padding: 1.5rem;
            margin: 1rem 0;
            transition: all 0.3s ease;
        }
    </style>
    """, unsafe_allow_html=True)

初始化会话状态

# 初始化会话状态
def init_session_state():
    # 定义默认状态字典,包含所有需要维护的状态变量及其初始值
    default_state = {
        # 当前对话的消息列表,存储字典格式的消息记录
        # 每个消息包含 role(user/assistant)和 content
        "messages": [],
        # 历史对话存档列表,每个元素为一个完整对话的字典
        # 结构示例:{
        #   "id": "唯一标识", 
        #   "title": "对话标题",
        #   "messages": [消息列表],
        #   "timestamp": "时间戳"
        # }
        "conversations": [],
        # 当前活动对话的索引(指向conversations列表的索引位置)
        # None表示当前处于新建对话状态
        "current_conv": None,
        # 大模型 API 密钥存储,用于接口认证
        # 初始为空字符串,用户可在界面输入或从secrets加载
        "api_key": "",
        # 系统级提示词,用于控制AI的基本行为模式
        # 可在设置中修改,影响所有对话
        "system_prompt": "你是一个专业的人工智能助手",
        # 当前选择的对话模式,影响模型参数和响应方式
        # 可选值:"标准模式"/"深度思考"/"创意模式"/"代码专家"
        "current_mode": "标准模式",
        # 是否启用联网搜索功能的标志位
        # True表示自动补充网络搜索结果,False则禁用
        "web_search": False,
        # 已上传文件列表,存储字典格式的文件信息
        # 每个文件包含:id(唯一标识)、name(文件名)、content(提取内容)
        "uploaded_files": [],
        # 最近一次搜索的结果缓存
        # 存储格式化后的搜索结果文本,用于构建上下文提示
        "search_results": []
    }
    # 遍历默认状态字典,初始化会话状态
    # 采用先检查再赋值的策略,保留用户已修改的状态值
    for key, value in default_state.items():
        # 仅当会话状态中不存在该键时进行初始化
        # 确保不会覆盖用户操作产生的状态变化
        if key not in st.session_state:
            # 将默认值赋给会话状态
            # 使用浅拷贝避免引用类型数据共享问题
            st.session_state[key] = value.copy() if isinstance(value, (list, dict)) else value

侧边栏组件

# 侧边栏组件
def render_sidebar():
    with st.sidebar:
        st.header("⚙️ 控制中心")
        
        # API密钥输入
        api_key = st.text_input(
            label="大模型 API密钥", # 输入框标签
            type="password", # 密码类型(隐藏输入内容)
            value=st.session_state.api_key, # 绑定会话状态中的API密钥
            help="从下面链接中获取API密钥", # 悬浮提示信息
            key="api_key_input" # 唯一组件标识
        )
        "[获取最新大模型 API key](https://cloud.siliconflow.cn/i/rIJNnSJP)" 

        # 更新会话状态(优先使用用户输入,否则尝试从secrets获取) secrets文件即secrets.toml,创建在.streamlit目录下,内容为OpenAI_key = "xxxxxx"
        st.session_state.api_key = api_key or st.secrets.get("OpenAI_key")
        
        # 新对话按钮
        if st.button("✨ 开启新对话", 
                    use_container_width=True, # 占满容器宽度
                    help="清空当前对话并创建新会话" ):
            save_current_conversation() # 持久化当前对话
            reset_conversation() # 重置会话状态
        
        st.divider() # 区域分隔线
        
        # 历史对话管理
        st.subheader("📜 对话历史")
        render_conversation_history() # 渲染历史对话列表组件
        
        st.divider()

        st.header("🛠️ 系统设置")
        
        # 模式选择
        st.session_state.current_mode = st.selectbox(
            label="对话模式",
            options=["标准模式", "深度思考", "创意模式", "代码专家"],
            index=0,
            help="根据任务类型选择响应模式:\n"
                 "- 标准模式:通用对话\n"
                 "- 深度思考:复杂问题分析\n"
                 "- 创意模式:内容生成\n"
                 "- 代码专家:编程相关"
        )
        
        # 联网搜索开关
        st.session_state.web_search = st.checkbox(
            label="启用联网搜索",
            value=st.session_state.web_search,  # 绑定状态值
            help="当知识库信息不足时,自动通过搜索引擎获取最新网络信息"
        )
        
        # 文件上传
        st.divider()
        st.subheader("📁 文件管理")
        uploaded_files = st.file_uploader(
            label="上传文档(支持PDF/Word/图片/TXT)",
            type=["pdf", "docx", "txt", "png", "jpg"],
            accept_multiple_files=True, # 启用多文件选择
            key="file_uploader",         # 唯一组件标识
            help="最大单个文件50MB,支持OCR识别图片文字"  # 上传限制说明
        )

        # 处理上传文件(文本提取)
        process_uploaded_files(uploaded_files)
        
        # 显示已上传文件
        if st.session_state.uploaded_files:
            st.subheader("已上传文件")
            # 遍历每个已上传文件
            for file in st.session_state.uploaded_files:
                # 创建2列布局(3:1比例)
                cols = st.columns([3,1])
                # 左侧列显示文件名和图标
                with cols[0]:
                    st.caption(f"📄 {file['name']}")  # 带图标的文件名
                # 右侧列显示删除按钮
                with cols[1]:
                    # 动态生成唯一删除按钮
                    if st.button(
                        "×",  # 按钮符号
                        key=f"del_{file['id']}",  # 唯一键(基于文件ID)
                        help=f"删除 {file['name']}"  # 悬停提示
                    ):
                        # 点击时触发删除操作
                        delete_uploaded_file(file['id'])

对话处理

def delete_uploaded_file(file_id):
    # 过滤出需要保留的文件
    st.session_state.uploaded_files = [
        f for f in st.session_state.uploaded_files
        if f['id'] != file_id
    ]

# 渲染历史对话列表
def render_conversation_history():
    # 空状态处理:当没有历史对话时显示提示
    if len(st.session_state.conversations) == 0:
        st.caption("暂无历史对话")
        return
    # 遍历所有历史对话记录(带索引枚举)
    for idx, conv in enumerate(st.session_state.conversations):
        # 创建三列布局(比例1:0.2:0.2)
        # 布局说明:
        # [0] 对话标题按钮(主要区域)
        # [1] 空白间隔(占位缓冲)
        # [2] 删除按钮(操作区域)
        cols = st.columns([1, 0.2, 0.2])
        # 判断是否为当前活动对话(用于视觉反馈)
        is_active = idx == st.session_state.current_conv
        
        # 对话标题按钮
        with cols[0]:
            # 动态按钮样式(选中状态背景色)
            btn_style = "background: #f0f4ff;" if is_active else ""
            # 创建带样式的对话项按钮
            if st.button(
                f"🗨️ {conv['title']}",  # 带图标的标题文本
                key=f"conv_{idx}",      # 唯一键(基于索引)
                use_container_width=True,  # 占满列宽
                help=f"最后更新: {conv['time']}",  # 悬浮显示更新时间
                type="primary" if is_active else "secondary"  # 颜色类型
            ):
                # 点击回调:加载指定索引的对话
                load_conversation(idx)
        
        # 删除按钮
        with cols[2]:
            if st.button(
                "🗑️",  # 垃圾桶图标
                key=f"del_{idx}",  # 唯一键(基于索引)
                help="永久删除此对话"  # 悬浮提示
            ):
                # 点击回调:删除指定索引的对话
                delete_conversation(idx)

# 保存当前对话
def save_current_conversation():
    # 检查当前是否存在有效对话消息
    if len(st.session_state.messages) > 0:
        new_conv = {
            # 唯一标识:使用精确到秒的时间戳(示例:"20240220143015")
            "id": datetime.now().strftime("%Y%m%d%H%M%S"),             
            # 自动生成的对话标题(通过分析首条用户消息)
            "title": generate_conversation_title(),             
            # 格式化时间戳(示例:"02/20 14:30")
            "time": datetime.now().strftime("%m/%d %H:%M"),          
            # 深拷贝当前消息列表(避免引用问题)
            "messages": st.session_state.messages.copy(),            
            # 深拷贝已上传文件列表(保留文件上下文)
            "files": st.session_state.uploaded_files.copy(),       
            # 记录对话使用的模式(影响后续加载时的模式还原)
            "mode": st.session_state.current_mode 
        }
        
        # 更新或新增对话记录
        if st.session_state.current_conv is not None:
            # 更新模式:覆盖现有对话记录
            # current_conv 是 conversations 列表的有效索引
            st.session_state.conversations[st.session_state.current_conv] = new_conv
        else:
            # 新增模式:将新对话追加到历史记录末尾
            st.session_state.conversations.append(new_conv)

# 生成对话标题
def generate_conversation_title():
    if len(st.session_state.messages) >= 2:
        # 查找首条用户消息(生成器表达式提高效率)
        first_user_msg = next(
            (m["content"] for m in st.session_state.messages if m["role"] == "user"),
            "新对话" # 默认值(找不到用户消息时使用)
        )
        # 智能截断处理(避免截断半个中文字符)
        if len(first_user_msg) > 18:
            # 截取前18字符(中文安全)
            return first_user_msg[:18] + "..."
        else:
            return first_user_msg
    return "新对话"  # 消息不足时的默认标题

# 加载历史对话
def load_conversation(index):
    # 索引范围校验(防止越界访问)
    if 0 <= index < len(st.session_state.conversations):
        # 获取目标对话记录
        conv = st.session_state.conversations[index]      
        # 加载核心数据(深拷贝防止引用问题)
        st.session_state.messages = list(conv["messages"])  # 强制列表类型     
        # 加载关联文件(兼容无文件记录的旧对话)
        st.session_state.uploaded_files = list(conv.get("files", []))      
        # 恢复对话模式(默认"标准模式"防止模式字段缺失)
        st.session_state.current_mode = str(conv.get("mode", "标准模式"))
        # 更新当前对话指针
        st.session_state.current_conv = int(index)  # 确保索引为整数类型

# 删除对话
def delete_conversation(index):
    # 索引有效性检查(防止误删)
    if 0 <= index < len(st.session_state.conversations):
        # 删除目标对话
        del st.session_state.conversations[index]
        
        # 若删除的是当前对话,执行状态重置
        if st.session_state.current_conv == index:
            reset_conversation()  # 调用统一重置函数
            
            # 处理索引指针(因列表变动需特殊处理)
            if len(st.session_state.conversations) > 0:
                # 自动指向相邻对话(优先前一条)
                new_index = max(0, index-1)
                load_conversation(new_index)
            else:
                # 无历史对话时完全重置
                st.session_state.current_conv = None

# 重置对话
def reset_conversation():
    # 清空消息记录(保留系统提示词)
    st.session_state.messages = []
    # 清理上传文件(释放内存)
    st.session_state.uploaded_files = []
    # 重置对话指针(标记为新对话)
    st.session_state.current_conv = None

文件处理

# 文件处理函数
def process_uploaded_files(uploaded_files):
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
    for file in uploaded_files:
        # 检查文件是否已存在(通过唯一file_id校验)
        # 使用any()进行存在性检查,避免重复处理相同文件
        if any(f["id"] == file.file_id for f in st.session_state.uploaded_files):
            continue # 跳过已处理文件

        # 验证文件大小
        if file.size > MAX_FILE_SIZE:
            st.warning(f"文件 {file.name} 超过{MAX_FILE_SIZE//1024//1024}MB限制,已跳过")
            continue

        # 初始化文件内容容器    
        file_content = ""

        try:
            if file.type == "application/pdf":
                # 使用PyPDF2解析PDF文本
                pdf_reader = PyPDF2.PdfReader(file)
                # 逐页提取文本并拼接(过滤空页)
                file_content = "\n".join([
                    page.extract_text() 
                    for page in pdf_reader.pages 
                    if page.extract_text().strip()
                ])
            elif file.type == "text/plain":
                # 读取字节流并解码为UTF-8
                # 注意:可能需处理其他编码格式(如GBK)
                file_content = str(file.read(), "utf-8")
            elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                # 使用python-docx解析.docx格式
                doc = docx.Document(file)
                # 提取所有段落文本
                file_content = "\n".join([
                    para.text 
                    for para in doc.paragraphs 
                    if para.text.strip()
                ])
            elif file.type.startswith("image/"):
                # 使用PIL加载图片
                image = Image.open(file)
                # 使用pytesseract进行OCR识别(需安装Tesseract)
                # 建议配置语言参数:lang='chi_sim+eng'
                file_content = pytesseract.image_to_string(image)
                
            # 构建文件信息字典(限制内容长度)
            st.session_state.uploaded_files.append({
                "id": file.file_id,      # Streamlit生成的唯一文件ID
                "name": file.name,       # 原始文件名
                "content": file_content[:5000]  # 截断前5000字符防止内存溢出
            })
        except Exception as e:
            # 异常处理(捕获所有可能错误)
            st.error(f"处理文件 {file.name} 时出错: {str(e)}")

联网搜索

# 联网搜索功能
def perform_web_search(query):
    try:
        # 搜索引擎API
        search_api = "https://search-for-llmapi.dawne.cn/with-search/v1"
        params = {
            "q": query,  # 用户搜索词
            "api_key": st.session_state.api_key,
            "limit": 3  # 限制返回结果数
        }
        # 发送GET请求到搜索API
        response = requests.get(search_api, params=params)
        # 解析JSON响应并提取结果,默认空列表防止KeyError
        results = response.json().get("results", [])
        
        # 格式化搜索结果为Markdown
        formatted_results = []
        for result in results[:3]:  # 取前3个结果
            formatted_results.append(f"""
            ### {result['title']}
            **来源**: {result['source']}  
            {result['snippet'][:200]}...  # 截取摘要前200字符
            """)
            
        # 存储结果到会话状态
        st.session_state.search_results = formatted_results
        # 返回拼接后的Markdown文本
        return "\n\n".join(formatted_results)
    except Exception as e:
        # 通用异常捕获并显示错误
        st.error(f"搜索失败: {str(e)}")
        return ""

生成增强

# 生成增强提示
def build_enhanced_prompt(user_input):

    # 初始化基础提示(直接使用用户输入)
    prompt = user_input  # 保留原始问题的完整性
    # 添加文件上下文
    if st.session_state.uploaded_files:
        # 生成结构化文件上下文信息
        # 格式示例:
        # 【文件 年度报告.pdf 内容】
        # <文件内容文本>
        file_context = "\n".join(
            [f"【文件 {f['name']} 内容】\n{f['content']}"  # 每个文件单独标记
             for f in st.session_state.uploaded_files]    # 遍历所有已上传文件
        )
        
        # 将文件上下文插入到提示顶部,保持上下文可见性
        # 新格式:
        # [文件上下文]
        # [原始问题]
        prompt = f"{file_context}\n\n基于以上资料,请回答:{prompt}"  # 使用空行分隔上下文和问题
    
    # 添加搜索结果
    if st.session_state.web_search:
        # 执行网络搜索(假设返回的是结构化文本)
        search_context = perform_web_search(user_input)  # 自定义搜索函数
        
        # 将搜索结果附加到提示底部
        # 格式示例:
        # [原始问题]
        # [网络搜索结果]
        prompt = f"{prompt}\n\n【网络搜索结果】\n{search_context}"  # 追加搜索结果
    
    return prompt  # 最终结构:文件上下文 + 原始问题 + 搜索结果

不同模式配置

# 不同模式配置
def get_mode_settings():
    modes = {
        "标准模式": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 0.7,
            "max_tokens": 2000
        },
        "深度思考": {
            "model": "deepseek-ai/DeepSeek-R1",
            "temperature": 0.3,
            "max_tokens": 4000,
            "thinking_depth": "deep"
        },
        "创意模式": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 1.0,
            "max_tokens": 1500
        },
        "代码专家": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 0.2,
            "max_tokens": 3000
        }
    }
    return modes[st.session_state.current_mode]

主聊天界面

# 主聊天界面
def main_interface():
    st.title("AI Chat Assistant")
    st.markdown("---")
    
    # 显示聊天记录
    for msg in st.session_state.messages:
        # 确定消息角色(用户/助手)
        role = "assistant" if msg["role"] == "assistant" else "user"
        # 设置角色头像
        avatar = "👾" if role == "user" else "👽"
        # 创建带样式的消息容器
        with st.chat_message(role, avatar=avatar): # 使用Streamlit的聊天消息组件
             # 处理文件类型消息
            if msg.get("type") == "file":  # 检查是否存在特殊消息类型
                # 显示文件上传提示
                st.markdown(f"📎 **已上传文件**: {msg['content']}")  # 使用加粗显示文件名
            else:
                # 显示普通文本消息
                st.markdown(msg["content"])  # 渲染纯文本或Markdown内容
    
    # 处理用户实时输入
    if prompt := st.chat_input("输入消息..."):  # 海象运算符获取输入框内容
        # 调用输入处理函数
        handle_user_input(prompt)  # 将用户输入传递给处理逻辑

处理用户输入

# 处理用户输入
def handle_user_input(prompt):
    # 构建增强提示
    enhanced_prompt = build_enhanced_prompt(prompt)
    
    # 添加用户消息
    st.session_state.messages.append({"role": "user", "content": prompt})
    
    # 显示处理状态
    with st.status("正在处理...", expanded=True) as status:
        # 创建助手消息容器
        with st.chat_message("assistant",avatar = "👽"):
            response_placeholder = st.empty() # 创建空白占位符
            full_response = "" # 初始化完整响应
            
            try:
                # 获取模式配置
                settings = get_mode_settings()
                
                # 创建API客户端
                client = OpenAI(
                    api_key=st.session_state.api_key, # 从会话状态获取API密钥
                    base_url="https://api.siliconflow.cn/v1" # 指定私有化部署端点
                )
                
                # 构建消息列表
                messages = [
                    {"role": "system", "content": st.session_state.system_prompt}, # 系统级指令
                    *st.session_state.messages, # 解包历史消息(不含当前输入)
                    {"role": "user", "content": enhanced_prompt} # 当前增强版输入
                ]
                
                # 流式响应处理
                for chunk in client.chat.completions.create(
                    model=settings["model"],              # 模型选择
                    messages=messages,                    # 组合后的消息列表
                    stream=True,                          # 启用流式传输
                    temperature=settings["temperature"],  # 创意度参数 (0~2)
                    max_tokens=settings["max_tokens"]     # 最大输出长度
                ):
                    # 提取增量内容
                    if chunk.choices[0].delta.content:
                        full_response += chunk.choices[0].delta.content # 拼接响应片段
                        # 更新实时显示(带打字光标效果)
                        response_placeholder.markdown(full_response + "▌")
                
                # 显示最终结果
                response_placeholder.markdown(full_response) # 移除光标符号
                status.update(label="处理完成", state="complete")
                
                # 保存消息记录
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": full_response,
                    "metadata": {  # 附加元数据
                        "mode": st.session_state.current_mode,  # 当前模式
                        "files": [f["name"] for f in st.session_state.uploaded_files],  # 使用文件
                        "search_used": st.session_state.web_search  # 是否启用搜索
                    }
                })

                
            except Exception as e:
                response_placeholder.error(f"请求失败: {str(e)}")
                status.update(label="处理出错", state="error")

完整程序

import streamlit as st
from openai import OpenAI
import requests
from PIL import Image
import PyPDF2, docx, pytesseract
from datetime import datetime

st.set_page_config(
    page_title="AI_Chat_Assistant",
    page_icon="🧠",
    layout="wide",
)

# 自定义CSS样式
def inject_custom_css():
    st.markdown("""
    <style>
        /* 主界面样式 */
        .stApp {
            background: #f8f9fa;
        }
        
        /* 模式选择器 */
        .mode-selector {
            background: #ffffff;
            border-radius: 12px;
            padding: 1rem;
            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
        }
        
        /* 文件上传区域 */
        .uploader-box {
            border: 2px dashed #6366f1;
            border-radius: 12px;
            padding: 2rem;
            text-align: center;
        }
        
        /* 增强型消息气泡 */
        .enhanced-message {
            border-radius: 1rem;
            padding: 1.5rem;
            margin: 1rem 0;
            transition: all 0.3s ease;
        }
    </style>
    """, unsafe_allow_html=True)

# 初始化会话状态
def init_session_state():
    # 定义默认状态字典,包含所有需要维护的状态变量及其初始值
    default_state = {
        # 当前对话的消息列表,存储字典格式的消息记录
        # 每个消息包含 role(user/assistant)和 content
        "messages": [],
        # 历史对话存档列表,每个元素为一个完整对话的字典
        # 结构示例:{
        #   "id": "唯一标识", 
        #   "title": "对话标题",
        #   "messages": [消息列表],
        #   "timestamp": "时间戳"
        # }
        "conversations": [],
        # 当前活动对话的索引(指向conversations列表的索引位置)
        # None表示当前处于新建对话状态
        "current_conv": None,
        # 大模型 API 密钥存储,用于接口认证
        # 初始为空字符串,用户可在界面输入或从secrets加载
        "api_key": "",
        # 系统级提示词,用于控制AI的基本行为模式
        # 可在设置中修改,影响所有对话
        "system_prompt": "你是一个专业的人工智能助手",
        # 当前选择的对话模式,影响模型参数和响应方式
        # 可选值:"标准模式"/"深度思考"/"创意模式"/"代码专家"
        "current_mode": "标准模式",
        # 是否启用联网搜索功能的标志位
        # True表示自动补充网络搜索结果,False则禁用
        "web_search": False,
        # 已上传文件列表,存储字典格式的文件信息
        # 每个文件包含:id(唯一标识)、name(文件名)、content(提取内容)
        "uploaded_files": [],
        # 最近一次搜索的结果缓存
        # 存储格式化后的搜索结果文本,用于构建上下文提示
        "search_results": []
    }
    # 遍历默认状态字典,初始化会话状态
    # 采用先检查再赋值的策略,保留用户已修改的状态值
    for key, value in default_state.items():
        # 仅当会话状态中不存在该键时进行初始化
        # 确保不会覆盖用户操作产生的状态变化
        if key not in st.session_state:
            # 将默认值赋给会话状态
            # 使用浅拷贝避免引用类型数据共享问题
            st.session_state[key] = value.copy() if isinstance(value, (list, dict)) else value

# 侧边栏组件
def render_sidebar():
    with st.sidebar:
        st.header("⚙️ 控制中心")
        
        # API密钥输入
        api_key = st.text_input(
            label="大模型 API密钥", # 输入框标签
            type="password", # 密码类型(隐藏输入内容)
            value=st.session_state.api_key, # 绑定会话状态中的API密钥
            help="从下面链接中获取API密钥", # 悬浮提示信息
            key="api_key_input" # 唯一组件标识
        )
        "[获取最新大模型 API key](https://cloud.siliconflow.cn/i/rIJNnSJP)" 

        # 更新会话状态(优先使用用户输入,否则尝试从secrets获取)
        st.session_state.api_key = api_key or st.secrets.get("OpenAI_key")
        
        # 新对话按钮
        if st.button("✨ 开启新对话", 
                    use_container_width=True, # 占满容器宽度
                    help="清空当前对话并创建新会话" ):
            save_current_conversation() # 持久化当前对话
            reset_conversation() # 重置会话状态
        
        st.divider() # 区域分隔线
        
        # 历史对话管理
        st.subheader("📜 对话历史")
        render_conversation_history() # 渲染历史对话列表组件
        
        st.divider()

        st.header("🛠️ 系统设置")
        
        # 模式选择
        st.session_state.current_mode = st.selectbox(
            label="对话模式",
            options=["标准模式", "深度思考", "创意模式", "代码专家"],
            index=0,
            help="根据任务类型选择响应模式:\n"
                 "- 标准模式:通用对话\n"
                 "- 深度思考:复杂问题分析\n"
                 "- 创意模式:内容生成\n"
                 "- 代码专家:编程相关"
        )
        
        # 联网搜索开关
        st.session_state.web_search = st.checkbox(
            label="启用联网搜索",
            value=st.session_state.web_search,  # 绑定状态值
            help="当知识库信息不足时,自动通过搜索引擎获取最新网络信息"
        )
        
        # 文件上传
        st.divider()
        st.subheader("📁 文件管理")
        uploaded_files = st.file_uploader(
            label="上传文档(支持PDF/Word/图片/TXT)",
            type=["pdf", "docx", "txt", "png", "jpg"],
            accept_multiple_files=True, # 启用多文件选择
            key="file_uploader",         # 唯一组件标识
            help="最大单个文件50MB,支持OCR识别图片文字"  # 上传限制说明
        )

        # 处理上传文件(文本提取)
        process_uploaded_files(uploaded_files)
        
        # 显示已上传文件
        if st.session_state.uploaded_files:
            st.subheader("已上传文件")
            # 遍历每个已上传文件
            for file in st.session_state.uploaded_files:
                # 创建2列布局(3:1比例)
                cols = st.columns([3,1])
                # 左侧列显示文件名和图标
                with cols[0]:
                    st.caption(f"📄 {file['name']}")  # 带图标的文件名
                # 右侧列显示删除按钮
                with cols[1]:
                    # 动态生成唯一删除按钮
                    if st.button(
                        "×",  # 按钮符号
                        key=f"del_{file['id']}",  # 唯一键(基于文件ID)
                        help=f"删除 {file['name']}"  # 悬停提示
                    ):
                        # 点击时触发删除操作
                        delete_uploaded_file(file['id'])

def delete_uploaded_file(file_id):
    # 过滤出需要保留的文件
    st.session_state.uploaded_files = [
        f for f in st.session_state.uploaded_files
        if f['id'] != file_id
    ]

# 渲染历史对话列表
def render_conversation_history():
    # 空状态处理:当没有历史对话时显示提示
    if len(st.session_state.conversations) == 0:
        st.caption("暂无历史对话")
        return
    # 遍历所有历史对话记录(带索引枚举)
    for idx, conv in enumerate(st.session_state.conversations):
        # 创建三列布局(比例1:0.2:0.2)
        # 布局说明:
        # [0] 对话标题按钮(主要区域)
        # [1] 空白间隔(占位缓冲)
        # [2] 删除按钮(操作区域)
        cols = st.columns([1, 0.2, 0.2])
        # 判断是否为当前活动对话(用于视觉反馈)
        is_active = idx == st.session_state.current_conv
        
        # 对话标题按钮
        with cols[0]:
            # 动态按钮样式(选中状态背景色)
            btn_style = "background: #f0f4ff;" if is_active else ""
            # 创建带样式的对话项按钮
            if st.button(
                f"🗨️ {conv['title']}",  # 带图标的标题文本
                key=f"conv_{idx}",      # 唯一键(基于索引)
                use_container_width=True,  # 占满列宽
                help=f"最后更新: {conv['time']}",  # 悬浮显示更新时间
                type="primary" if is_active else "secondary"  # 颜色类型
            ):
                # 点击回调:加载指定索引的对话
                load_conversation(idx)
        
        # 删除按钮
        with cols[2]:
            if st.button(
                "🗑️",  # 垃圾桶图标
                key=f"del_{idx}",  # 唯一键(基于索引)
                help="永久删除此对话"  # 悬浮提示
            ):
                # 点击回调:删除指定索引的对话
                delete_conversation(idx)

# 保存当前对话
def save_current_conversation():
    # 检查当前是否存在有效对话消息
    if len(st.session_state.messages) > 0:
        new_conv = {
            # 唯一标识:使用精确到秒的时间戳(示例:"20240220143015")
            "id": datetime.now().strftime("%Y%m%d%H%M%S"),             
            # 自动生成的对话标题(通过分析首条用户消息)
            "title": generate_conversation_title(),             
            # 格式化时间戳(示例:"02/20 14:30")
            "time": datetime.now().strftime("%m/%d %H:%M"),          
            # 深拷贝当前消息列表(避免引用问题)
            "messages": st.session_state.messages.copy(),            
            # 深拷贝已上传文件列表(保留文件上下文)
            "files": st.session_state.uploaded_files.copy(),       
            # 记录对话使用的模式(影响后续加载时的模式还原)
            "mode": st.session_state.current_mode 
        }
        
        # 更新或新增对话记录
        if st.session_state.current_conv is not None:
            # 更新模式:覆盖现有对话记录
            # current_conv 是 conversations 列表的有效索引
            st.session_state.conversations[st.session_state.current_conv] = new_conv
        else:
            # 新增模式:将新对话追加到历史记录末尾
            st.session_state.conversations.append(new_conv)

# 生成对话标题
def generate_conversation_title():
    if len(st.session_state.messages) >= 2:
        # 查找首条用户消息(生成器表达式提高效率)
        first_user_msg = next(
            (m["content"] for m in st.session_state.messages if m["role"] == "user"),
            "新对话" # 默认值(找不到用户消息时使用)
        )
        # 智能截断处理(避免截断半个中文字符)
        if len(first_user_msg) > 18:
            # 截取前18字符(中文安全)
            return first_user_msg[:18] + "..."
        else:
            return first_user_msg
    return "新对话"  # 消息不足时的默认标题

# 加载历史对话
def load_conversation(index):
    # 索引范围校验(防止越界访问)
    if 0 <= index < len(st.session_state.conversations):
        # 获取目标对话记录
        conv = st.session_state.conversations[index]      
        # 加载核心数据(深拷贝防止引用问题)
        st.session_state.messages = list(conv["messages"])  # 强制列表类型     
        # 加载关联文件(兼容无文件记录的旧对话)
        st.session_state.uploaded_files = list(conv.get("files", []))      
        # 恢复对话模式(默认"标准模式"防止模式字段缺失)
        st.session_state.current_mode = str(conv.get("mode", "标准模式"))
        # 更新当前对话指针
        st.session_state.current_conv = int(index)  # 确保索引为整数类型

# 删除对话
def delete_conversation(index):
    # 索引有效性检查(防止误删)
    if 0 <= index < len(st.session_state.conversations):
        # 删除目标对话
        del st.session_state.conversations[index]
        
        # 若删除的是当前对话,执行状态重置
        if st.session_state.current_conv == index:
            reset_conversation()  # 调用统一重置函数
            
            # 处理索引指针(因列表变动需特殊处理)
            if len(st.session_state.conversations) > 0:
                # 自动指向相邻对话(优先前一条)
                new_index = max(0, index-1)
                load_conversation(new_index)
            else:
                # 无历史对话时完全重置
                st.session_state.current_conv = None

# 重置对话
def reset_conversation():
    # 清空消息记录(保留系统提示词)
    st.session_state.messages = []
    # 清理上传文件(释放内存)
    st.session_state.uploaded_files = []
    # 重置对话指针(标记为新对话)
    st.session_state.current_conv = None


# 文件处理函数
def process_uploaded_files(uploaded_files):
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
    for file in uploaded_files:
        # 检查文件是否已存在(通过唯一file_id校验)
        # 使用any()进行存在性检查,避免重复处理相同文件
        if any(f["id"] == file.file_id for f in st.session_state.uploaded_files):
            continue # 跳过已处理文件

        # 验证文件大小
        if file.size > MAX_FILE_SIZE:
            st.warning(f"文件 {file.name} 超过{MAX_FILE_SIZE//1024//1024}MB限制,已跳过")
            continue

        # 初始化文件内容容器    
        file_content = ""

        try:
            if file.type == "application/pdf":
                # 使用PyPDF2解析PDF文本
                pdf_reader = PyPDF2.PdfReader(file)
                # 逐页提取文本并拼接(过滤空页)
                file_content = "\n".join([
                    page.extract_text() 
                    for page in pdf_reader.pages 
                    if page.extract_text().strip()
                ])
            elif file.type == "text/plain":
                # 读取字节流并解码为UTF-8
                # 注意:可能需处理其他编码格式(如GBK)
                file_content = str(file.read(), "utf-8")
            elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                # 使用python-docx解析.docx格式
                doc = docx.Document(file)
                # 提取所有段落文本
                file_content = "\n".join([
                    para.text 
                    for para in doc.paragraphs 
                    if para.text.strip()
                ])
            elif file.type.startswith("image/"):
                # 使用PIL加载图片
                image = Image.open(file)
                # 使用pytesseract进行OCR识别(需安装Tesseract)
                # 建议配置语言参数:lang='chi_sim+eng'
                file_content = pytesseract.image_to_string(image)
                
            # 构建文件信息字典(限制内容长度)
            st.session_state.uploaded_files.append({
                "id": file.file_id,      # Streamlit生成的唯一文件ID
                "name": file.name,       # 原始文件名
                "content": file_content[:5000]  # 截断前5000字符防止内存溢出
            })
        except Exception as e:
            # 异常处理(捕获所有可能错误)
            st.error(f"处理文件 {file.name} 时出错: {str(e)}")

# 联网搜索功能
def perform_web_search(query):
    try:
        # 搜索引擎API
        search_api = "https://search-for-llmapi.dawne.cn/with-search/v1"
        params = {
            "q": query,  # 用户搜索词
            "api_key": st.session_state.api_key,
            "limit": 3  # 限制返回结果数
        }
        # 发送GET请求到搜索API
        response = requests.get(search_api, params=params)
        # 解析JSON响应并提取结果,默认空列表防止KeyError
        results = response.json().get("results", [])
        
        # 格式化搜索结果为Markdown
        formatted_results = []
        for result in results[:3]:  # 取前3个结果
            formatted_results.append(f"""
            ### {result['title']}
            **来源**: {result['source']}  
            {result['snippet'][:200]}...  # 截取摘要前200字符
            """)
            
        # 存储结果到会话状态
        st.session_state.search_results = formatted_results
        # 返回拼接后的Markdown文本
        return "\n\n".join(formatted_results)
    except Exception as e:
        # 通用异常捕获并显示错误
        st.error(f"搜索失败: {str(e)}")
        return ""

# 生成增强提示
def build_enhanced_prompt(user_input):

    # 初始化基础提示(直接使用用户输入)
    prompt = user_input  # 保留原始问题的完整性
    # 添加文件上下文
    if st.session_state.uploaded_files:
        # 生成结构化文件上下文信息
        # 格式示例:
        # 【文件 年度报告.pdf 内容】
        # <文件内容文本>
        file_context = "\n".join(
            [f"【文件 {f['name']} 内容】\n{f['content']}"  # 每个文件单独标记
             for f in st.session_state.uploaded_files]    # 遍历所有已上传文件
        )
        
        # 将文件上下文插入到提示顶部,保持上下文可见性
        # 新格式:
        # [文件上下文]
        # [原始问题]
        prompt = f"{file_context}\n\n基于以上资料,请回答:{prompt}"  # 使用空行分隔上下文和问题
    
    # 添加搜索结果
    if st.session_state.web_search:
        # 执行网络搜索(假设返回的是结构化文本)
        search_context = perform_web_search(user_input)  # 自定义搜索函数
        
        # 将搜索结果附加到提示底部
        # 格式示例:
        # [原始问题]
        # [网络搜索结果]
        prompt = f"{prompt}\n\n【网络搜索结果】\n{search_context}"  # 追加搜索结果
    
    return prompt  # 最终结构:文件上下文 + 原始问题 + 搜索结果

# 不同模式配置
def get_mode_settings():
    modes = {
        "标准模式": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 0.7,
            "max_tokens": 2000
        },
        "深度思考": {
            "model": "deepseek-ai/DeepSeek-R1",
            "temperature": 0.3,
            "max_tokens": 4000,
            "thinking_depth": "deep"
        },
        "创意模式": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 1.0,
            "max_tokens": 1500
        },
        "代码专家": {
            "model": "deepseek-ai/DeepSeek-V3",
            "temperature": 0.2,
            "max_tokens": 3000
        }
    }
    return modes[st.session_state.current_mode]

# 主聊天界面
def main_interface():
    st.title("AI Chat Assistant")
    st.markdown("---")
    
    # 显示聊天记录
    for msg in st.session_state.messages:
        # 确定消息角色(用户/助手)
        role = "assistant" if msg["role"] == "assistant" else "user"
        # 设置角色头像
        avatar = "👾" if role == "user" else "👽"
        # 创建带样式的消息容器
        with st.chat_message(role, avatar=avatar): # 使用Streamlit的聊天消息组件
             # 处理文件类型消息
            if msg.get("type") == "file":  # 检查是否存在特殊消息类型
                # 显示文件上传提示
                st.markdown(f"📎 **已上传文件**: {msg['content']}")  # 使用加粗显示文件名
            else:
                # 显示普通文本消息
                st.markdown(msg["content"])  # 渲染纯文本或Markdown内容
    
    # 处理用户实时输入
    if prompt := st.chat_input("输入消息..."):  # 海象运算符获取输入框内容
        # 调用输入处理函数
        handle_user_input(prompt)  # 将用户输入传递给处理逻辑

# 处理用户输入
def handle_user_input(prompt):
    # 构建增强提示
    enhanced_prompt = build_enhanced_prompt(prompt)
    
    # 添加用户消息
    st.session_state.messages.append({"role": "user", "content": prompt})
    
    # 显示处理状态
    with st.status("正在处理...", expanded=True) as status:
        # 创建助手消息容器
        with st.chat_message("assistant",avatar = "👽"):
            response_placeholder = st.empty() # 创建空白占位符
            full_response = "" # 初始化完整响应
            
            try:
                # 获取模式配置
                settings = get_mode_settings()
                
                # 创建API客户端
                client = OpenAI(
                    api_key=st.session_state.api_key, # 从会话状态获取API密钥
                    base_url="https://api.siliconflow.cn/v1" # 指定私有化部署端点
                )
                
                # 构建消息列表
                messages = [
                    {"role": "system", "content": st.session_state.system_prompt}, # 系统级指令
                    *st.session_state.messages, # 解包历史消息(不含当前输入)
                    {"role": "user", "content": enhanced_prompt} # 当前增强版输入
                ]
                
                # 流式响应处理
                for chunk in client.chat.completions.create(
                    model=settings["model"],              # 模型选择
                    messages=messages,                    # 组合后的消息列表
                    stream=True,                          # 启用流式传输
                    temperature=settings["temperature"],  # 创意度参数 (0~2)
                    max_tokens=settings["max_tokens"]     # 最大输出长度
                ):
                    # 提取增量内容
                    if chunk.choices[0].delta.content:
                        full_response += chunk.choices[0].delta.content # 拼接响应片段
                        # 更新实时显示(带打字光标效果)
                        response_placeholder.markdown(full_response + "▌")
                
                # 显示最终结果
                response_placeholder.markdown(full_response) # 移除光标符号
                status.update(label="处理完成", state="complete")
                
                # 保存消息记录
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": full_response,
                    "metadata": {  # 附加元数据
                        "mode": st.session_state.current_mode,  # 当前模式
                        "files": [f["name"] for f in st.session_state.uploaded_files],  # 使用文件
                        "search_used": st.session_state.web_search  # 是否启用搜索
                    }
                })

                
            except Exception as e:
                response_placeholder.error(f"请求失败: {str(e)}")
                status.update(label="处理出错", state="error")

# 主程序
if __name__ == "__main__":
    inject_custom_css()
    init_session_state()
    render_sidebar()
    main_interface()


该程序为基础版本,未做所有功能的测试,未知bug肯定存在,大家使用过程中一定注意。其中互联网搜索功能存在问题,程序中api无法使用。API Key放在了.streamlit 文件夹的 secrets.toml 中,用户也可以在界面上输入 API key。该demo利用 session_state 记录历史对话,没有加入数据库,所以重新加载就会失效。

希望本文对大家有帮助,上文若有不妥之处,欢迎指正

分享决定高度,学习拉开差距

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐