flask+deepseek电商推荐系统

创建项目

pycharm创建项目

升级pip

python -m pip install --upgrade pip

模拟数据

数据字段说明:
字段 说明 示例值
user_id 用户ID(1-10000) 4567
item_id 商品ID(1-5000) 3210
action_type 行为类型(浏览/购买/收藏) ‘purchase’
timestamp 行为时间戳 2024-03-15 14:30
item_category 商品类别 ‘electronics’
item_price 商品价格(元) 299
user_age 用户年龄 28
user_gender 用户性别 ‘female’
user_region 用户地区(A-E) ‘C’
安装依赖
pip install numpy pandas
代码

mock.py

import pandas as pd
import numpy as np
from datetime import datetime, timedelta


# 生成2024年随机时间戳(修复类型错误)
def random_timestamp(start_date, end_date, n):
  time_range = end_date - start_date
  random_days = np.random.randint(0, time_range.days + 1, n)
  # 将NumPy整数转换为Python整数
  random_days = [int(days) for days in random_days]
  random_times = [start_date + timedelta(days=days) for days in random_days]
  return pd.to_datetime(random_times)


# 生成模拟数据
def generate_ecommerce_data(n=100000):
  data = {
    'user_id': np.random.randint(1, 10001, n),  # 1-10000用户
    'item_id': np.random.randint(1, 5001, n),  # 1-5000商品
    'action_type': np.random.choice(['view', 'purchase', 'favorite'], n, p=[0.6, 0.2, 0.2]),
    'timestamp': random_timestamp(datetime(2024, 1, 1), datetime(2024, 12, 31), n),
    'item_category': np.random.choice(['electronics', 'clothing', 'home', 'books', 'beauty'], n),
    'item_price': np.random.normal(150, 100, n).clip(10, 1000).astype(int),
    'user_age': np.random.randint(18, 65, n),
    'user_gender': np.random.choice(['male', 'female', 'other'], n),
    'user_region': np.random.choice(['A', 'B', 'C', 'D', 'E'], n)
  }
  df = pd.DataFrame(data)
  df['hour'] = df['timestamp'].dt.hour  # 提取小时特征
  df['weekday'] = df['timestamp'].dt.weekday  # 提取星期特征
  df['month'] = df['timestamp'].dt.month  # 提取月份特征
  return df


# 生成数据并保存
if __name__ == "__main__":
  n = 100000  # 数据量
  df = generate_ecommerce_data(n)
  df.to_csv('ecommerce_behavior_2024.csv', index=False)
  print(f"生成数据成功,共{n}条记录")

解析deekseek响应数据

thinking_and_response.py

# 定义一个函数,用于提取思考过程和推理结果
def extract_langchain_thinking_and_response(data):
  """
    提取langchain思考过程和推理结果。

    参数:
        data (dict): Ollama 返回的 JSON 数据。

    返回:
        tuple: 包含思考过程和推理结果的元组。
  """
  # 初始化思考过程和推理结果变量
  thinking_process = ""
  response_content = ""

  # 检查是否包含思考过程
  if "<think>" in data and "</think>" in data:
    # 提取思考过程,去除标签
    start = data.find("<think>") + len("<think>")  # 找到 "<think>" 标签的起始位置
    end = data.find("</think>")  # 找到 "</think>" 标签的结束位置
    thinking_process = data[start:end].strip()  # 提取标签内的内容并去除首尾空格

    # 提取推理结果(即消息的其余部分)
    response_content = data.split("</think>")[-1].strip()  # 从 "</think>" 标签之后的内容中提取推理结果
  # 返回思考过程和推理结果
  return thinking_process, response_content


