
手把手一起使用Streamlit与硅基流动(Siliconflow)快速构建DeepSeek应用
使用Streamlit与硅基流动(Siliconflow)快速构建DeepSeek应用
·
获取Siliconflow API Key
近期Deepseek官网频繁出现“服务器繁忙,请稍后再试”,而近期各大厂商都纷纷推出Deepseek满血版API接口,实测可用。
Siliconflow作为集合顶尖大模型的一站式云服务平台,致力于为开发者提供更快、更全面、体验更丝滑的模型 API,助力开发者和企业聚焦产品创新,无须担心产品大规模推广所带来的高昂算力成本,注册链接: https://cloud.siliconflow.cn/i/rIJNnSJP 新注册用户免费送14元,几乎用不完。
创建API Key
创建完成,后续需要使用,直接复制即可
更多详细教程,可以在用户手册中学习 https://docs.siliconflow.cn/cn/userguide/introduction
Streamlit搭建DeepSeek应用
实现效果如下:
自定义CSS样式
# 自定义CSS样式
def inject_custom_css():
st.markdown("""
<style>
/* 主界面样式 */
.stApp {
background: #f8f9fa;
}
/* 模式选择器 */
.mode-selector {
background: #ffffff;
border-radius: 12px;
padding: 1rem;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
/* 文件上传区域 */
.uploader-box {
border: 2px dashed #6366f1;
border-radius: 12px;
padding: 2rem;
text-align: center;
}
/* 增强型消息气泡 */
.enhanced-message {
border-radius: 1rem;
padding: 1.5rem;
margin: 1rem 0;
transition: all 0.3s ease;
}
</style>
""", unsafe_allow_html=True)
初始化会话状态
# 初始化会话状态
def init_session_state():
# 定义默认状态字典,包含所有需要维护的状态变量及其初始值
default_state = {
# 当前对话的消息列表,存储字典格式的消息记录
# 每个消息包含 role(user/assistant)和 content
"messages": [],
# 历史对话存档列表,每个元素为一个完整对话的字典
# 结构示例:{
# "id": "唯一标识",
# "title": "对话标题",
# "messages": [消息列表],
# "timestamp": "时间戳"
# }
"conversations": [],
# 当前活动对话的索引(指向conversations列表的索引位置)
# None表示当前处于新建对话状态
"current_conv": None,
# 大模型 API 密钥存储,用于接口认证
# 初始为空字符串,用户可在界面输入或从secrets加载
"api_key": "",
# 系统级提示词,用于控制AI的基本行为模式
# 可在设置中修改,影响所有对话
"system_prompt": "你是一个专业的人工智能助手",
# 当前选择的对话模式,影响模型参数和响应方式
# 可选值:"标准模式"/"深度思考"/"创意模式"/"代码专家"
"current_mode": "标准模式",
# 是否启用联网搜索功能的标志位
# True表示自动补充网络搜索结果,False则禁用
"web_search": False,
# 已上传文件列表,存储字典格式的文件信息
# 每个文件包含:id(唯一标识)、name(文件名)、content(提取内容)
"uploaded_files": [],
# 最近一次搜索的结果缓存
# 存储格式化后的搜索结果文本,用于构建上下文提示
"search_results": []
}
# 遍历默认状态字典,初始化会话状态
# 采用先检查再赋值的策略,保留用户已修改的状态值
for key, value in default_state.items():
# 仅当会话状态中不存在该键时进行初始化
# 确保不会覆盖用户操作产生的状态变化
if key not in st.session_state:
# 将默认值赋给会话状态
# 使用浅拷贝避免引用类型数据共享问题
st.session_state[key] = value.copy() if isinstance(value, (list, dict)) else value
侧边栏组件
# 侧边栏组件
def render_sidebar():
with st.sidebar:
st.header("⚙️ 控制中心")
# API密钥输入
api_key = st.text_input(
label="大模型 API密钥", # 输入框标签
type="password", # 密码类型(隐藏输入内容)
value=st.session_state.api_key, # 绑定会话状态中的API密钥
help="从下面链接中获取API密钥", # 悬浮提示信息
key="api_key_input" # 唯一组件标识
)
"[获取最新大模型 API key](https://cloud.siliconflow.cn/i/rIJNnSJP)"
# 更新会话状态(优先使用用户输入,否则尝试从secrets获取) secrets文件即secrets.toml,创建在.streamlit目录下,内容为OpenAI_key = "xxxxxx"
st.session_state.api_key = api_key or st.secrets.get("OpenAI_key")
# 新对话按钮
if st.button("✨ 开启新对话",
use_container_width=True, # 占满容器宽度
help="清空当前对话并创建新会话" ):
save_current_conversation() # 持久化当前对话
reset_conversation() # 重置会话状态
st.divider() # 区域分隔线
# 历史对话管理
st.subheader("📜 对话历史")
render_conversation_history() # 渲染历史对话列表组件
st.divider()
st.header("🛠️ 系统设置")
# 模式选择
st.session_state.current_mode = st.selectbox(
label="对话模式",
options=["标准模式", "深度思考", "创意模式", "代码专家"],
index=0,
help="根据任务类型选择响应模式:\n"
"- 标准模式:通用对话\n"
"- 深度思考:复杂问题分析\n"
"- 创意模式:内容生成\n"
"- 代码专家:编程相关"
)
# 联网搜索开关
st.session_state.web_search = st.checkbox(
label="启用联网搜索",
value=st.session_state.web_search, # 绑定状态值
help="当知识库信息不足时,自动通过搜索引擎获取最新网络信息"
)
# 文件上传
st.divider()
st.subheader("📁 文件管理")
uploaded_files = st.file_uploader(
label="上传文档(支持PDF/Word/图片/TXT)",
type=["pdf", "docx", "txt", "png", "jpg"],
accept_multiple_files=True, # 启用多文件选择
key="file_uploader", # 唯一组件标识
help="最大单个文件50MB,支持OCR识别图片文字" # 上传限制说明
)
# 处理上传文件(文本提取)
process_uploaded_files(uploaded_files)
# 显示已上传文件
if st.session_state.uploaded_files:
st.subheader("已上传文件")
# 遍历每个已上传文件
for file in st.session_state.uploaded_files:
# 创建2列布局(3:1比例)
cols = st.columns([3,1])
# 左侧列显示文件名和图标
with cols[0]:
st.caption(f"📄 {file['name']}") # 带图标的文件名
# 右侧列显示删除按钮
with cols[1]:
# 动态生成唯一删除按钮
if st.button(
"×", # 按钮符号
key=f"del_{file['id']}", # 唯一键(基于文件ID)
help=f"删除 {file['name']}" # 悬停提示
):
# 点击时触发删除操作
delete_uploaded_file(file['id'])
对话处理
def delete_uploaded_file(file_id):
# 过滤出需要保留的文件
st.session_state.uploaded_files = [
f for f in st.session_state.uploaded_files
if f['id'] != file_id
]
# 渲染历史对话列表
def render_conversation_history():
# 空状态处理:当没有历史对话时显示提示
if len(st.session_state.conversations) == 0:
st.caption("暂无历史对话")
return
# 遍历所有历史对话记录(带索引枚举)
for idx, conv in enumerate(st.session_state.conversations):
# 创建三列布局(比例1:0.2:0.2)
# 布局说明:
# [0] 对话标题按钮(主要区域)
# [1] 空白间隔(占位缓冲)
# [2] 删除按钮(操作区域)
cols = st.columns([1, 0.2, 0.2])
# 判断是否为当前活动对话(用于视觉反馈)
is_active = idx == st.session_state.current_conv
# 对话标题按钮
with cols[0]:
# 动态按钮样式(选中状态背景色)
btn_style = "background: #f0f4ff;" if is_active else ""
# 创建带样式的对话项按钮
if st.button(
f"🗨️ {conv['title']}", # 带图标的标题文本
key=f"conv_{idx}", # 唯一键(基于索引)
use_container_width=True, # 占满列宽
help=f"最后更新: {conv['time']}", # 悬浮显示更新时间
type="primary" if is_active else "secondary" # 颜色类型
):
# 点击回调:加载指定索引的对话
load_conversation(idx)
# 删除按钮
with cols[2]:
if st.button(
"🗑️", # 垃圾桶图标
key=f"del_{idx}", # 唯一键(基于索引)
help="永久删除此对话" # 悬浮提示
):
# 点击回调:删除指定索引的对话
delete_conversation(idx)
# 保存当前对话
def save_current_conversation():
# 检查当前是否存在有效对话消息
if len(st.session_state.messages) > 0:
new_conv = {
# 唯一标识:使用精确到秒的时间戳(示例:"20240220143015")
"id": datetime.now().strftime("%Y%m%d%H%M%S"),
# 自动生成的对话标题(通过分析首条用户消息)
"title": generate_conversation_title(),
# 格式化时间戳(示例:"02/20 14:30")
"time": datetime.now().strftime("%m/%d %H:%M"),
# 深拷贝当前消息列表(避免引用问题)
"messages": st.session_state.messages.copy(),
# 深拷贝已上传文件列表(保留文件上下文)
"files": st.session_state.uploaded_files.copy(),
# 记录对话使用的模式(影响后续加载时的模式还原)
"mode": st.session_state.current_mode
}
# 更新或新增对话记录
if st.session_state.current_conv is not None:
# 更新模式:覆盖现有对话记录
# current_conv 是 conversations 列表的有效索引
st.session_state.conversations[st.session_state.current_conv] = new_conv
else:
# 新增模式:将新对话追加到历史记录末尾
st.session_state.conversations.append(new_conv)
# 生成对话标题
def generate_conversation_title():
if len(st.session_state.messages) >= 2:
# 查找首条用户消息(生成器表达式提高效率)
first_user_msg = next(
(m["content"] for m in st.session_state.messages if m["role"] == "user"),
"新对话" # 默认值(找不到用户消息时使用)
)
# 智能截断处理(避免截断半个中文字符)
if len(first_user_msg) > 18:
# 截取前18字符(中文安全)
return first_user_msg[:18] + "..."
else:
return first_user_msg
return "新对话" # 消息不足时的默认标题
# 加载历史对话
def load_conversation(index):
# 索引范围校验(防止越界访问)
if 0 <= index < len(st.session_state.conversations):
# 获取目标对话记录
conv = st.session_state.conversations[index]
# 加载核心数据(深拷贝防止引用问题)
st.session_state.messages = list(conv["messages"]) # 强制列表类型
# 加载关联文件(兼容无文件记录的旧对话)
st.session_state.uploaded_files = list(conv.get("files", []))
# 恢复对话模式(默认"标准模式"防止模式字段缺失)
st.session_state.current_mode = str(conv.get("mode", "标准模式"))
# 更新当前对话指针
st.session_state.current_conv = int(index) # 确保索引为整数类型
# 删除对话
def delete_conversation(index):
# 索引有效性检查(防止误删)
if 0 <= index < len(st.session_state.conversations):
# 删除目标对话
del st.session_state.conversations[index]
# 若删除的是当前对话,执行状态重置
if st.session_state.current_conv == index:
reset_conversation() # 调用统一重置函数
# 处理索引指针(因列表变动需特殊处理)
if len(st.session_state.conversations) > 0:
# 自动指向相邻对话(优先前一条)
new_index = max(0, index-1)
load_conversation(new_index)
else:
# 无历史对话时完全重置
st.session_state.current_conv = None
# 重置对话
def reset_conversation():
# 清空消息记录(保留系统提示词)
st.session_state.messages = []
# 清理上传文件(释放内存)
st.session_state.uploaded_files = []
# 重置对话指针(标记为新对话)
st.session_state.current_conv = None
文件处理
# 文件处理函数
def process_uploaded_files(uploaded_files):
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
for file in uploaded_files:
# 检查文件是否已存在(通过唯一file_id校验)
# 使用any()进行存在性检查,避免重复处理相同文件
if any(f["id"] == file.file_id for f in st.session_state.uploaded_files):
continue # 跳过已处理文件
# 验证文件大小
if file.size > MAX_FILE_SIZE:
st.warning(f"文件 {file.name} 超过{MAX_FILE_SIZE//1024//1024}MB限制,已跳过")
continue
# 初始化文件内容容器
file_content = ""
try:
if file.type == "application/pdf":
# 使用PyPDF2解析PDF文本
pdf_reader = PyPDF2.PdfReader(file)
# 逐页提取文本并拼接(过滤空页)
file_content = "\n".join([
page.extract_text()
for page in pdf_reader.pages
if page.extract_text().strip()
])
elif file.type == "text/plain":
# 读取字节流并解码为UTF-8
# 注意:可能需处理其他编码格式(如GBK)
file_content = str(file.read(), "utf-8")
elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
# 使用python-docx解析.docx格式
doc = docx.Document(file)
# 提取所有段落文本
file_content = "\n".join([
para.text
for para in doc.paragraphs
if para.text.strip()
])
elif file.type.startswith("image/"):
# 使用PIL加载图片
image = Image.open(file)
# 使用pytesseract进行OCR识别(需安装Tesseract)
# 建议配置语言参数:lang='chi_sim+eng'
file_content = pytesseract.image_to_string(image)
# 构建文件信息字典(限制内容长度)
st.session_state.uploaded_files.append({
"id": file.file_id, # Streamlit生成的唯一文件ID
"name": file.name, # 原始文件名
"content": file_content[:5000] # 截断前5000字符防止内存溢出
})
except Exception as e:
# 异常处理(捕获所有可能错误)
st.error(f"处理文件 {file.name} 时出错: {str(e)}")
联网搜索
# 联网搜索功能
def perform_web_search(query):
try:
# 搜索引擎API
search_api = "https://search-for-llmapi.dawne.cn/with-search/v1"
params = {
"q": query, # 用户搜索词
"api_key": st.session_state.api_key,
"limit": 3 # 限制返回结果数
}
# 发送GET请求到搜索API
response = requests.get(search_api, params=params)
# 解析JSON响应并提取结果,默认空列表防止KeyError
results = response.json().get("results", [])
# 格式化搜索结果为Markdown
formatted_results = []
for result in results[:3]: # 取前3个结果
formatted_results.append(f"""
### {result['title']}
**来源**: {result['source']}
{result['snippet'][:200]}... # 截取摘要前200字符
""")
# 存储结果到会话状态
st.session_state.search_results = formatted_results
# 返回拼接后的Markdown文本
return "\n\n".join(formatted_results)
except Exception as e:
# 通用异常捕获并显示错误
st.error(f"搜索失败: {str(e)}")
return ""
生成增强
# 生成增强提示
def build_enhanced_prompt(user_input):
# 初始化基础提示(直接使用用户输入)
prompt = user_input # 保留原始问题的完整性
# 添加文件上下文
if st.session_state.uploaded_files:
# 生成结构化文件上下文信息
# 格式示例:
# 【文件 年度报告.pdf 内容】
# <文件内容文本>
file_context = "\n".join(
[f"【文件 {f['name']} 内容】\n{f['content']}" # 每个文件单独标记
for f in st.session_state.uploaded_files] # 遍历所有已上传文件
)
# 将文件上下文插入到提示顶部,保持上下文可见性
# 新格式:
# [文件上下文]
# [原始问题]
prompt = f"{file_context}\n\n基于以上资料,请回答:{prompt}" # 使用空行分隔上下文和问题
# 添加搜索结果
if st.session_state.web_search:
# 执行网络搜索(假设返回的是结构化文本)
search_context = perform_web_search(user_input) # 自定义搜索函数
# 将搜索结果附加到提示底部
# 格式示例:
# [原始问题]
# [网络搜索结果]
prompt = f"{prompt}\n\n【网络搜索结果】\n{search_context}" # 追加搜索结果
return prompt # 最终结构:文件上下文 + 原始问题 + 搜索结果
不同模式配置
# 不同模式配置
def get_mode_settings():
modes = {
"标准模式": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 0.7,
"max_tokens": 2000
},
"深度思考": {
"model": "deepseek-ai/DeepSeek-R1",
"temperature": 0.3,
"max_tokens": 4000,
"thinking_depth": "deep"
},
"创意模式": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 1.0,
"max_tokens": 1500
},
"代码专家": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 0.2,
"max_tokens": 3000
}
}
return modes[st.session_state.current_mode]
主聊天界面
# 主聊天界面
def main_interface():
st.title("AI Chat Assistant")
st.markdown("---")
# 显示聊天记录
for msg in st.session_state.messages:
# 确定消息角色(用户/助手)
role = "assistant" if msg["role"] == "assistant" else "user"
# 设置角色头像
avatar = "👾" if role == "user" else "👽"
# 创建带样式的消息容器
with st.chat_message(role, avatar=avatar): # 使用Streamlit的聊天消息组件
# 处理文件类型消息
if msg.get("type") == "file": # 检查是否存在特殊消息类型
# 显示文件上传提示
st.markdown(f"📎 **已上传文件**: {msg['content']}") # 使用加粗显示文件名
else:
# 显示普通文本消息
st.markdown(msg["content"]) # 渲染纯文本或Markdown内容
# 处理用户实时输入
if prompt := st.chat_input("输入消息..."): # 海象运算符获取输入框内容
# 调用输入处理函数
handle_user_input(prompt) # 将用户输入传递给处理逻辑
处理用户输入
# 处理用户输入
def handle_user_input(prompt):
# 构建增强提示
enhanced_prompt = build_enhanced_prompt(prompt)
# 添加用户消息
st.session_state.messages.append({"role": "user", "content": prompt})
# 显示处理状态
with st.status("正在处理...", expanded=True) as status:
# 创建助手消息容器
with st.chat_message("assistant",avatar = "👽"):
response_placeholder = st.empty() # 创建空白占位符
full_response = "" # 初始化完整响应
try:
# 获取模式配置
settings = get_mode_settings()
# 创建API客户端
client = OpenAI(
api_key=st.session_state.api_key, # 从会话状态获取API密钥
base_url="https://api.siliconflow.cn/v1" # 指定私有化部署端点
)
# 构建消息列表
messages = [
{"role": "system", "content": st.session_state.system_prompt}, # 系统级指令
*st.session_state.messages, # 解包历史消息(不含当前输入)
{"role": "user", "content": enhanced_prompt} # 当前增强版输入
]
# 流式响应处理
for chunk in client.chat.completions.create(
model=settings["model"], # 模型选择
messages=messages, # 组合后的消息列表
stream=True, # 启用流式传输
temperature=settings["temperature"], # 创意度参数 (0~2)
max_tokens=settings["max_tokens"] # 最大输出长度
):
# 提取增量内容
if chunk.choices[0].delta.content:
full_response += chunk.choices[0].delta.content # 拼接响应片段
# 更新实时显示(带打字光标效果)
response_placeholder.markdown(full_response + "▌")
# 显示最终结果
response_placeholder.markdown(full_response) # 移除光标符号
status.update(label="处理完成", state="complete")
# 保存消息记录
st.session_state.messages.append({
"role": "assistant",
"content": full_response,
"metadata": { # 附加元数据
"mode": st.session_state.current_mode, # 当前模式
"files": [f["name"] for f in st.session_state.uploaded_files], # 使用文件
"search_used": st.session_state.web_search # 是否启用搜索
}
})
except Exception as e:
response_placeholder.error(f"请求失败: {str(e)}")
status.update(label="处理出错", state="error")
完整程序
import streamlit as st
from openai import OpenAI
import requests
from PIL import Image
import PyPDF2, docx, pytesseract
from datetime import datetime
st.set_page_config(
page_title="AI_Chat_Assistant",
page_icon="🧠",
layout="wide",
)
# 自定义CSS样式
def inject_custom_css():
st.markdown("""
<style>
/* 主界面样式 */
.stApp {
background: #f8f9fa;
}
/* 模式选择器 */
.mode-selector {
background: #ffffff;
border-radius: 12px;
padding: 1rem;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
/* 文件上传区域 */
.uploader-box {
border: 2px dashed #6366f1;
border-radius: 12px;
padding: 2rem;
text-align: center;
}
/* 增强型消息气泡 */
.enhanced-message {
border-radius: 1rem;
padding: 1.5rem;
margin: 1rem 0;
transition: all 0.3s ease;
}
</style>
""", unsafe_allow_html=True)
# 初始化会话状态
def init_session_state():
# 定义默认状态字典,包含所有需要维护的状态变量及其初始值
default_state = {
# 当前对话的消息列表,存储字典格式的消息记录
# 每个消息包含 role(user/assistant)和 content
"messages": [],
# 历史对话存档列表,每个元素为一个完整对话的字典
# 结构示例:{
# "id": "唯一标识",
# "title": "对话标题",
# "messages": [消息列表],
# "timestamp": "时间戳"
# }
"conversations": [],
# 当前活动对话的索引(指向conversations列表的索引位置)
# None表示当前处于新建对话状态
"current_conv": None,
# 大模型 API 密钥存储,用于接口认证
# 初始为空字符串,用户可在界面输入或从secrets加载
"api_key": "",
# 系统级提示词,用于控制AI的基本行为模式
# 可在设置中修改,影响所有对话
"system_prompt": "你是一个专业的人工智能助手",
# 当前选择的对话模式,影响模型参数和响应方式
# 可选值:"标准模式"/"深度思考"/"创意模式"/"代码专家"
"current_mode": "标准模式",
# 是否启用联网搜索功能的标志位
# True表示自动补充网络搜索结果,False则禁用
"web_search": False,
# 已上传文件列表,存储字典格式的文件信息
# 每个文件包含:id(唯一标识)、name(文件名)、content(提取内容)
"uploaded_files": [],
# 最近一次搜索的结果缓存
# 存储格式化后的搜索结果文本,用于构建上下文提示
"search_results": []
}
# 遍历默认状态字典,初始化会话状态
# 采用先检查再赋值的策略,保留用户已修改的状态值
for key, value in default_state.items():
# 仅当会话状态中不存在该键时进行初始化
# 确保不会覆盖用户操作产生的状态变化
if key not in st.session_state:
# 将默认值赋给会话状态
# 使用浅拷贝避免引用类型数据共享问题
st.session_state[key] = value.copy() if isinstance(value, (list, dict)) else value
# 侧边栏组件
def render_sidebar():
with st.sidebar:
st.header("⚙️ 控制中心")
# API密钥输入
api_key = st.text_input(
label="大模型 API密钥", # 输入框标签
type="password", # 密码类型(隐藏输入内容)
value=st.session_state.api_key, # 绑定会话状态中的API密钥
help="从下面链接中获取API密钥", # 悬浮提示信息
key="api_key_input" # 唯一组件标识
)
"[获取最新大模型 API key](https://cloud.siliconflow.cn/i/rIJNnSJP)"
# 更新会话状态(优先使用用户输入,否则尝试从secrets获取)
st.session_state.api_key = api_key or st.secrets.get("OpenAI_key")
# 新对话按钮
if st.button("✨ 开启新对话",
use_container_width=True, # 占满容器宽度
help="清空当前对话并创建新会话" ):
save_current_conversation() # 持久化当前对话
reset_conversation() # 重置会话状态
st.divider() # 区域分隔线
# 历史对话管理
st.subheader("📜 对话历史")
render_conversation_history() # 渲染历史对话列表组件
st.divider()
st.header("🛠️ 系统设置")
# 模式选择
st.session_state.current_mode = st.selectbox(
label="对话模式",
options=["标准模式", "深度思考", "创意模式", "代码专家"],
index=0,
help="根据任务类型选择响应模式:\n"
"- 标准模式:通用对话\n"
"- 深度思考:复杂问题分析\n"
"- 创意模式:内容生成\n"
"- 代码专家:编程相关"
)
# 联网搜索开关
st.session_state.web_search = st.checkbox(
label="启用联网搜索",
value=st.session_state.web_search, # 绑定状态值
help="当知识库信息不足时,自动通过搜索引擎获取最新网络信息"
)
# 文件上传
st.divider()
st.subheader("📁 文件管理")
uploaded_files = st.file_uploader(
label="上传文档(支持PDF/Word/图片/TXT)",
type=["pdf", "docx", "txt", "png", "jpg"],
accept_multiple_files=True, # 启用多文件选择
key="file_uploader", # 唯一组件标识
help="最大单个文件50MB,支持OCR识别图片文字" # 上传限制说明
)
# 处理上传文件(文本提取)
process_uploaded_files(uploaded_files)
# 显示已上传文件
if st.session_state.uploaded_files:
st.subheader("已上传文件")
# 遍历每个已上传文件
for file in st.session_state.uploaded_files:
# 创建2列布局(3:1比例)
cols = st.columns([3,1])
# 左侧列显示文件名和图标
with cols[0]:
st.caption(f"📄 {file['name']}") # 带图标的文件名
# 右侧列显示删除按钮
with cols[1]:
# 动态生成唯一删除按钮
if st.button(
"×", # 按钮符号
key=f"del_{file['id']}", # 唯一键(基于文件ID)
help=f"删除 {file['name']}" # 悬停提示
):
# 点击时触发删除操作
delete_uploaded_file(file['id'])
def delete_uploaded_file(file_id):
# 过滤出需要保留的文件
st.session_state.uploaded_files = [
f for f in st.session_state.uploaded_files
if f['id'] != file_id
]
# 渲染历史对话列表
def render_conversation_history():
# 空状态处理:当没有历史对话时显示提示
if len(st.session_state.conversations) == 0:
st.caption("暂无历史对话")
return
# 遍历所有历史对话记录(带索引枚举)
for idx, conv in enumerate(st.session_state.conversations):
# 创建三列布局(比例1:0.2:0.2)
# 布局说明:
# [0] 对话标题按钮(主要区域)
# [1] 空白间隔(占位缓冲)
# [2] 删除按钮(操作区域)
cols = st.columns([1, 0.2, 0.2])
# 判断是否为当前活动对话(用于视觉反馈)
is_active = idx == st.session_state.current_conv
# 对话标题按钮
with cols[0]:
# 动态按钮样式(选中状态背景色)
btn_style = "background: #f0f4ff;" if is_active else ""
# 创建带样式的对话项按钮
if st.button(
f"🗨️ {conv['title']}", # 带图标的标题文本
key=f"conv_{idx}", # 唯一键(基于索引)
use_container_width=True, # 占满列宽
help=f"最后更新: {conv['time']}", # 悬浮显示更新时间
type="primary" if is_active else "secondary" # 颜色类型
):
# 点击回调:加载指定索引的对话
load_conversation(idx)
# 删除按钮
with cols[2]:
if st.button(
"🗑️", # 垃圾桶图标
key=f"del_{idx}", # 唯一键(基于索引)
help="永久删除此对话" # 悬浮提示
):
# 点击回调:删除指定索引的对话
delete_conversation(idx)
# 保存当前对话
def save_current_conversation():
# 检查当前是否存在有效对话消息
if len(st.session_state.messages) > 0:
new_conv = {
# 唯一标识:使用精确到秒的时间戳(示例:"20240220143015")
"id": datetime.now().strftime("%Y%m%d%H%M%S"),
# 自动生成的对话标题(通过分析首条用户消息)
"title": generate_conversation_title(),
# 格式化时间戳(示例:"02/20 14:30")
"time": datetime.now().strftime("%m/%d %H:%M"),
# 深拷贝当前消息列表(避免引用问题)
"messages": st.session_state.messages.copy(),
# 深拷贝已上传文件列表(保留文件上下文)
"files": st.session_state.uploaded_files.copy(),
# 记录对话使用的模式(影响后续加载时的模式还原)
"mode": st.session_state.current_mode
}
# 更新或新增对话记录
if st.session_state.current_conv is not None:
# 更新模式:覆盖现有对话记录
# current_conv 是 conversations 列表的有效索引
st.session_state.conversations[st.session_state.current_conv] = new_conv
else:
# 新增模式:将新对话追加到历史记录末尾
st.session_state.conversations.append(new_conv)
# 生成对话标题
def generate_conversation_title():
if len(st.session_state.messages) >= 2:
# 查找首条用户消息(生成器表达式提高效率)
first_user_msg = next(
(m["content"] for m in st.session_state.messages if m["role"] == "user"),
"新对话" # 默认值(找不到用户消息时使用)
)
# 智能截断处理(避免截断半个中文字符)
if len(first_user_msg) > 18:
# 截取前18字符(中文安全)
return first_user_msg[:18] + "..."
else:
return first_user_msg
return "新对话" # 消息不足时的默认标题
# 加载历史对话
def load_conversation(index):
# 索引范围校验(防止越界访问)
if 0 <= index < len(st.session_state.conversations):
# 获取目标对话记录
conv = st.session_state.conversations[index]
# 加载核心数据(深拷贝防止引用问题)
st.session_state.messages = list(conv["messages"]) # 强制列表类型
# 加载关联文件(兼容无文件记录的旧对话)
st.session_state.uploaded_files = list(conv.get("files", []))
# 恢复对话模式(默认"标准模式"防止模式字段缺失)
st.session_state.current_mode = str(conv.get("mode", "标准模式"))
# 更新当前对话指针
st.session_state.current_conv = int(index) # 确保索引为整数类型
# 删除对话
def delete_conversation(index):
# 索引有效性检查(防止误删)
if 0 <= index < len(st.session_state.conversations):
# 删除目标对话
del st.session_state.conversations[index]
# 若删除的是当前对话,执行状态重置
if st.session_state.current_conv == index:
reset_conversation() # 调用统一重置函数
# 处理索引指针(因列表变动需特殊处理)
if len(st.session_state.conversations) > 0:
# 自动指向相邻对话(优先前一条)
new_index = max(0, index-1)
load_conversation(new_index)
else:
# 无历史对话时完全重置
st.session_state.current_conv = None
# 重置对话
def reset_conversation():
# 清空消息记录(保留系统提示词)
st.session_state.messages = []
# 清理上传文件(释放内存)
st.session_state.uploaded_files = []
# 重置对话指针(标记为新对话)
st.session_state.current_conv = None
# 文件处理函数
def process_uploaded_files(uploaded_files):
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
for file in uploaded_files:
# 检查文件是否已存在(通过唯一file_id校验)
# 使用any()进行存在性检查,避免重复处理相同文件
if any(f["id"] == file.file_id for f in st.session_state.uploaded_files):
continue # 跳过已处理文件
# 验证文件大小
if file.size > MAX_FILE_SIZE:
st.warning(f"文件 {file.name} 超过{MAX_FILE_SIZE//1024//1024}MB限制,已跳过")
continue
# 初始化文件内容容器
file_content = ""
try:
if file.type == "application/pdf":
# 使用PyPDF2解析PDF文本
pdf_reader = PyPDF2.PdfReader(file)
# 逐页提取文本并拼接(过滤空页)
file_content = "\n".join([
page.extract_text()
for page in pdf_reader.pages
if page.extract_text().strip()
])
elif file.type == "text/plain":
# 读取字节流并解码为UTF-8
# 注意:可能需处理其他编码格式(如GBK)
file_content = str(file.read(), "utf-8")
elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
# 使用python-docx解析.docx格式
doc = docx.Document(file)
# 提取所有段落文本
file_content = "\n".join([
para.text
for para in doc.paragraphs
if para.text.strip()
])
elif file.type.startswith("image/"):
# 使用PIL加载图片
image = Image.open(file)
# 使用pytesseract进行OCR识别(需安装Tesseract)
# 建议配置语言参数:lang='chi_sim+eng'
file_content = pytesseract.image_to_string(image)
# 构建文件信息字典(限制内容长度)
st.session_state.uploaded_files.append({
"id": file.file_id, # Streamlit生成的唯一文件ID
"name": file.name, # 原始文件名
"content": file_content[:5000] # 截断前5000字符防止内存溢出
})
except Exception as e:
# 异常处理(捕获所有可能错误)
st.error(f"处理文件 {file.name} 时出错: {str(e)}")
# 联网搜索功能
def perform_web_search(query):
try:
# 搜索引擎API
search_api = "https://search-for-llmapi.dawne.cn/with-search/v1"
params = {
"q": query, # 用户搜索词
"api_key": st.session_state.api_key,
"limit": 3 # 限制返回结果数
}
# 发送GET请求到搜索API
response = requests.get(search_api, params=params)
# 解析JSON响应并提取结果,默认空列表防止KeyError
results = response.json().get("results", [])
# 格式化搜索结果为Markdown
formatted_results = []
for result in results[:3]: # 取前3个结果
formatted_results.append(f"""
### {result['title']}
**来源**: {result['source']}
{result['snippet'][:200]}... # 截取摘要前200字符
""")
# 存储结果到会话状态
st.session_state.search_results = formatted_results
# 返回拼接后的Markdown文本
return "\n\n".join(formatted_results)
except Exception as e:
# 通用异常捕获并显示错误
st.error(f"搜索失败: {str(e)}")
return ""
# 生成增强提示
def build_enhanced_prompt(user_input):
# 初始化基础提示(直接使用用户输入)
prompt = user_input # 保留原始问题的完整性
# 添加文件上下文
if st.session_state.uploaded_files:
# 生成结构化文件上下文信息
# 格式示例:
# 【文件 年度报告.pdf 内容】
# <文件内容文本>
file_context = "\n".join(
[f"【文件 {f['name']} 内容】\n{f['content']}" # 每个文件单独标记
for f in st.session_state.uploaded_files] # 遍历所有已上传文件
)
# 将文件上下文插入到提示顶部,保持上下文可见性
# 新格式:
# [文件上下文]
# [原始问题]
prompt = f"{file_context}\n\n基于以上资料,请回答:{prompt}" # 使用空行分隔上下文和问题
# 添加搜索结果
if st.session_state.web_search:
# 执行网络搜索(假设返回的是结构化文本)
search_context = perform_web_search(user_input) # 自定义搜索函数
# 将搜索结果附加到提示底部
# 格式示例:
# [原始问题]
# [网络搜索结果]
prompt = f"{prompt}\n\n【网络搜索结果】\n{search_context}" # 追加搜索结果
return prompt # 最终结构:文件上下文 + 原始问题 + 搜索结果
# 不同模式配置
def get_mode_settings():
modes = {
"标准模式": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 0.7,
"max_tokens": 2000
},
"深度思考": {
"model": "deepseek-ai/DeepSeek-R1",
"temperature": 0.3,
"max_tokens": 4000,
"thinking_depth": "deep"
},
"创意模式": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 1.0,
"max_tokens": 1500
},
"代码专家": {
"model": "deepseek-ai/DeepSeek-V3",
"temperature": 0.2,
"max_tokens": 3000
}
}
return modes[st.session_state.current_mode]
# 主聊天界面
def main_interface():
st.title("AI Chat Assistant")
st.markdown("---")
# 显示聊天记录
for msg in st.session_state.messages:
# 确定消息角色(用户/助手)
role = "assistant" if msg["role"] == "assistant" else "user"
# 设置角色头像
avatar = "👾" if role == "user" else "👽"
# 创建带样式的消息容器
with st.chat_message(role, avatar=avatar): # 使用Streamlit的聊天消息组件
# 处理文件类型消息
if msg.get("type") == "file": # 检查是否存在特殊消息类型
# 显示文件上传提示
st.markdown(f"📎 **已上传文件**: {msg['content']}") # 使用加粗显示文件名
else:
# 显示普通文本消息
st.markdown(msg["content"]) # 渲染纯文本或Markdown内容
# 处理用户实时输入
if prompt := st.chat_input("输入消息..."): # 海象运算符获取输入框内容
# 调用输入处理函数
handle_user_input(prompt) # 将用户输入传递给处理逻辑
# 处理用户输入
def handle_user_input(prompt):
# 构建增强提示
enhanced_prompt = build_enhanced_prompt(prompt)
# 添加用户消息
st.session_state.messages.append({"role": "user", "content": prompt})
# 显示处理状态
with st.status("正在处理...", expanded=True) as status:
# 创建助手消息容器
with st.chat_message("assistant",avatar = "👽"):
response_placeholder = st.empty() # 创建空白占位符
full_response = "" # 初始化完整响应
try:
# 获取模式配置
settings = get_mode_settings()
# 创建API客户端
client = OpenAI(
api_key=st.session_state.api_key, # 从会话状态获取API密钥
base_url="https://api.siliconflow.cn/v1" # 指定私有化部署端点
)
# 构建消息列表
messages = [
{"role": "system", "content": st.session_state.system_prompt}, # 系统级指令
*st.session_state.messages, # 解包历史消息(不含当前输入)
{"role": "user", "content": enhanced_prompt} # 当前增强版输入
]
# 流式响应处理
for chunk in client.chat.completions.create(
model=settings["model"], # 模型选择
messages=messages, # 组合后的消息列表
stream=True, # 启用流式传输
temperature=settings["temperature"], # 创意度参数 (0~2)
max_tokens=settings["max_tokens"] # 最大输出长度
):
# 提取增量内容
if chunk.choices[0].delta.content:
full_response += chunk.choices[0].delta.content # 拼接响应片段
# 更新实时显示(带打字光标效果)
response_placeholder.markdown(full_response + "▌")
# 显示最终结果
response_placeholder.markdown(full_response) # 移除光标符号
status.update(label="处理完成", state="complete")
# 保存消息记录
st.session_state.messages.append({
"role": "assistant",
"content": full_response,
"metadata": { # 附加元数据
"mode": st.session_state.current_mode, # 当前模式
"files": [f["name"] for f in st.session_state.uploaded_files], # 使用文件
"search_used": st.session_state.web_search # 是否启用搜索
}
})
except Exception as e:
response_placeholder.error(f"请求失败: {str(e)}")
status.update(label="处理出错", state="error")
# 主程序
if __name__ == "__main__":
inject_custom_css()
init_session_state()
render_sidebar()
main_interface()
该程序为基础版本,未做所有功能的测试,未知bug肯定存在,大家使用过程中一定注意。其中互联网搜索功能存在问题,程序中api无法使用。API Key放在了.streamlit 文件夹的 secrets.toml 中,用户也可以在界面上输入 API key。该demo利用 session_state 记录历史对话,没有加入数据库,所以重新加载就会失效。
希望本文对大家有帮助,上文若有不妥之处,欢迎指正
分享决定高度,学习拉开差距
更多推荐
所有评论(0)