基于Python与Claude AI的智能邮件分拣系统:从原理到Docker部署
邮件自动化处理是提升工作效率的关键技术,其核心在于通过程序化方式管理信息流。传统方法依赖规则过滤,而现代AI技术则能理解语义上下文。基于大语言模型的智能分拣系统,通过自然语言处理技术解析邮件内容,实现意图识别与分类,其技术价值在于将人工处理转化为自动化工作流。在工程实践中,Python因其丰富的库生态成为首选开发语言,结合Claude AI的API可实现高质量的文本理解与生成。Docker容器化技
1. 项目概述与核心价值
如果你每天都被淹没在成百上千封邮件里,分不清哪些是紧急待办,哪些是垃圾广告,哪些又需要你花上半小时来构思回复,那么这个项目可能就是你的救星。 email-triage-openclaw 是一个基于人工智能的邮件智能分拣与处理代理,它像一位不知疲倦的私人助理,帮你自动扫描 Gmail 收件箱,对邮件进行分类、草拟回复,并在遇到紧急邮件时及时发出警报。整个系统由 Python 驱动,利用 Claude AI 的强大语言理解能力,并通过 Docker 封装,确保部署过程简单、环境一致。这不仅仅是一个脚本,而是一个完整的、生产就绪的自动化解决方案,旨在将你从繁琐的邮件处理工作中解放出来,让你能专注于真正重要的事情。
这个项目的核心价值在于其“智能”与“自动化”的结合。传统的邮件规则(Filter)只能基于简单的关键词或发件人进行过滤,而 email-triage-openclaw 则能理解邮件的上下文和意图。它能判断一封邮件是“销售推广”、“团队内部讨论”、“客户支持请求”还是“老板的紧急指令”,并据此采取不同的行动。对于需要回复的邮件,它甚至能根据你的风格和邮件内容,生成一份高质量的回复草稿,你只需要稍作修改即可发送。这种级别的自动化,在过去需要复杂的商业软件或定制开发才能实现,而现在,通过这个开源项目,任何有一定技术背景的用户都可以搭建属于自己的智能邮件处理中心。
2. 技术栈选型与架构解析
2.1 为什么选择 Python + Claude AI + Docker?
这个技术栈的选择体现了现代 AI 应用开发的典型思路:用成熟的语言快速构建业务逻辑,接入顶尖的 AI 服务实现核心智能,再用容器化技术保证部署的便捷性和一致性。
Python 是首选。它在数据处理、网络请求和快速原型开发方面有着无与伦比的优势。处理邮件需要与 Gmail API 交互、解析邮件内容(HTML/纯文本)、处理附件,这些任务 Python 都有非常成熟的库(如 google-api-python-client , beautifulsoup4 , email 标准库)支持。同时,Python 也是与各类 AI 模型 API 对接最友好的语言,生态丰富。
Claude AI (Anthropic) 作为核心的 LLM(大语言模型)被引入,是项目“智能”的源泉。相比于其他模型,Claude 在长文本理解、指令遵循和安全性方面表现突出。邮件内容可能很长且结构复杂,Claude 能很好地提取关键信息、理解用户意图。它的 API 设计清晰,响应格式稳定,非常适合用于需要结构化输出的分类和生成任务。当然,项目架构上应该设计为可插拔的,理论上也可以替换为 OpenAI 的 GPT 系列或其他兼容 API 的模型。
Docker 是项目能“开箱即用”的关键。一个 AI 应用依赖复杂,可能包括特定版本的 Python、一系列 pip 包、系统工具以及环境变量。Docker 通过容器镜像将所有这些依赖打包,确保在任何支持 Docker 的机器上(无论是你的本地开发机、云服务器还是 NAS),运行效果都是一致的。这极大降低了用户的部署门槛,你不需要再为“在我机器上是好的”这种问题烦恼。
2.2 系统架构与工作流程
整个系统可以看作一个由事件驱动的数据处理管道。其核心工作流程如下:
- 触发与抓取 :系统通过定时任务(例如,使用
cron或schedule库)定期触发,或者通过 Gmail 的推送通知(更实时)来启动。随后,它使用 OAuth 2.0 认证的 Gmail API 安全地访问指定邮箱的收件箱。 - 预处理与提取 :抓取到原始邮件数据(通常是 RFC 2822 格式)后,系统会进行预处理。这包括解析发件人、收件人、主题、日期等头部信息,提取邮件正文(优先纯文本,若无则解析 HTML),并可能处理附件(如保存到本地或云存储以供 AI 分析)。
- AI 智能处理 :这是核心环节。预处理后的邮件内容(包括主题和正文)被构造成一个精心设计的提示词(Prompt),发送给 Claude AI API。这个 Prompt 会指示模型完成多项任务:
- 分类 :将邮件归入预设的类别,如
urgent(紧急)、action_required(需处理)、informational(通知类)、newsletter(订阅)、spam(垃圾)等。 - 摘要/关键信息提取 :对于长邮件,生成简短摘要。
- 情感/意图分析 :判断邮件是否包含投诉、咨询、赞扬等。
- 草稿生成 :对于需要回复的邮件(如客户咨询),根据邮件上下文和预设的“你的身份”(例如,“技术支持工程师张三”),生成一份礼貌、专业、切题的回复草稿。
- 分类 :将邮件归入预设的类别,如
- 后处理与执行 :收到 AI 的响应后,系统根据结果执行相应操作:
- 打标签/移动邮件 :通过 Gmail API 为邮件添加相应的标签(Label),或将其移动到特定文件夹(如“待处理”、“已分类/订阅”)。
- 保存草稿 :将 AI 生成的回复内容,连同原邮件引用信息,通过 Gmail API 创建为一封新的草稿邮件。这样你可以在 Gmail 网页版或客户端中直接查看、编辑并发送。
- 发送警报 :如果邮件被分类为
urgent,系统会通过配置的警报渠道(如 Slack Webhook、Telegram Bot、电子邮件或短信 API)向你发送一条即时通知。
- 日志与状态管理 :所有操作都应有详细的日志记录,包括处理了哪封邮件、AI 的分类结果、执行了何种操作以及是否成功。这对于调试和后期优化至关重要。系统还应具备一定的状态管理能力,例如记录已处理的邮件 ID,避免在下次运行时重复处理。
注意:安全与隐私是第一要务 。整个流程中,你的邮件数据会从 Google 服务器发送到你部署此应用的服务器,再发送至 Anthropic 的 API 服务器。你必须确保:1) 使用安全的 OAuth 2.0 流程,不存储明文密码;2) 部署服务器的网络和访问安全;3) 了解并认可 Anthropic 的 API 数据处理政策。对于高度敏感的邮件,可以考虑在发送到 AI 前进行局部脱敏(如替换人名、账号)。
3. 环境准备与详细配置
3.1 前置条件与账号准备
在写第一行代码之前,你需要准备好几个关键服务的访问权限。这就像组装一台机器前,要备齐所有螺丝和齿轮。
1. Google Cloud 项目与 Gmail API 启用:
- 访问 Google Cloud Console ,创建一个新项目(例如
email-triage-agent)。 - 在“API和服务”库中,搜索并启用 Gmail API 。
- 进入“API和服务” -> “凭据”,点击“创建凭据” -> “OAuth 客户端 ID”。
- 应用类型选择“桌面应用”(Desktop application),给它起个名字。创建后,你会下载到一个
credentials.json文件。这个文件包含了你的客户端 ID 和密钥,是程序与你的 Gmail 通信的“门票”。
2. Anthropic Claude API 密钥:
- 前往 Anthropic 控制台 ,注册或登录账号。
- 在账户设置中,找到创建 API 密钥的选项,生成一个新的密钥(Key)。妥善保存,因为它只显示一次。
3. (可选)警报渠道配置:
- Slack :创建一个 Slack App,并获取其 Webhook URL。
- Telegram :通过
@BotFather创建一个 Bot,获取它的HTTP API令牌。 - 将相应的 URL 或令牌保存好,后续需要写入配置。
3.2 Docker 环境与项目结构搭建
假设你已经在本机或服务器上安装好了 Docker 和 Docker Compose。接下来是组织你的项目代码。
一个清晰的项目结构有助于维护。我建议的目录结构如下:
email-triage-openclaw/
├── Dockerfile # 定义容器镜像的构建规则
├── docker-compose.yml # 定义服务、网络、卷的编排
├── requirements.txt # Python 依赖包列表
├── .env.example # 环境变量示例文件
├── src/ # 源代码目录
│ ├── __init__.py
│ ├── main.py # 主程序入口
│ ├── gmail_client.py # 封装 Gmail API 交互
│ ├── ai_processor.py # 封装 Claude AI 调用与提示工程
│ ├── action_executor.py # 执行打标签、存草稿、发警报
│ └── config.py # 配置加载与管理
├── auth/ # 存放 OAuth 令牌
│ └── token.pickle # (程序运行时自动生成)
├── logs/ # 日志目录(通过卷挂载)
│ └── app.log
└── README.md # 项目说明文档
Dockerfile 详解: 这是构建应用镜像的蓝图。一个高效且安全的 Dockerfile 很重要。
# 使用官方 Python 轻量级镜像作为基础
FROM python:3.11-slim
# 设置工作目录,后续命令都在此目录下执行
WORKDIR /app
# 安装系统依赖(例如,某些Python包可能需要编译工具)
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 将依赖文件复制到容器中
COPY requirements.txt .
# 安装 Python 依赖,使用清华镜像加速(国内环境)
RUN pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt
# 将源代码复制到容器中
COPY src/ ./src/
# 创建一个非 root 用户来运行应用,增强安全性
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 设置容器启动时执行的命令
CMD ["python", "src/main.py"]
docker-compose.yml 详解: 它定义了服务的运行方式,特别是如何管理敏感的环境变量和持久化数据。
version: '3.8'
services:
email-triage:
build: .
container_name: email_triage_agent
restart: unless-stopped # 容器退出时自动重启,除非手动停止
volumes:
# 挂载 auth 目录,持久化 OAuth 令牌,避免每次重启都重新授权
- ./auth:/app/auth:rw
# 挂载 logs 目录,方便在宿主机查看日志
- ./logs:/app/logs:rw
env_file:
# 从 .env 文件加载环境变量,这是管理密钥的最佳实践
- .env
# 可以设置定时任务,例如每5分钟运行一次
# 实际项目中,更推荐使用外部 cron 或 systemd timer 来调度 `docker-compose run`
.env.example 文件: 这是一个模板,你需要复制它为 .env 并填入真实值。 务必确保 .env 文件被添加到 .gitignore 中,切勿提交到版本库!
# Google OAuth 2.0 Credentials (从 credentials.json 中获取)
GOOGLE_CLIENT_ID=your_client_id_here.apps.googleusercontent.com
GOOGLE_CLIENT_SECRET=your_client_secret_here
# 可以指定处理哪个邮箱,不填则默认为授权用户的主邮箱
TARGET_EMAIL=your.email@gmail.com
# Anthropic Claude API
ANTHROPIC_API_KEY=sk-ant-your_api_key_here
# Alerting (Optional)
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/your/webhook/url
TELEGRAM_BOT_TOKEN=your_telegram_bot_token
TELEGRAM_CHAT_ID=your_telegram_chat_id
# Application Settings
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR
CHECK_INTERVAL_MINUTES=5 # 检查邮件的间隔(如果使用循环)
3.3 Gmail API OAuth 2.0 授权流程实现
这是第一个技术难点。你需要让程序获得访问用户邮件的权限。我们使用 google-auth-oauthlib 和 google-auth-httplib2 库来完成。
在 src/gmail_client.py 中,初始化流程如下:
import os
import pickle
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# 如果修改了 SCOPES,需要删除旧的 token.pickle 文件
SCOPES = ['https://www.googleapis.com/auth/gmail.modify'] # 可读、写邮件(打标签、存草稿),但不能发送
def get_gmail_service():
creds = None
# token.pickle 存储了用户的访问和刷新令牌
if os.path.exists('/app/auth/token.pickle'):
with open('/app/auth/token.pickle', 'rb') as token:
creds = pickle.load(token)
# 如果令牌不存在或已失效
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
# 刷新令牌
creds.refresh(Request())
else:
# 首次运行,启动本地服务器流程进行授权
flow = InstalledAppFlow.from_client_secrets_file(
'/app/auth/credentials.json', SCOPES)
# 注意:在 Docker 容器或无头服务器中,需要使用 `console` 流程
# flow = InstalledAppFlow.from_client_secrets_file(
# 'credentials.json', SCOPES)
# creds = flow.run_console() # 适用于服务器
creds = flow.run_local_server(port=8080, open_browser=False) # 适用于本地开发
# 保存令牌供下次使用
with open('/app/auth/token.pickle', 'wb') as token:
pickle.dump(creds, token)
# 构建 Gmail API 服务对象
service = build('gmail', 'v1', credentials=creds)
return service
实操心得:Docker 环境下的授权陷阱 在本地开发时,
run_local_server会弹出浏览器让你登录,这很方便。但在 Docker 容器或远程服务器(无图形界面)中,这行不通。你需要改用run_console方式。具体操作是:第一次在本地完成授权,生成token.pickle文件,然后将这个文件复制到服务器的auth/目录下。因为刷新令牌(refresh_token)包含在其中,只要应用保持活跃,它就能自动刷新访问令牌。这是部署到生产环境的关键一步。
4. 核心模块实现与 AI 提示工程
4.1 邮件抓取与预处理
获得 Gmail 服务对象后,就可以抓取邮件了。我们通常抓取未读邮件( is:unread )或特定时间段内的邮件。
def fetch_unread_emails(service, max_results=50):
try:
# 搜索查询,这里找未读邮件
results = service.users().messages().list(
userId='me',
labelIds=['INBOX'],
q='is:unread',
maxResults=max_results
).execute()
messages = results.get('messages', [])
email_details = []
for msg in messages:
msg_id = msg['id']
# 获取邮件的完整详情,包括头部和正文
message = service.users().messages().get(
userId='me', id=msg_id, format='full').execute()
email_details.append(parse_email_message(message))
return email_details
except Exception as error:
print(f'An error occurred: {error}')
return []
def parse_email_message(message):
"""解析 Gmail API 返回的复杂邮件结构,提取我们需要的信息"""
headers = message['payload']['headers']
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '(No Subject)')
sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
date = next((h['value'] for h in headers if h['name'] == 'Date'), '')
msg_id = message['id']
thread_id = message['threadId']
# 提取邮件正文(纯文本优先)
body = ''
if 'parts' in message['payload']:
# 多部分邮件(如带附件)
for part in message['payload']['parts']:
if part['mimeType'] == 'text/plain':
if 'data' in part['body']:
import base64
body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
break
else:
# 单部分邮件
if message['payload']['mimeType'] == 'text/plain' and 'data' in message['payload']['body']:
import base64
body = base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8')
# 如果没有纯文本,尝试解析 HTML
elif message['payload']['mimeType'] == 'text/html' and 'data' in message['payload']['body']:
import base64
html_body = base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8')
# 简单去除 HTML 标签,生产环境可用 BeautifulSoup
import re
body = re.sub('<[^<]+?>', '', html_body)
# 截断过长的正文,避免超出 AI Token 限制
max_body_length = 8000
if len(body) > max_body_length:
body = body[:max_body_length] + '... [内容已截断]'
return {
'id': msg_id,
'thread_id': thread_id,
'subject': subject,
'from': sender,
'date': date,
'snippet': message.get('snippet', ''),
'body': body
}
4.2 AI 处理器与提示词设计
这是项目的“大脑”。 src/ai_processor.py 负责与 Claude API 对话。提示词(Prompt)的设计直接决定了分类和生成的质量。
首先,安装 Anthropic 的官方 Python SDK: pip install anthropic 。
import anthropic
from src.config import settings # 假设 config.py 从环境变量加载了 ANTHROPIC_API_KEY
class AIProcessor:
def __init__(self):
self.client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
# 根据你的需求选择合适的模型,claude-3-haiku-20240307 速度快成本低,适合分类
self.model = "claude-3-haiku-20240307"
# 对于生成回复草稿,可以考虑使用更强大的模型,如 claude-3-sonnet-20240229
self.model_for_draft = "claude-3-sonnet-20240229"
def triage_email(self, email_subject, email_body, email_from):
"""对邮件进行分类和关键信息提取"""
prompt = f"""请你扮演一个专业的邮件助理,对以下邮件进行分析。
邮件来自:{email_from}
邮件主题:{email_subject}
邮件正文:
{email_body}
请严格按照以下 JSON 格式输出分析结果,不要输出任何其他解释性文字。
{{
"category": "urgent", // 分类,必须是以下之一:urgent(紧急,需立即关注)、action_required(需要你采取行动,但非立即)、informational(通知/信息,仅需知悉)、newsletter(订阅/推广邮件)、spam(垃圾邮件)、other(其他)
"confidence": 0.95, // 你对分类的置信度,0-1之间的小数
"summary": "邮件的核心内容摘要,不超过100字。",
"requires_reply": true, // 这封邮件是否需要回复?布尔值
"sentiment": "neutral", // 邮件的情感倾向:positive, negative, neutral, mixed
"key_points": ["关键点1", "关键点2"] // 从邮件中提取的2-5个关键点,用于快速浏览
}}
"""
try:
response = self.client.messages.create(
model=self.model,
max_tokens=500,
temperature=0.1, # 低温度保证输出稳定,符合格式要求
messages=[{"role": "user", "content": prompt}]
)
# 解析返回的 JSON
import json
result = json.loads(response.content[0].text)
return result
except Exception as e:
print(f"AI 处理失败: {e}")
# 返回一个安全的默认结果
return {
"category": "other",
"confidence": 0.0,
"summary": "AI分析失败。",
"requires_reply": False,
"sentiment": "neutral",
"key_points": []
}
def draft_reply(self, original_email, your_role="一位专业且友好的同事"):
"""根据原邮件草拟回复"""
prompt = f"""请你扮演{your_role},针对以下邮件草拟一份回复。
原邮件主题:{original_email['subject']}
原邮件发件人:{original_email['from']}
原邮件正文:
{original_email['body']}
请草拟一份回复。要求:
1. 语气专业、礼貌、切题。
2. 直接针对原邮件中的问题或内容进行回应。
3. 如果原邮件中有多个问题,请逐一回应。
4. 在回复开头,使用“Hi [对方名字],”或“您好,”等适当的称呼(请从发件人信息中推断名字)。
5. 在回复结尾,使用“Best regards,”或“谢谢,”等适当的结束语。
6. 如果有些信息你需要我(收件人)来补充,请在回复中用“【待补充】”标出。
7. 输出纯文本,不要使用 Markdown 格式。
请直接输出回复的正文内容,不要加“回复:”这样的前缀。"""
try:
response = self.client.messages.create(
model=self.model_for_draft,
max_tokens=1000,
temperature=0.7, # 稍高的温度让回复更有创造性
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text.strip()
except Exception as e:
print(f"草拟回复失败: {e}")
return None
提示工程心得:结构化输出与温度参数
- 结构化输出 :通过要求 AI 返回严格的 JSON 格式,我们可以像调用普通函数一样使用 AI 的结果,极大地简化了后续的代码逻辑。这是构建可靠 AI 应用的关键技巧。
- 温度(Temperature) :
temperature参数控制输出的随机性。对于分类任务,我们设为较低的值(如 0.1),以确保相同的输入总是得到相同(或极其相似)的分类结果,保证稳定性。对于生成回复草稿,可以设为稍高的值(如 0.7),让回复更具变化和人情味。- 角色扮演 :在提示词中让 AI “扮演”某个角色,能有效引导其生成符合特定风格和语境的文本。
4.3 执行器:将 AI 决策转化为行动
AI 给出了判断,现在需要执行。 src/action_executor.py 模块负责与 Gmail API 和外部警报系统交互。
import base64
from email.mime.text import MIMEText
from googleapiclient.errors import HttpError
class ActionExecutor:
def __init__(self, gmail_service):
self.service = gmail_service
def add_label(self, email_msg_id, label_name):
"""为邮件添加标签(如果标签不存在则创建)"""
# 首先,获取或创建标签
label_id = self._get_or_create_label(label_name)
if not label_id:
return False
# 修改邮件,添加标签
try:
self.service.users().messages().modify(
userId='me',
id=email_msg_id,
body={'addLabelIds': [label_id]}
).execute()
print(f"已为邮件 {email_msg_id} 添加标签 '{label_name}'")
return True
except HttpError as error:
print(f'为邮件添加标签时出错: {error}')
return False
def _get_or_create_label(self, label_name):
"""内部方法:根据标签名获取其ID,不存在则创建"""
try:
results = self.service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])
for label in labels:
if label['name'].lower() == label_name.lower():
return label['id']
# 标签不存在,创建它
label_body = {'name': label_name, 'labelListVisibility': 'labelShow', 'messageListVisibility': 'show'}
created_label = self.service.users().labels().create(userId='me', body=label_body).execute()
return created_label['id']
except HttpError as error:
print(f'处理标签时出错: {error}')
return None
def create_draft_reply(self, original_email_msg_id, draft_body):
"""创建一封回复原邮件的草稿"""
try:
# 1. 获取原邮件信息,用于构建回复的头部
original_msg = self.service.users().messages().get(userId='me', id=original_email_msg_id, format='metadata').execute()
headers = original_msg['payload']['headers']
original_subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '')
original_from = next((h['value'] for h in headers if h['name'] == 'From'), '')
original_to = next((h['value'] for h in headers if h['name'] == 'To'), '')
# 2. 构建回复邮件的 MIME 消息
# 提取发件人邮箱地址,用于构建“回复给”
reply_to = original_from
# 构建主题,通常添加“Re: ”
reply_subject = original_subject
if not reply_subject.startswith('Re:'):
reply_subject = f'Re: {reply_subject}'
# 创建邮件内容
message = MIMEText(draft_body, 'plain', 'utf-8')
message['to'] = reply_to
message['subject'] = reply_subject
# In-Reply-To 和 References 头有助于邮件客户端正确组织对话线程
message['In-Reply-To'] = f'<{original_email_msg_id}@mail.gmail.com>'
message['References'] = f'<{original_email_msg_id}@mail.gmail.com>'
# 3. 将 MIME 消息编码为 base64url 格式
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
draft_body = {'message': {'raw': raw_message, 'threadId': original_msg['threadId']}}
# 4. 调用 API 创建草稿
draft = self.service.users().drafts().create(userId='me', body=draft_body).execute()
print(f"已为邮件 {original_email_msg_id} 创建回复草稿,草稿ID: {draft['id']}")
return draft['id']
except HttpError as error:
print(f'创建草稿时出错: {error}')
return None
def send_alert(self, email_info, ai_result, alert_channel='log'):
"""根据配置发送警报"""
alert_message = f"🚨 紧急邮件警报!\n发件人:{email_info['from']}\n主题:{email_info['subject']}\nAI分类:{ai_result['category']}\n摘要:{ai_result['summary']}"
if alert_channel == 'slack' and settings.SLACK_WEBHOOK_URL:
self._send_slack_alert(alert_message)
elif alert_channel == 'telegram' and settings.TELEGRAM_BOT_TOKEN:
self._send_telegram_alert(alert_message)
else:
# 默认记录到日志
print(alert_message)
# 这里可以集成更强大的日志系统,如 logging 模块写入文件
def _send_slack_alert(self, message):
import requests
try:
payload = {'text': message}
response = requests.post(settings.SLACK_WEBHOOK_URL, json=payload)
response.raise_for_status()
except Exception as e:
print(f"发送 Slack 警报失败: {e}")
def _send_telegram_alert(self, message):
import requests
try:
url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {'chat_id': settings.TELEGRAM_CHAT_ID, 'text': message}
response = requests.post(url, json=payload)
response.raise_for_status()
except Exception as e:
print(f"发送 Telegram 警报失败: {e}")
5. 主程序流程与调度整合
现在,我们将所有模块串联起来,形成完整的工作流。 src/main.py 是程序的入口。
import time
import logging
from src.gmail_client import get_gmail_service, fetch_unread_emails
from src.ai_processor import AIProcessor
from src.action_executor import ActionExecutor
from src.config import settings
# 配置日志
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/app/logs/app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def main_processing_loop():
"""主处理循环"""
logger.info("启动邮件智能分拣代理...")
# 初始化各模块
gmail_service = get_gmail_service()
ai_processor = AIProcessor()
action_executor = ActionExecutor(gmail_service)
while True:
try:
logger.info("开始检查新邮件...")
emails = fetch_unread_emails(gmail_service, max_results=20) # 每次处理最多20封,避免超限
if not emails:
logger.info("没有发现未读邮件。")
else:
logger.info(f"发现 {len(emails)} 封未读邮件,开始处理。")
for email in emails:
logger.info(f"处理邮件: {email['subject'][:50]}...")
# 步骤1: AI 分析
ai_result = ai_processor.triage_email(email['subject'], email['body'], email['from'])
logger.info(f"AI 分析结果: 分类={ai_result['category']}, 置信度={ai_result['confidence']:.2f}")
# 步骤2: 根据分类执行动作
category = ai_result['category']
# 为所有邮件添加分类标签
action_executor.add_label(email['id'], f"AI_{category.upper()}")
if category == 'urgent':
# 紧急邮件:发送警报
action_executor.send_alert(email, ai_result, alert_channel=settings.ALERT_CHANNEL)
# 也可以额外打上 URGENT 标签
action_executor.add_label(email['id'], "URGENT")
logger.warning(f"已标记并警报紧急邮件: {email['subject']}")
if ai_result.get('requires_reply', False) and category not in ['spam', 'newsletter']:
# 需要回复的邮件:生成草稿
draft_body = ai_processor.draft_reply(email, your_role=settings.YOUR_ROLE)
if draft_body:
draft_id = action_executor.create_draft_reply(email['id'], draft_body)
if draft_id:
logger.info(f"已为邮件创建回复草稿。")
# 可以打上 NEEDS_REVIEW 标签
action_executor.add_label(email['id'], "NEEDS_REVIEW")
# 步骤3: (可选)将邮件标记为已读,避免下次重复处理
# self.service.users().messages().modify(userId='me', id=email['id'], body={'removeLabelIds': ['UNREAD']}).execute()
logger.info(f"邮件处理完成: {email['subject'][:30]}...")
# 短暂暂停,避免对 API 请求过于频繁
time.sleep(1)
logger.info(f"本轮处理完成。等待 {settings.CHECK_INTERVAL_MINUTES} 分钟后再次检查。")
time.sleep(settings.CHECK_INTERVAL_MINUTES * 60)
except KeyboardInterrupt:
logger.info("收到中断信号,程序退出。")
break
except Exception as e:
logger.error(f"主循环发生未预期错误: {e}", exc_info=True)
# 发生错误时等待更长时间再重试
time.sleep(300)
if __name__ == '__main__':
main_processing_loop()
6. 部署、运行与监控
6.1 构建与运行
一切就绪后,通过 Docker Compose 来运行是最简单的方式。
# 1. 复制环境变量模板并填写你的真实密钥
cp .env.example .env
# 使用文本编辑器编辑 .env 文件,填入所有必要的 KEY 和 ID
# 2. 将你的 Google credentials.json 文件放到 auth/ 目录下
# 假设你已经从 Google Cloud Console 下载了它
mv ~/Downloads/credentials.json ./auth/
# 3. 构建 Docker 镜像
docker-compose build
# 4. 首次运行(进行 OAuth 授权)
# 如果你在本地开发(有浏览器):
docker-compose up
# 此时控制台会输出一个授权 URL,复制到浏览器打开,登录并授权。
# 授权后,token.pickle 会保存在 ./auth 目录下。
# 5. 后台运行
# 授权完成后,可以停止容器 (Ctrl+C),然后以后台模式运行:
docker-compose up -d
# 查看日志
docker-compose logs -f email-triage
6.2 生产环境部署建议
对于 7x24 小时运行的生产环境,有几点需要优化:
-
使用外部调度器 :
main.py中的while True循环和sleep是一种简单实现,但不健壮。推荐使用系统的定时任务来触发。- Linux (cron) :在宿主机上设置 cron 任务,每5分钟运行一次容器命令。
# 编辑 crontab -e */5 * * * * cd /path/to/email-triage-openclaw && /usr/local/bin/docker-compose run --rm email-triage python src/main.py --single-run >> /path/to/logs/cron.log 2>&1你需要修改
main.py,增加一个--single-run参数,让程序处理一次邮件后就退出。- Docker 内部 (Supercronic) :在 Docker 容器内安装一个轻量级的 cron 实现,如 Supercronic。
-
健康检查与监控 :在
docker-compose.yml中添加健康检查指令,并配合 Prometheus、Grafana 或简单的 uptime robot 来监控服务是否存活。healthcheck: test: ["CMD", "python", "-c", "import sys; sys.exit(0)"] # 一个简单的检查,实际可以写一个检查API interval: 60s timeout: 10s retries: 3 start_period: 30s -
日志管理 :将日志从文件输出转向更专业的系统,如 Docker 的
json-file或syslog驱动,或者使用Fluentd、Loki等日志收集工具。 -
密钥管理 :对于生产环境,将密钥放在
.env文件中可能不够安全。可以考虑使用 Docker Secrets(在 Swarm 中)、云服务商的密钥管理服务(如 AWS Secrets Manager, GCP Secret Manager)或 HashiCorp Vault。
6.3 常见问题与排查技巧
即使按照步骤操作,你也可能会遇到一些问题。这里记录了一些常见的坑和解决方法。
问题1:Docker 构建时 pip 安装超时或失败。
- 原因 :网络连接问题或 PyPI 镜像不稳定。
- 解决 :修改
Dockerfile中的 pip 安装命令,使用国内镜像源,如之前示例中使用的清华源-i https://pypi.tuna.tsinghua.edu.cn/simple。也可以使用阿里云、腾讯云等镜像。
问题2:首次运行时,OAuth 流程在容器内无法打开浏览器。
- 原因 :
run_local_server需要交互式浏览器,这在无头环境的容器中不可用。 - 解决 :
- 方案A(推荐) :在本地开发机先完成首次授权。
- 在本地(非Docker)运行一次授权流程,生成
token.pickle。 - 将本地生成的
token.pickle和credentials.json一起复制到服务器的auth/目录。 - 在服务器上直接运行 Docker 容器,此时会使用已有的刷新令牌,无需再次交互。
- 在本地(非Docker)运行一次授权流程,生成
- 方案B :修改代码,在检测到无头环境时使用
run_console流程。这需要你手动复制控制台输出的授权 URL 到有浏览器的机器上完成授权,再将返回的代码粘贴回控制台。
- 方案A(推荐) :在本地开发机先完成首次授权。
问题3:AI 分类结果不准确或回复草稿质量不高。
- 原因 :提示词(Prompt)设计不够好,或者模型选择不当。
- 解决 :
- 迭代提示词 :这是提升效果最有效的方法。在你的提示词中加入更多例子(Few-shot Learning),更清晰地定义分类标准,或者为 AI 提供更多上下文(例如,“我是一个软件工程师,我的邮件主要关于代码审查、项目管理和技术讨论”)。
- 调整模型 :尝试更强大的模型,如从
claude-3-haiku切换到claude-3-sonnet或claude-3-opus,生成质量通常会显著提升,但成本和延迟也会增加。 - 后处理 :对 AI 返回的结果进行简单的后处理。例如,如果置信度低于某个阈值(如 0.6),则将邮件分类为
other或放入一个“待人工审核”的标签中。
问题4:处理大量邮件时,遇到 Gmail API 配额限制。
- 原因 :Gmail API 有每日用量配额(免费用户通常足够个人使用,但批量处理需注意)。
- 解决 :
- 控制频率 :增加
CHECK_INTERVAL_MINUTES,不要过于频繁地检查。 - 分批处理 :在
fetch_unread_emails中控制max_results,每次只处理少量邮件。 - 实现退避机制 :在代码中捕获
HttpError,如果错误码是 429(太多请求)或 403(配额超限),则指数级增加等待时间后再重试。 - 监控配额 :在 Google Cloud Console 的“配额”页面查看 Gmail API 的使用情况。
- 控制频率 :增加
问题5:日志文件过大,占满磁盘。
- 原因 :程序持续运行,日志不断累积。
- 解决 :使用日志轮转(Log Rotation)。可以配置 Python 的
logging.handlers.RotatingFileHandler或TimedRotatingFileHandler,也可以使用 Docker 的日志驱动选项(如docker-compose.yml中设置logging选项的max-size和max-file),或者直接在宿主机使用logrotate工具来管理日志目录。
这个项目就像给你的邮箱装上了一位 AI 副驾驶。初期搭建和调试可能需要一些耐心,特别是处理 OAuth 授权和调试提示词。但一旦它稳定运行起来,你会发现每天早上的收件箱不再令人焦虑,重要的邮件被高亮,琐碎的订阅被自动归类,甚至一些常规咨询的回复初稿已经为你准备好。你可以根据自己的需求不断迭代它,例如增加对特定项目邮件的自动归档、与任务管理工具(如 Trello, Jira)联动创建工单等。记住,所有的自动化都是为了更好地服务于人,定期检查一下“NEEDS_REVIEW”标签下的 AI 草稿,既能保证沟通质量,也是你优化 AI 助手的好机会。
更多推荐



所有评论(0)