# 定义一个函数,用于提取思考过程和推理结果
def extract_chat_thinking_and_response(data):
  """
  提取对话交互思考过程和推理结果。

  参数:
      data (dict): Ollama 返回的 JSON 数据。

  返回:
      tuple: 包含思考过程和推理结果的元组。
  """
  # 初始化思考过程和推理结果变量
  thinking_process = ""
  response_content = ""

  # 检查返回数据中是否包含消息内容
  if "message" in data and "content" in data["message"]:
    # 提取消息内容
    content = data["message"]["content"]

    # 检查是否包含思考过程
    if "<think>" in content and "</think>" in content:
      # 提取思考过程,去除标签
      start = content.find("<think>") + len("<think>")  # 找到 "<think>" 标签的起始位置
      end = content.find("</think>")  # 找到 "</think>" 标签的结束位置
      thinking_process = content[start:end].strip()  # 提取标签内的内容并去除首尾空格

    # 提取推理结果(即消息的其余部分)
    response_content = content.split("</think>")[-1].strip()  # 从 "</think>" 标签之后的内容中提取推理结果

  # 返回思考过程和推理结果
  return thinking_process, response_content


# 定义一个函数,用于提取思考过程和推理结果
def extract_generate_thinking_and_response(data):
  """
  提取文本生成思考过程和推理结果。

  参数:
      data (dict): Ollama 返回的 JSON 数据。

  返回:
      tuple: 包含思考过程和推理结果的元组。
  """
  # 初始化思考过程和推理结果变量
  thinking_process = ""
  response_content = ""

  # 检查返回数据中是否包含响应内容
  if "response" in data:
    # 提取响应内容
    response = data["response"]

    # 检查是否包含思考过程
    if "<think>" in response and "</think>" in response:
      # 提取思考过程,去除标签
      start = response.find("<think>") + len("<think>")
      end = response.find("</think>")
      thinking_process = response[start:end].strip()

    # 提取推理结果(即响应的其余部分)
    response_content = response.split("</think>")[-1].strip()

  # 返回思考过程和推理结果
  return thinking_process, response_content


import re


def extract_ai_chat(response_data):
  """从DeepSeek API响应中提取并格式化推理结果"""
  try:
    # 从响应数据中提取模型返回的原始内容
    content = response_data["message"]["content"]

    # 使用正则表达式移除<think>标签及其包含的所有内容(非贪婪模式)
    cleaned_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()

    # 使用正则表达式按###标题分割内容为多个章节
    sections = re.split(r'###\s*', cleaned_content)

    # 提取主体内容(第一个部分)
    main_content = sections[0].strip()
    chapters = []  # 用于存储各章节内容的列表

    # 处理每个章节(从第二个部分开始)
    for section in sections[1:]:
      if not section.strip():  # 跳过空部分
        continue
      # 分割章节标题和内容(按第一个换行符分割)
      lines = section.split('\n', 1)
      if len(lines) == 2:  # 确保有标题和内容
        title, content = lines
        chapters.append({  # 将章节信息添加到列表
          "title": title.strip(),  # 章节标题
          "content": content.strip()  # 章节内容
        })

    # 返回结构化的提取结果
    return {
      "main_content": main_content,  # 主体内容
      "chapters": chapters  # 章节列表
    }
  except Exception as e:
    print(f"提取内容时出错: {e}")  # 处理异常
    return None


def extract_ai_generate(response_data):
  """从DeepSeek API响应中提取并格式化推理结果"""
  try:
    # 从响应数据中提取模型返回的原始内容
    content = response_data["response"]

    # 使用正则表达式移除<think>标签及其包含的所有内容(非贪婪模式)
    cleaned_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()

    # 使用正则表达式按###标题分割内容为多个章节
    sections = re.split(r'###\s*', cleaned_content)

    # 提取主体内容(第一个部分)
    main_content = sections[0].strip()
    chapters = []  # 用于存储各章节内容的列表

    # 处理每个章节(从第二个部分开始)
    for section in sections[1:]:
      if not section.strip():  # 跳过空部分
        continue
      # 分割章节标题和内容(按第一个换行符分割)
      lines = section.split('\n', 1)
      if len(lines) == 2:  # 确保有标题和内容
        title, content = lines
        chapters.append({  # 将章节信息添加到列表
          "title": title.strip(),  # 章节标题
          "content": content.strip()  # 章节内容
        })

    # 返回结构化的提取结果
    return {
      "main_content": main_content,  # 主体内容
      "chapters": chapters  # 章节列表
    }
  except Exception as e:
    print(f"提取内容时出错: {e}")  # 处理异常
    return None


