flask+deepseek电商推荐系统
flask+deepseek电商推荐系统
·
flask+deepseek电商推荐系统
创建项目
升级pip
python -m pip install --upgrade pip
模拟数据
数据字段说明:
字段 | 说明 | 示例值 |
---|---|---|
user_id | 用户ID(1-10000) | 4567 |
item_id | 商品ID(1-5000) | 3210 |
action_type | 行为类型(浏览/购买/收藏) | ‘purchase’ |
timestamp | 行为时间戳 | 2024-03-15 14:30 |
item_category | 商品类别 | ‘electronics’ |
item_price | 商品价格(元) | 299 |
user_age | 用户年龄 | 28 |
user_gender | 用户性别 | ‘female’ |
user_region | 用户地区(A-E) | ‘C’ |
安装依赖
pip install numpy pandas
代码
mock.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 生成2024年随机时间戳(修复类型错误)
def random_timestamp(start_date, end_date, n):
time_range = end_date - start_date
random_days = np.random.randint(0, time_range.days + 1, n)
# 将NumPy整数转换为Python整数
random_days = [int(days) for days in random_days]
random_times = [start_date + timedelta(days=days) for days in random_days]
return pd.to_datetime(random_times)
# 生成模拟数据
def generate_ecommerce_data(n=100000):
data = {
'user_id': np.random.randint(1, 10001, n), # 1-10000用户
'item_id': np.random.randint(1, 5001, n), # 1-5000商品
'action_type': np.random.choice(['view', 'purchase', 'favorite'], n, p=[0.6, 0.2, 0.2]),
'timestamp': random_timestamp(datetime(2024, 1, 1), datetime(2024, 12, 31), n),
'item_category': np.random.choice(['electronics', 'clothing', 'home', 'books', 'beauty'], n),
'item_price': np.random.normal(150, 100, n).clip(10, 1000).astype(int),
'user_age': np.random.randint(18, 65, n),
'user_gender': np.random.choice(['male', 'female', 'other'], n),
'user_region': np.random.choice(['A', 'B', 'C', 'D', 'E'], n)
}
df = pd.DataFrame(data)
df['hour'] = df['timestamp'].dt.hour # 提取小时特征
df['weekday'] = df['timestamp'].dt.weekday # 提取星期特征
df['month'] = df['timestamp'].dt.month # 提取月份特征
return df
# 生成数据并保存
if __name__ == "__main__":
n = 100000 # 数据量
df = generate_ecommerce_data(n)
df.to_csv('ecommerce_behavior_2024.csv', index=False)
print(f"生成数据成功,共{n}条记录")
解析deekseek响应数据
thinking_and_response.py
# 定义一个函数,用于提取思考过程和推理结果
def extract_langchain_thinking_and_response(data):
"""
提取langchain思考过程和推理结果。
参数:
data (dict): Ollama 返回的 JSON 数据。
返回:
tuple: 包含思考过程和推理结果的元组。
"""
# 初始化思考过程和推理结果变量
thinking_process = ""
response_content = ""
# 检查是否包含思考过程
if "<think>" in data and "</think>" in data:
# 提取思考过程,去除标签
start = data.find("<think>") + len("<think>") # 找到 "<think>" 标签的起始位置
end = data.find("</think>") # 找到 "</think>" 标签的结束位置
thinking_process = data[start:end].strip() # 提取标签内的内容并去除首尾空格
# 提取推理结果(即消息的其余部分)
response_content = data.split("</think>")[-1].strip() # 从 "</think>" 标签之后的内容中提取推理结果
# 返回思考过程和推理结果
return thinking_process, response_content
# 定义一个函数,用于提取思考过程和推理结果
def extract_chat_thinking_and_response(data):
"""
提取对话交互思考过程和推理结果。
参数:
data (dict): Ollama 返回的 JSON 数据。
返回:
tuple: 包含思考过程和推理结果的元组。
"""
# 初始化思考过程和推理结果变量
thinking_process = ""
response_content = ""
# 检查返回数据中是否包含消息内容
if "message" in data and "content" in data["message"]:
# 提取消息内容
content = data["message"]["content"]
# 检查是否包含思考过程
if "<think>" in content and "</think>" in content:
# 提取思考过程,去除标签
start = content.find("<think>") + len("<think>") # 找到 "<think>" 标签的起始位置
end = content.find("</think>") # 找到 "</think>" 标签的结束位置
thinking_process = content[start:end].strip() # 提取标签内的内容并去除首尾空格
# 提取推理结果(即消息的其余部分)
response_content = content.split("</think>")[-1].strip() # 从 "</think>" 标签之后的内容中提取推理结果
# 返回思考过程和推理结果
return thinking_process, response_content
# 定义一个函数,用于提取思考过程和推理结果
def extract_generate_thinking_and_response(data):
"""
提取文本生成思考过程和推理结果。
参数:
data (dict): Ollama 返回的 JSON 数据。
返回:
tuple: 包含思考过程和推理结果的元组。
"""
# 初始化思考过程和推理结果变量
thinking_process = ""
response_content = ""
# 检查返回数据中是否包含响应内容
if "response" in data:
# 提取响应内容
response = data["response"]
# 检查是否包含思考过程
if "<think>" in response and "</think>" in response:
# 提取思考过程,去除标签
start = response.find("<think>") + len("<think>")
end = response.find("</think>")
thinking_process = response[start:end].strip()
# 提取推理结果(即响应的其余部分)
response_content = response.split("</think>")[-1].strip()
# 返回思考过程和推理结果
return thinking_process, response_content
import re
def extract_ai_chat(response_data):
"""从DeepSeek API响应中提取并格式化推理结果"""
try:
# 从响应数据中提取模型返回的原始内容
content = response_data["message"]["content"]
# 使用正则表达式移除<think>标签及其包含的所有内容(非贪婪模式)
cleaned_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
# 使用正则表达式按###标题分割内容为多个章节
sections = re.split(r'###\s*', cleaned_content)
# 提取主体内容(第一个部分)
main_content = sections[0].strip()
chapters = [] # 用于存储各章节内容的列表
# 处理每个章节(从第二个部分开始)
for section in sections[1:]:
if not section.strip(): # 跳过空部分
continue
# 分割章节标题和内容(按第一个换行符分割)
lines = section.split('\n', 1)
if len(lines) == 2: # 确保有标题和内容
title, content = lines
chapters.append({ # 将章节信息添加到列表
"title": title.strip(), # 章节标题
"content": content.strip() # 章节内容
})
# 返回结构化的提取结果
return {
"main_content": main_content, # 主体内容
"chapters": chapters # 章节列表
}
except Exception as e:
print(f"提取内容时出错: {e}") # 处理异常
return None
def extract_ai_generate(response_data):
"""从DeepSeek API响应中提取并格式化推理结果"""
try:
# 从响应数据中提取模型返回的原始内容
content = response_data["response"]
# 使用正则表达式移除<think>标签及其包含的所有内容(非贪婪模式)
cleaned_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
# 使用正则表达式按###标题分割内容为多个章节
sections = re.split(r'###\s*', cleaned_content)
# 提取主体内容(第一个部分)
main_content = sections[0].strip()
chapters = [] # 用于存储各章节内容的列表
# 处理每个章节(从第二个部分开始)
for section in sections[1:]:
if not section.strip(): # 跳过空部分
continue
# 分割章节标题和内容(按第一个换行符分割)
lines = section.split('\n', 1)
if len(lines) == 2: # 确保有标题和内容
title, content = lines
chapters.append({ # 将章节信息添加到列表
"title": title.strip(), # 章节标题
"content": content.strip() # 章节内容
})
# 返回结构化的提取结果
return {
"main_content": main_content, # 主体内容
"chapters": chapters # 章节列表
}
except Exception as e:
print(f"提取内容时出错: {e}") # 处理异常
return None
def format_ai_history(extracted_data):
"""将提取的内容格式化为易读的文本"""
if not extracted_data: # 检查提取数据是否有效
return "未能提取有效内容"
# 初始化格式化文本,先添加主体内容
formatted = extracted_data["main_content"] + "\n\n"
# 遍历每个章节,按序号和标题格式添加
for i, chapter in enumerate(extracted_data["chapters"], 1):
formatted += f"### {chapter['title']}\n{chapter['content']}\n\n"
return formatted # 返回格式化后的完整文本
def save_to_file(content, filename="ai_history.md"):
"""将内容保存到文件"""
try:
# 以写入模式打开文件(自动创建或覆盖)
with open(filename, 'w', encoding='utf-8') as file:
file.write(content) # 写入内容
print(f"内容已保存到 {filename}") # 提示保存成功
except Exception as e:
print(f"保存文件时出错: {e}") # 处理保存异常
Flask服务
安装依赖库
pip install --upgrade requests scikit-learn flask
代码
app.py
import json
from flask import Flask, request, jsonify, Response
import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from thinking_and_response import extract_generate_thinking_and_response
# 初始化Flask应用
app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False # 解决中文Unicode转义问题
DATA_PATH = 'ecommerce_behavior_2024.csv' # 数据文件路径
OLLAMA_URL = 'http://localhost:11434/api/generate' # Ollama API地址
# 推荐系统核心类,负责数据处理和推荐算法实现
class RecommendationSystem:
def __init__(self):
# 加载电商行为数据
self.df = pd.read_csv(DATA_PATH)
# 构建用户-商品交互矩阵
self.user_item_matrix = self._build_user_item_matrix()
# 提取商品特征向量及索引映射
self.item_features, self.item_id_to_index = self._extract_item_features()
def _build_user_item_matrix(self):
"""构建用户-商品交互矩阵,根据行为类型赋予不同权重"""
interaction = self.df.groupby(['user_id', 'item_id'])['action_type']
return interaction.apply(lambda x: 3 if 'purchase' in x else 2 if 'favorite' in x else 1).unstack(fill_value=0)
def _extract_item_features(self):
"""提取商品特征并建立item_id到索引的映射"""
unique_items = self.df.drop_duplicates('item_id')[['item_id', 'item_category', 'item_price']]
unique_items['feature'] = unique_items.apply(
lambda x: f"{x['item_category']} price:{x['item_price']}", axis=1
)
tfidf = TfidfVectorizer()
item_features = tfidf.fit_transform(unique_items['feature'])
# 建立商品ID到特征矩阵索引的映射
item_id_to_index = {item_id: idx for idx, item_id in enumerate(unique_items['item_id'])}
return item_features, item_id_to_index
def user_based_recommendation(self, user_id, top_n=10):
"""基于用户的协同过滤推荐算法"""
user_vector = self.user_item_matrix.loc[user_id]
similarity = self.user_item_matrix.corrwith(user_vector).dropna()
similar_users = similarity.sort_values(ascending=False).index[1:11] # 取前10相似用户
recommended_items = self.user_item_matrix.loc[similar_users].sum().sort_values(ascending=False).index
return recommended_items[:top_n].tolist()
def content_based_recommendation(self, item_id, top_n=10):
"""基于商品内容的推荐算法(修正索引匹配)"""
if item_id not in self.item_id_to_index:
return [] # 商品不存在时返回空列表
idx = self.item_id_to_index[item_id] # 获取正确索引
sim_scores = cosine_similarity(
self.item_features[idx], # 目标商品特征(一维向量)
self.item_features # 所有商品特征
).flatten()
# 排除自身,取前top_n个相关商品
related_indices = sim_scores.argsort()[:-top_n - 1:-1][1:] # 跳过第一个元素(自身相似度1)
related_item_ids = [list(self.item_id_to_index.keys())[i] for i in related_indices]
return related_item_ids[:top_n]
def call_deepseek_analysis(self, query):
"""调用DeepSeek模型分析用户行为"""
payload = {
"model": "deepseek-r1:1.5b",
"prompt": f"分析电商用户行为:{query}",
"stream": False # 关闭流式响应,获取完整JSON
}
try:
response = requests.post(OLLAMA_URL, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
thinking_process, response_content = extract_generate_thinking_and_response(data)
return response_content
except Exception as e:
print(f"DeepSeek调用失败: {str(e)}")
return "模型分析失败"
# 初始化推荐系统实例
rec_sys = RecommendationSystem()
# 定义API接口,处理推荐请求
@app.route('/api/recommend', methods=['GET'])
def get_recommendations():
user_id = int(request.args.get('user_id', 0))
item_id = int(request.args.get('item_id', 0))
if user_id > 0:
cf_recommendations = rec_sys.user_based_recommendation(user_id)
analysis = rec_sys.call_deepseek_analysis(f"用户{user_id}购买频率和时间段")
json_str = json.dumps({
"user_based_recommendations": cf_recommendations,
"analysis": analysis
}, ensure_ascii=False)
return Response(json_str, content_type='application/json; charset=utf-8')
elif item_id > 0:
content_recommendations = rec_sys.content_based_recommendation(item_id)
analysis = rec_sys.call_deepseek_analysis(f"商品{item_id}的用户偏好特征")
json_str = json.dumps({
"content_based_recommendations": content_recommendations,
"analysis": analysis
}, ensure_ascii=False)
return Response(json_str, content_type='application/json; charset=utf-8')
else:
return Response(
json.dumps({"error": "需提供user_id或item_id"}, ensure_ascii=False),
content_type='application/json; charset=utf-8')
# 启动Flask应用
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, debug=True)
测试接口
# 用户推荐示例
curl http://localhost:5000/api/recommend?user_id=123
# 商品推荐示例
curl http://localhost:5000/api/recommend?item_id=456
用户推荐返回结果示例
{
"user_based_recommendations": [820, 330, 1946, 508, 1832, 566, 1578, 512, 1610, 2218],
"analysis": "为了有效分析电商用户123的购买频率和时间段,并制定相应的运营策略,可以按照以下步骤进行:\n\n### 1. 收集数据\n- **历史订单记录**:获取用户123过去一段时间内的所有订单信息,包括订单日期和时间、商品详情等。\n- **用户行为日志**:分析用户的浏览、点击和加购行为,了解其在网站或应用上的活动模式。\n\n### 2. 分析购买频率\n- **计算购买间隔**:确定两次购买之间的时间间隔,判断是否存在固定周期(如每周一次)。\n- **时间段分布**:统计用户在一天中的不同时间段内的购买次数,观察是否有高峰时段。\n- **季节性分析**:检查购买行为是否受季节影响,比如节假日或特定商品类别需求增加时的购买频率变化。\n\n### 3. 购买时间段分析\n- **小时分布**:分析每个小时的购买情况,找出高峰期(如晚上8点至10点)。\n- **星期分布**:确定用户在一周中的哪几天更活跃,例如周末或工作日特定时间。\n- **设备使用情况**:查看用户是否倾向于使用手机、电脑或其他设备进行购物,这可能影响其购物时间选择。\n\n### 4. 用户行为类型\n- **冲动性购买 vs 计划性购买**:通过分析用户的浏览和加购路径,判断其购买决策的类型。\n- **商品类别偏好**:确定用户在特定时间段内倾向于购买哪些商品类别,从而调整库存和促销策略。\n\n### 5. 制定运营策略\n- **精准营销**:根据购买高峰时段发送个性化推荐或优惠信息,提升转化率。\n- **优化用户体验**:针对高流量时段增加服务器响应速度,确保用户在高峰期获得良好的购物体验。\n- **会员与忠诚度计划**:设计奖励机制,鼓励用户在特定时间段内增加购买频率。\n\n### 6. 结果应用\n- **库存管理**:根据用户的购买高峰调整库存水平,确保热门商品在关键时段有足够的供应。\n- **促销活动安排**:选择用户活跃的时间段推出促销,吸引更多的购买行为。\n- **用户体验提升**:优化网站或应用的导航和推荐系统,特别是在用户高活跃时间段内。\n\n### 7. 监测与调整\n- **持续跟踪**:定期更新用户的购买数据,监控其行为变化,并根据新数据调整策略。\n- **A/B测试**:尝试不同的促销时间和营销方式,评估哪种方法最有效。\n\n通过以上步骤,可以全面了解用户123的购买模式,并据此制定有效的电商运营策略,提升销售和用户体验。"
}
商品推荐返回结果示例
{
"content_based_recommendations": [456, 3976, 171, 675, 1327, 1905, 4717, 2183, 2296],
"analysis": "要全面分析电商平台上商品456的用户偏好特征,可以从以下几个方面进行深入研究:\n\n### 1. 用户人口统计学(Demographics)\n- **年龄**:确定目标用户的年龄段,例如是年轻人、中年人还是老年人。\n- **性别**:了解用户群体的主要性别分布。\n- **职业**:分析不同职业的用户对商品的需求差异。\n- **收入水平**:评估用户的购买力和消费习惯。\n\n### 2. 用户地理位置(Geographic Location)\n- 确定主要用户的地理分布,包括国家、地区或城市,以优化库存管理和物流策略。\n\n### 3. 用户行为模式(Behavioral Patterns)\n- **浏览行为**:分析用户如何浏览商品页面,如停留时间、跳出率。\n- **点击行为**:研究用户点击哪些产品或信息,评估兴趣点。\n- **购买行为**:考察用户的下单频率、加购行为和转化率。\n\n### 4. 购买历史与频率(Purchase History and Frequency)\n- 分析用户对商品的重复购买情况,识别忠诚客户和一次性购买者。\n- 探索用户的交叉购买行为,找出常一起购买的商品。\n\n### 5. 用户偏好特征\n- **品牌偏好**:确定用户偏好的品牌及其背后的原因。\n- **产品特性偏好**:分析用户对颜色、尺寸、材质等功能的喜好。\n- **价格敏感度**:评估价格变化对销售的影响,识别价格弹性。\n- **渠道偏好**:了解用户更喜欢通过哪些平台或渠道购买。\n\n### 6. 用户反馈与评价(Feedback and Reviews)\n- 整理用户的正面和负面评价,提取关键词和情感倾向,识别产品优缺点。\n\n### 数据收集方法\n- **埋点技术**:记录用户在网站或应用中的行为数据。\n- **问卷调查**:通过调查收集用户的主观偏好信息。\n- **社交媒体分析**:监控用户对商品的评论、点赞和分享情况。\n\n### 分析方法\n- **统计分析**:描述性统计,了解用户的基本特征分布。\n- **聚类分析**:识别不同的用户群体,如高消费群或价格敏感群。\n- **机器学习模型**:使用协同过滤推荐系统,预测用户的购买行为。\n- **自然语言处理(NLP)**:分析用户评价中的情感和关键词。\n\n### 实施步骤\n1. **数据收集**:整合埋点、问卷和社交媒体的数据源。\n2. **数据分析**:应用统计和机器学习方法进行深入分析。\n3. **结果解读**:根据分析结果制定针对性的营销策略,优化产品和服务。\n\n通过以上步骤,可以全面了解商品456的用户偏好特征,从而提升用户的购买体验和满意度,促进销售增长。"
}
更多推荐
所有评论(0)