def format_ai_history(extracted_data):
  """将提取的内容格式化为易读的文本"""
  if not extracted_data:  # 检查提取数据是否有效
    return "未能提取有效内容"

  # 初始化格式化文本,先添加主体内容
  formatted = extracted_data["main_content"] + "\n\n"

  # 遍历每个章节,按序号和标题格式添加
  for i, chapter in enumerate(extracted_data["chapters"], 1):
    formatted += f"### {chapter['title']}\n{chapter['content']}\n\n"

  return formatted  # 返回格式化后的完整文本


def save_to_file(content, filename="ai_history.md"):
  """将内容保存到文件"""
  try:
    # 以写入模式打开文件(自动创建或覆盖)
    with open(filename, 'w', encoding='utf-8') as file:
      file.write(content)  # 写入内容
    print(f"内容已保存到 {filename}")  # 提示保存成功
  except Exception as e:
    print(f"保存文件时出错: {e}")  # 处理保存异常

Flask服务

安装依赖库
pip install --upgrade requests scikit-learn flask
代码

app.py

import json

from flask import Flask, request, jsonify, Response
import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from thinking_and_response import extract_generate_thinking_and_response

# 初始化Flask应用
app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False  # 解决中文Unicode转义问题
DATA_PATH = 'ecommerce_behavior_2024.csv'  # 数据文件路径
OLLAMA_URL = 'http://localhost:11434/api/generate'  # Ollama API地址


# 推荐系统核心类,负责数据处理和推荐算法实现
class RecommendationSystem:
  def __init__(self):
    # 加载电商行为数据
    self.df = pd.read_csv(DATA_PATH)
    # 构建用户-商品交互矩阵
    self.user_item_matrix = self._build_user_item_matrix()
    # 提取商品特征向量及索引映射
    self.item_features, self.item_id_to_index = self._extract_item_features()

  def _build_user_item_matrix(self):
    """构建用户-商品交互矩阵,根据行为类型赋予不同权重"""
    interaction = self.df.groupby(['user_id', 'item_id'])['action_type']
    return interaction.apply(lambda x: 3 if 'purchase' in x else 2 if 'favorite' in x else 1).unstack(fill_value=0)

  def _extract_item_features(self):
    """提取商品特征并建立item_id到索引的映射"""
    unique_items = self.df.drop_duplicates('item_id')[['item_id', 'item_category', 'item_price']]
    unique_items['feature'] = unique_items.apply(
      lambda x: f"{x['item_category']} price:{x['item_price']}", axis=1
    )
    tfidf = TfidfVectorizer()
    item_features = tfidf.fit_transform(unique_items['feature'])
    # 建立商品ID到特征矩阵索引的映射
    item_id_to_index = {item_id: idx for idx, item_id in enumerate(unique_items['item_id'])}
    return item_features, item_id_to_index

  def user_based_recommendation(self, user_id, top_n=10):
    """基于用户的协同过滤推荐算法"""
    user_vector = self.user_item_matrix.loc[user_id]
    similarity = self.user_item_matrix.corrwith(user_vector).dropna()
    similar_users = similarity.sort_values(ascending=False).index[1:11]  # 取前10相似用户
    recommended_items = self.user_item_matrix.loc[similar_users].sum().sort_values(ascending=False).index
    return recommended_items[:top_n].tolist()

  def content_based_recommendation(self, item_id, top_n=10):
    """基于商品内容的推荐算法(修正索引匹配)"""
    if item_id not in self.item_id_to_index:
      return []  # 商品不存在时返回空列表
    idx = self.item_id_to_index[item_id]  # 获取正确索引
    sim_scores = cosine_similarity(
      self.item_features[idx],  # 目标商品特征(一维向量)
      self.item_features  # 所有商品特征
    ).flatten()
    # 排除自身,取前top_n个相关商品
    related_indices = sim_scores.argsort()[:-top_n - 1:-1][1:]  # 跳过第一个元素(自身相似度1)
    related_item_ids = [list(self.item_id_to_index.keys())[i] for i in related_indices]
    return related_item_ids[:top_n]

  def call_deepseek_analysis(self, query):
    """调用DeepSeek模型分析用户行为"""
    payload = {
      "model": "deepseek-r1:1.5b",
      "prompt": f"分析电商用户行为:{query}",
      "stream": False  # 关闭流式响应,获取完整JSON
    }
    try:
      response = requests.post(OLLAMA_URL, json=payload, timeout=30)
      response.raise_for_status()
      data = response.json()
      thinking_process, response_content = extract_generate_thinking_and_response(data)
      return response_content
    except Exception as e:
      print(f"DeepSeek调用失败: {str(e)}")
      return "模型分析失败"


# 初始化推荐系统实例
rec_sys = RecommendationSystem()


# 定义API接口,处理推荐请求
@app.route('/api/recommend', methods=['GET'])
def get_recommendations():
  user_id = int(request.args.get('user_id', 0))
  item_id = int(request.args.get('item_id', 0))

  if user_id > 0:
    cf_recommendations = rec_sys.user_based_recommendation(user_id)
    analysis = rec_sys.call_deepseek_analysis(f"用户{user_id}购买频率和时间段")
    json_str = json.dumps({
      "user_based_recommendations": cf_recommendations,
      "analysis": analysis
    }, ensure_ascii=False)
    return Response(json_str, content_type='application/json; charset=utf-8')
  elif item_id > 0:
    content_recommendations = rec_sys.content_based_recommendation(item_id)
    analysis = rec_sys.call_deepseek_analysis(f"商品{item_id}的用户偏好特征")
    json_str = json.dumps({
      "content_based_recommendations": content_recommendations,
      "analysis": analysis
    }, ensure_ascii=False)
    return Response(json_str, content_type='application/json; charset=utf-8')
  else:
    return Response(
      json.dumps({"error": "需提供user_id或item_id"}, ensure_ascii=False),
      content_type='application/json; charset=utf-8')


# 启动Flask应用
if __name__ == "__main__":
  app.run(host='0.0.0.0', port=5000, debug=True)

测试接口

# 用户推荐示例
curl http://localhost:5000/api/recommend?user_id=123

# 商品推荐示例
curl http://localhost:5000/api/recommend?item_id=456
用户推荐返回结果示例
{
  "user_based_recommendations": [820, 330, 1946, 508, 1832, 566, 1578, 512, 1610, 2218],
  "analysis": "为了有效分析电商用户123的购买频率和时间段,并制定相应的运营策略,可以按照以下步骤进行:\n\n### 1. 收集数据\n- **历史订单记录**:获取用户123过去一段时间内的所有订单信息,包括订单日期和时间、商品详情等。\n- **用户行为日志**:分析用户的浏览、点击和加购行为,了解其在网站或应用上的活动模式。\n\n### 2. 分析购买频率\n- **计算购买间隔**:确定两次购买之间的时间间隔,判断是否存在固定周期(如每周一次)。\n- **时间段分布**:统计用户在一天中的不同时间段内的购买次数,观察是否有高峰时段。\n- **季节性分析**:检查购买行为是否受季节影响,比如节假日或特定商品类别需求增加时的购买频率变化。\n\n### 3. 购买时间段分析\n- **小时分布**:分析每个小时的购买情况,找出高峰期(如晚上8点至10点)。\n- **星期分布**:确定用户在一周中的哪几天更活跃,例如周末或工作日特定时间。\n- **设备使用情况**:查看用户是否倾向于使用手机、电脑或其他设备进行购物,这可能影响其购物时间选择。\n\n### 4. 用户行为类型\n- **冲动性购买 vs 计划性购买**:通过分析用户的浏览和加购路径,判断其购买决策的类型。\n- **商品类别偏好**:确定用户在特定时间段内倾向于购买哪些商品类别,从而调整库存和促销策略。\n\n### 5. 制定运营策略\n- **精准营销**:根据购买高峰时段发送个性化推荐或优惠信息,提升转化率。\n- **优化用户体验**:针对高流量时段增加服务器响应速度,确保用户在高峰期获得良好的购物体验。\n- **会员与忠诚度计划**:设计奖励机制,鼓励用户在特定时间段内增加购买频率。\n\n### 6. 结果应用\n- **库存管理**:根据用户的购买高峰调整库存水平,确保热门商品在关键时段有足够的供应。\n- **促销活动安排**:选择用户活跃的时间段推出促销,吸引更多的购买行为。\n- **用户体验提升**:优化网站或应用的导航和推荐系统,特别是在用户高活跃时间段内。\n\n### 7. 监测与调整\n- **持续跟踪**:定期更新用户的购买数据,监控其行为变化,并根据新数据调整策略。\n- **A/B测试**:尝试不同的促销时间和营销方式,评估哪种方法最有效。\n\n通过以上步骤,可以全面了解用户123的购买模式,并据此制定有效的电商运营策略,提升销售和用户体验。"
}
商品推荐返回结果示例
{
  "content_based_recommendations": [456, 3976, 171, 675, 1327, 1905, 4717, 2183, 2296],
  "analysis": "要全面分析电商平台上商品456的用户偏好特征,可以从以下几个方面进行深入研究:\n\n### 1. 用户人口统计学(Demographics)\n- **年龄**:确定目标用户的年龄段,例如是年轻人、中年人还是老年人。\n- **性别**:了解用户群体的主要性别分布。\n- **职业**:分析不同职业的用户对商品的需求差异。\n- **收入水平**:评估用户的购买力和消费习惯。\n\n### 2. 用户地理位置(Geographic Location)\n- 确定主要用户的地理分布,包括国家、地区或城市,以优化库存管理和物流策略。\n\n### 3. 用户行为模式(Behavioral Patterns)\n- **浏览行为**:分析用户如何浏览商品页面,如停留时间、跳出率。\n- **点击行为**:研究用户点击哪些产品或信息,评估兴趣点。\n- **购买行为**:考察用户的下单频率、加购行为和转化率。\n\n### 4. 购买历史与频率(Purchase History and Frequency)\n- 分析用户对商品的重复购买情况,识别忠诚客户和一次性购买者。\n- 探索用户的交叉购买行为,找出常一起购买的商品。\n\n### 5. 用户偏好特征\n- **品牌偏好**:确定用户偏好的品牌及其背后的原因。\n- **产品特性偏好**:分析用户对颜色、尺寸、材质等功能的喜好。\n- **价格敏感度**:评估价格变化对销售的影响,识别价格弹性。\n- **渠道偏好**:了解用户更喜欢通过哪些平台或渠道购买。\n\n### 6. 用户反馈与评价(Feedback and Reviews)\n- 整理用户的正面和负面评价,提取关键词和情感倾向,识别产品优缺点。\n\n### 数据收集方法\n- **埋点技术**:记录用户在网站或应用中的行为数据。\n- **问卷调查**:通过调查收集用户的主观偏好信息。\n- **社交媒体分析**:监控用户对商品的评论、点赞和分享情况。\n\n### 分析方法\n- **统计分析**:描述性统计,了解用户的基本特征分布。\n- **聚类分析**:识别不同的用户群体,如高消费群或价格敏感群。\n- **机器学习模型**:使用协同过滤推荐系统,预测用户的购买行为。\n- **自然语言处理(NLP)**:分析用户评价中的情感和关键词。\n\n### 实施步骤\n1. **数据收集**:整合埋点、问卷和社交媒体的数据源。\n2. **数据分析**:应用统计和机器学习方法进行深入分析。\n3. **结果解读**:根据分析结果制定针对性的营销策略,优化产品和服务。\n\n通过以上步骤,可以全面了解商品456的用户偏好特征,从而提升用户的购买体验和满意度,促进销售增长。"
}
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