[Dify Deep Dive] Chapter 13: Monitoring and Logging Systems
As someone who has run plenty of AI applications in production, I know how much monitoring and logging matter. When the on-call phone wakes you at 3 a.m., what you need most are clear logs and solid metrics to locate the problem fast.

Today, let's dig into Dify's monitoring and logging system and see how this excellent open-source project builds its observability stack. Trust me, these design ideas apply just as well to your own projects.
1. Log Collection Architecture: The Art of Structured Logging

1.1 Multi-Level Log Design

Open up Dify's source code and you'll find the logging system is carefully designed. It is not simple print debugging, but a complete structured logging scheme:
```python
# api/core/app/app_config/common.py
import logging
import traceback

class AppLogger:
    def __init__(self, app_id: str, tenant_id: str):
        self.app_id = app_id
        self.tenant_id = tenant_id
        self.logger = logging.getLogger(f"app.{app_id}")

    def info(self, message: str, extra: dict = None):
        """Structured info log."""
        # Note: 'message' is a reserved LogRecord attribute and must not
        # be passed via `extra`; it already travels as the log message.
        log_data = {
            'app_id': self.app_id,
            'tenant_id': self.tenant_id,
            'level': 'info'
        }
        if extra:
            log_data.update(extra)
        self.logger.info(message, extra=log_data)

    def error(self, message: str, error: Exception = None, extra: dict = None):
        """Structured error log."""
        log_data = {
            'app_id': self.app_id,
            'tenant_id': self.tenant_id,
            'level': 'error'
        }
        if error:
            log_data.update({
                'error_type': error.__class__.__name__,
                'error_message': str(error),
                'stack_trace': traceback.format_exc()
            })
        if extra:
            log_data.update(extra)
        self.logger.error(message, extra=log_data, exc_info=error)
```
What makes this design nice (a concrete example follows the list):
- Automatic context injection: every log line carries app_id and tenant_id, which makes later filtering easy
- Structured data: JSON-friendly fields that log-analysis tools can process directly
- Leveled logging: info, error, and other levels can be tuned per environment in production
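To make this concrete, here is what a single rendered line might look like once a JSON formatter (like the one shown in section 4.2) is attached; all field values below are made up for illustration:

```json
{
  "timestamp": "2025-01-08T19:03:12.481Z",
  "level": "error",
  "logger": "app.a1b2c3",
  "message": "Model invocation failed",
  "app_id": "a1b2c3",
  "tenant_id": "tenant-42",
  "error_type": "RateLimitError",
  "error_message": "429 Too Many Requests"
}
```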
1.2 The Elegant Design of Workflow Execution Logs

Workflow is Dify's core feature, and its logging design is especially worth studying:
```python
# api/core/workflow/graph_engine/graph_engine.py
import logging
import time
import uuid

class GraphEngine:
    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.logger = logging.getLogger('workflow.engine')

    def run(self, workflow: Workflow) -> WorkflowResult:
        """Execute the workflow with detailed logging."""
        execution_id = str(uuid.uuid4())
        start_time = time.time()

        # Log execution start
        self.logger.info("Workflow execution started", extra={
            'execution_id': execution_id,
            'workflow_id': workflow.id,
            'tenant_id': self.tenant_id,
            'node_count': len(workflow.graph.nodes),
            'event': 'workflow_start'
        })

        try:
            # Per-node execution logging
            for node in workflow.graph.nodes:
                node_start = time.time()
                self.logger.info(f"Node {node.id} execution started", extra={
                    'execution_id': execution_id,
                    'node_id': node.id,
                    'node_type': node.node_type,
                    'event': 'node_start'
                })

                result = self._execute_node(node)
                node_duration = time.time() - node_start

                # Log the node execution result
                self.logger.info(f"Node {node.id} execution completed", extra={
                    'execution_id': execution_id,
                    'node_id': node.id,
                    'node_type': node.node_type,
                    'duration': node_duration,
                    'status': 'success' if result.success else 'failed',
                    'token_usage': result.token_usage,
                    'event': 'node_complete'
                })
            # ... assemble and return the WorkflowResult (elided in this excerpt)
        except Exception as e:
            # Log execution failure
            self.logger.error("Workflow execution failed", extra={
                'execution_id': execution_id,
                'workflow_id': workflow.id,
                'error': str(e),
                'duration': time.time() - start_time,
                'event': 'workflow_failed'
            }, exc_info=True)
            raise
        finally:
            # Log execution end
            total_duration = time.time() - start_time
            self.logger.info("Workflow execution completed", extra={
                'execution_id': execution_id,
                'workflow_id': workflow.id,
                'total_duration': total_duration,
                'event': 'workflow_end'
            })
```
1.3 API Access-Log Middleware

For an API service, request logs are indispensable. Dify's approach is worth borrowing:
```python
# api/libs/middleware.py
import logging
import time
import uuid
from functools import wraps

from flask import request, g

logger = logging.getLogger('api.access')  # logger name assumed for this excerpt

def log_api_request(f):
    """Decorator that logs every API request."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()

        # Log request start
        request_id = str(uuid.uuid4())
        g.request_id = request_id
        logger.info("API request started", extra={
            'request_id': request_id,
            'method': request.method,
            'path': request.path,
            'remote_addr': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
            'content_length': request.content_length,
            'event': 'api_request_start'
        })

        try:
            response = f(*args, **kwargs)
            duration = time.time() - start_time

            # Log request success
            logger.info("API request completed", extra={
                'request_id': request_id,
                'status_code': getattr(response, 'status_code', 200),
                'duration': duration,
                'event': 'api_request_success'
            })
            return response
        except Exception as e:
            duration = time.time() - start_time

            # Log request failure
            logger.error("API request failed", extra={
                'request_id': request_id,
                'error': str(e),
                'duration': duration,
                'event': 'api_request_error'
            }, exc_info=True)
            raise
    return decorated_function
```
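Wiring this up is then just a decorator on the view function. The route below is a hypothetical usage sketch, not necessarily a real Dify endpoint:

```python
from flask import Blueprint, jsonify

chat_bp = Blueprint('chat', __name__)

@chat_bp.route('/v1/chat-messages', methods=['POST'])
@log_api_request
def create_chat_message():
    # g.request_id, set by the decorator, is available here and in any
    # downstream logging for correlation.
    return jsonify({'result': 'ok'})
```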
2. Performance Metrics: Data-Driven Insight

2.1 Core Business Metrics

Dify defines a set of core business metrics that directly reflect system health:
```python
# api/core/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

class DifyMetrics:
    """Core Dify monitoring metrics."""

    def __init__(self):
        # Application metrics
        self.app_conversations = Counter(
            'dify_app_conversations_total',
            'Total number of conversations',
            ['app_id', 'tenant_id']
        )
        self.app_messages = Counter(
            'dify_app_messages_total',
            'Total number of messages',
            ['app_id', 'tenant_id', 'message_type']
        )

        # Workflow metrics
        self.workflow_executions = Counter(
            'dify_workflow_executions_total',
            'Total workflow executions',
            ['workflow_id', 'status']
        )
        self.workflow_duration = Histogram(
            'dify_workflow_duration_seconds',
            'Workflow execution duration',
            ['workflow_id', 'status'],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]
        )

        # Token usage metrics
        self.token_usage = Counter(
            'dify_token_usage_total',
            'Total token usage',
            ['model_provider', 'model_name', 'usage_type']
        )

        # Currently-active metrics
        self.active_conversations = Gauge(
            'dify_active_conversations',
            'Number of active conversations',
            ['app_id']
        )

    def record_conversation_start(self, app_id: str, tenant_id: str):
        """Record the start of a conversation."""
        self.app_conversations.labels(
            app_id=app_id,
            tenant_id=tenant_id
        ).inc()
        self.active_conversations.labels(app_id=app_id).inc()

    def record_message(self, app_id: str, tenant_id: str, message_type: str):
        """Record a message."""
        self.app_messages.labels(
            app_id=app_id,
            tenant_id=tenant_id,
            message_type=message_type
        ).inc()

    def record_workflow_execution(self, workflow_id: str, duration: float, status: str):
        """Record a workflow execution."""
        self.workflow_executions.labels(
            workflow_id=workflow_id,
            status=status
        ).inc()
        self.workflow_duration.labels(
            workflow_id=workflow_id,
            status=status
        ).observe(duration)

    def record_token_usage(self, provider: str, model: str, usage_type: str, count: int):
        """Record token usage."""
        self.token_usage.labels(
            model_provider=provider,
            model_name=model,
            usage_type=usage_type
        ).inc(count)

# Global metrics instance
metrics = DifyMetrics()
```
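Where would these recorders be called? Here is a hypothetical sketch of the call sites inside a chat service; the handler names are invented for illustration:

```python
def handle_new_conversation(app_id: str, tenant_id: str):
    # Called when a user opens a new conversation
    metrics.record_conversation_start(app_id, tenant_id)

def handle_user_message(app_id: str, tenant_id: str, text: str):
    # Count the inbound message, run the model, then count the reply
    metrics.record_message(app_id, tenant_id, message_type='user')
    # ... invoke the model ...
    metrics.record_message(app_id, tenant_id, message_type='assistant')
```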
2.2 System Resource Monitoring

Beyond business metrics, system resource monitoring matters just as much:
```python
# api/core/monitoring/system_metrics.py
import logging
import threading
import time

import psutil
from prometheus_client import Gauge

class SystemMetrics:
    """System resource monitoring."""

    def __init__(self):
        # CPU
        self.cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')

        # Memory
        self.memory_usage = Gauge('system_memory_usage_bytes', 'Memory usage in bytes')
        self.memory_usage_percent = Gauge('system_memory_usage_percent', 'Memory usage percentage')

        # Disk
        self.disk_usage = Gauge('system_disk_usage_bytes', 'Disk usage in bytes', ['path'])
        self.disk_usage_percent = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['path'])

        # Network
        self.network_bytes_sent = Gauge('system_network_bytes_sent_total', 'Network bytes sent')
        self.network_bytes_recv = Gauge('system_network_bytes_received_total', 'Network bytes received')

        # Start the background collection thread
        self._start_monitoring()

    def _start_monitoring(self):
        """Start the collection thread."""
        def monitor():
            while True:
                try:
                    # CPU
                    cpu_percent = psutil.cpu_percent(interval=1)
                    self.cpu_usage.set(cpu_percent)

                    # Memory
                    memory = psutil.virtual_memory()
                    self.memory_usage.set(memory.used)
                    self.memory_usage_percent.set(memory.percent)

                    # Disk
                    for disk in ['/']:  # multiple mount points can be configured
                        disk_usage = psutil.disk_usage(disk)
                        self.disk_usage.labels(path=disk).set(disk_usage.used)
                        self.disk_usage_percent.labels(path=disk).set(
                            (disk_usage.used / disk_usage.total) * 100
                        )

                    # Network (cumulative OS counters exposed as gauges)
                    network = psutil.net_io_counters()
                    self.network_bytes_sent.set(network.bytes_sent)
                    self.network_bytes_recv.set(network.bytes_recv)
                except Exception as e:
                    logging.error(f"System metrics collection failed: {e}")

                time.sleep(30)  # collect every 30 seconds

        thread = threading.Thread(target=monitor, daemon=True)
        thread.start()

# Initialize system monitoring
system_metrics = SystemMetrics()
```
2.3 Real-Time Monitoring Decorators

To make monitoring easier to integrate, Dify uses the decorator pattern:
```python
# api/core/monitoring/decorators.py
import time
from functools import wraps

from .metrics import metrics

def monitor_workflow_execution(f):
    """Decorator that records workflow execution metrics."""
    @wraps(f)
    def decorated_function(workflow_id: str, *args, **kwargs):
        start_time = time.time()
        status = 'success'
        try:
            result = f(workflow_id, *args, **kwargs)
            return result
        except Exception:
            status = 'failed'
            raise
        finally:
            duration = time.time() - start_time
            metrics.record_workflow_execution(
                workflow_id=workflow_id,
                duration=duration,
                status=status
            )
    return decorated_function

def monitor_token_usage(provider: str, model: str):
    """Decorator that records token usage."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            result = f(*args, **kwargs)

            # Extract token usage from the result
            if hasattr(result, 'usage') and result.usage:
                metrics.record_token_usage(
                    provider=provider,
                    model=model,
                    usage_type='prompt_tokens',
                    count=result.usage.prompt_tokens
                )
                metrics.record_token_usage(
                    provider=provider,
                    model=model,
                    usage_type='completion_tokens',
                    count=result.usage.completion_tokens
                )
            return result
        return decorated_function
    return decorator
```
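Usage is then a one-line decoration on whatever function returns the model response. The sketch below is hypothetical: the decorator only requires that the return value expose usage.prompt_tokens and usage.completion_tokens, so stub dataclasses stand in for a real provider SDK call:

```python
from dataclasses import dataclass

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int

@dataclass
class ModelResult:
    text: str
    usage: Usage

@monitor_token_usage(provider='openai', model='gpt-4o')
def invoke_chat_model(prompt: str) -> ModelResult:
    # A real implementation would call the provider SDK here; this stub
    # returns canned text with made-up token counts.
    return ModelResult(text='hello', usage=Usage(prompt_tokens=12, completion_tokens=3))
```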
3. Error Tracking: Locating Problems Fast

3.1 Preserving Exception Context

When an error occurs, context is the key to locating the problem. Dify's approach is worth studying:
```python
# api/core/error_tracking/context.py
import contextvars
from typing import Any, Dict, Optional

# Context variable definitions
current_request_id = contextvars.ContextVar('request_id')
current_app_id = contextvars.ContextVar('app_id')
current_tenant_id = contextvars.ContextVar('tenant_id')
current_workflow_id = contextvars.ContextVar('workflow_id')

class ErrorContext:
    """Error-context manager."""

    @staticmethod
    def get_context() -> Dict[str, Any]:
        """Collect the current error context."""
        context = {}
        for key, var in (
            ('request_id', current_request_id),
            ('app_id', current_app_id),
            ('tenant_id', current_tenant_id),
            ('workflow_id', current_workflow_id),
        ):
            try:
                context[key] = var.get()
            except LookupError:
                pass  # this variable was never set in the current context
        return context

    @staticmethod
    def set_request_context(request_id: str, app_id: Optional[str] = None,
                            tenant_id: Optional[str] = None):
        """Set the request context."""
        current_request_id.set(request_id)
        if app_id:
            current_app_id.set(app_id)
        if tenant_id:
            current_tenant_id.set(tenant_id)

    @staticmethod
    def set_workflow_context(workflow_id: str):
        """Set the workflow context."""
        current_workflow_id.set(workflow_id)
```
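Here is a minimal sketch of binding the context at the start of every request, assuming a Flask before_request hook; the header names and fallback logic are illustrative, not Dify's actual wiring:

```python
import uuid

from flask import Flask, request

app = Flask(__name__)

@app.before_request
def bind_error_context():
    # Reuse an upstream request ID if present, otherwise mint one
    request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
    ErrorContext.set_request_context(
        request_id=request_id,
        app_id=request.headers.get('X-App-ID'),        # illustrative header
        tenant_id=request.headers.get('X-Tenant-ID'),  # illustrative header
    )
```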
3.2 A Central Exception Handler

Unified exception handling is the core of error tracking:
```python
# api/core/error_tracking/exception_handler.py
import logging
import traceback
import uuid

from flask import Flask, jsonify, request
from marshmallow import ValidationError  # assumed: a marshmallow-style error with .messages

from .context import ErrorContext

class DifyExceptionHandler:
    """Dify's central exception handler."""

    def __init__(self, app: Flask):
        self.app = app
        self.logger = logging.getLogger('exception_handler')
        self._register_handlers()

    def _register_handlers(self):
        """Register exception handlers."""

        @self.app.errorhandler(Exception)
        def handle_general_exception(error):
            """Catch-all exception handler."""
            error_id = str(uuid.uuid4())
            context = ErrorContext.get_context()

            # Record detailed error information
            error_data = {
                'error_id': error_id,
                'error_type': error.__class__.__name__,
                'error_message': str(error),
                'stack_trace': traceback.format_exc(),
                'request_method': request.method,
                'request_path': request.path,
                'request_args': dict(request.args),
                'user_agent': request.headers.get('User-Agent'),
                'remote_addr': request.remote_addr,
                **context  # merge in the ambient context
            }

            self.logger.error(
                f"Unhandled exception: {error}",
                extra=error_data,
                exc_info=True
            )

            # Forward to an error-tracking service (e.g. Sentry)
            self._send_to_error_tracking(error_data)

            return jsonify({
                'error': 'Internal server error',
                'error_id': error_id
            }), 500

        @self.app.errorhandler(ValidationError)
        def handle_validation_error(error):
            """Handler for request-validation errors."""
            context = ErrorContext.get_context()

            self.logger.warning(
                f"Validation error: {error}",
                extra={
                    'error_type': 'ValidationError',
                    'error_details': error.messages,
                    'request_path': request.path,
                    **context
                }
            )

            return jsonify({
                'error': 'Validation failed',
                'details': error.messages
            }), 400

    def _send_to_error_tracking(self, error_data: dict):
        """Forward the error to a tracking service."""
        try:
            # Integrate Sentry, Bugsnag, etc. here
            # sentry_sdk.capture_exception(error_data)
            pass
        except Exception as e:
            self.logger.error(f"Failed to send error to tracking service: {e}")
```
3.3 Error Recovery

For recoverable errors, Dify implements a retry mechanism:
```python
# api/core/error_tracking/retry.py
import logging
import time
from functools import wraps
from typing import Tuple, Type

import requests

def retry_on_failure(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Retry decorator with exponential backoff."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return f(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_retries:
                        # Final attempt failed: log in detail
                        logging.error(
                            f"Function {f.__name__} failed after {max_retries} retries",
                            extra={
                                'function': f.__name__,
                                'attempts': max_retries + 1,
                                'last_error': str(e),
                                # 'args' is reserved on LogRecord, so use a
                                # different key; cap the log size too
                                'call_args': str(args)[:200],
                            },
                            exc_info=True
                        )
                        raise

                    # Compute the backoff delay
                    sleep_time = delay * (backoff_factor ** attempt)
                    logging.warning(
                        f"Function {f.__name__} failed on attempt {attempt + 1}, "
                        f"retrying in {sleep_time}s",
                        extra={
                            'function': f.__name__,
                            'attempt': attempt + 1,
                            'max_retries': max_retries,
                            'error': str(e),
                            'retry_delay': sleep_time
                        }
                    )
                    time.sleep(sleep_time)

            # Should be unreachable
            raise last_exception
        return decorated_function
    return decorator

# Usage example
@retry_on_failure(max_retries=3, exceptions=(requests.RequestException, TimeoutError))
def call_external_api(url: str, data: dict):
    """Call an external API with retries."""
    response = requests.post(url, json=data, timeout=30)
    response.raise_for_status()
    return response.json()
```
4. Observability in Practice: Building a Complete Monitoring Stack

4.1 Health-Check Endpoint

A complete monitoring stack needs health checks:
```python
# api/controllers/console/health.py
from datetime import datetime

import psutil
from flask import Blueprint, current_app, jsonify
from sqlalchemy import text

from core.database import db
from core.redis import redis_client

health_bp = Blueprint('health', __name__)

@health_bp.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint."""
    health_status = {
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'version': current_app.config.get('VERSION', 'unknown'),
        'checks': {}
    }

    # Database connectivity
    try:
        db.session.execute(text('SELECT 1'))  # works on SQLAlchemy 1.4 and 2.x
        health_status['checks']['database'] = {'status': 'healthy'}
    except Exception as e:
        health_status['checks']['database'] = {
            'status': 'unhealthy',
            'error': str(e)
        }
        health_status['status'] = 'unhealthy'

    # Redis connectivity
    try:
        redis_client.ping()
        health_status['checks']['redis'] = {'status': 'healthy'}
    except Exception as e:
        health_status['checks']['redis'] = {
            'status': 'unhealthy',
            'error': str(e)
        }
        health_status['status'] = 'unhealthy'

    # System resources
    try:
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()

        health_status['checks']['system'] = {
            'status': 'healthy',
            'cpu_usage': cpu_percent,
            'memory_usage': memory.percent,
            'memory_available': memory.available
        }

        # Flag high resource usage
        if cpu_percent > 90 or memory.percent > 90:
            health_status['checks']['system']['status'] = 'warning'
    except Exception as e:
        health_status['checks']['system'] = {
            'status': 'unhealthy',
            'error': str(e)
        }

    status_code = 200 if health_status['status'] == 'healthy' else 503
    return jsonify(health_status), status_code

@health_bp.route('/metrics', methods=['GET'])
def prometheus_metrics():
    """Prometheus metrics endpoint."""
    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}
```
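Because the endpoint returns 503 whenever any check fails, it can be consumed directly by an orchestrator. A hypothetical Kubernetes probe configuration might look like this; the port and timings are placeholders, not official Dify deployment values:

```yaml
# Illustrative container probes pointing at the /health endpoint above
livenessProbe:
  httpGet:
    path: /health
    port: 5001          # placeholder API port
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /health
    port: 5001
  periodSeconds: 5
  failureThreshold: 3
```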
4.2 Logging Configuration Best Practices

Production logging configuration needs careful tuning:
```python
# api/config.py
import json
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

class LogConfig:
    """Logging configuration."""

    @staticmethod
    def setup_logging(app):
        """Configure application logging."""
        if not app.debug:
            # Production configuration
            LogConfig._setup_production_logging(app)
        else:
            # Development configuration
            LogConfig._setup_development_logging(app)

    @staticmethod
    def _setup_development_logging(app):
        """Development logging: human-readable console output."""
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        )

    @staticmethod
    def _setup_production_logging(app):
        """Production logging configuration."""
        # Create the log directory
        log_dir = app.config.get('LOG_DIR', '/var/log/dify')
        os.makedirs(log_dir, exist_ok=True)

        # Configure the root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)

        # Application log: rotated daily
        app_handler = TimedRotatingFileHandler(
            os.path.join(log_dir, 'dify.log'),
            when='midnight',
            interval=1,
            backupCount=30,
            encoding='utf-8'
        )
        app_handler.setLevel(logging.INFO)

        # Error log: rotated by size
        error_handler = RotatingFileHandler(
            os.path.join(log_dir, 'error.log'),
            maxBytes=50 * 1024 * 1024,  # 50MB
            backupCount=10,
            encoding='utf-8'
        )
        error_handler.setLevel(logging.ERROR)

        # Access log
        access_handler = TimedRotatingFileHandler(
            os.path.join(log_dir, 'access.log'),
            when='midnight',
            interval=1,
            backupCount=7,
            encoding='utf-8'
        )
        access_handler.setLevel(logging.INFO)

        # Log formats. request_id is emitted by the JSON formatter rather
        # than the plain-text one, because not every record carries it and
        # a missing %(request_id)s would raise a formatting error.
        detailed_formatter = logging.Formatter(
            '%(asctime)s [%(levelname)s] %(name)s: %(message)s '
            '[in %(pathname)s:%(lineno)d]'
        )
        json_formatter = LogConfig._create_json_formatter()

        app_handler.setFormatter(json_formatter)
        error_handler.setFormatter(detailed_formatter)
        access_handler.setFormatter(json_formatter)

        # Attach handlers
        root_logger.addHandler(app_handler)
        root_logger.addHandler(error_handler)

        # Dedicated access logger
        access_logger = logging.getLogger('access')
        access_logger.addHandler(access_handler)
        access_logger.propagate = False

    @staticmethod
    def _create_json_formatter():
        """Create a JSON formatter."""
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    'timestamp': datetime.utcnow().isoformat(),
                    'level': record.levelname,
                    'logger': record.name,
                    'message': record.getMessage(),
                    'module': record.module,
                    'function': record.funcName,
                    'line': record.lineno
                }

                # Contextual fields, if present on the record
                for field in ('request_id', 'app_id', 'tenant_id'):
                    if hasattr(record, field):
                        log_data[field] = getattr(record, field)

                # Exception info
                if record.exc_info:
                    log_data['exception'] = self.formatException(record.exc_info)

                return json.dumps(log_data, ensure_ascii=False)

        return JSONFormatter()
```
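Hooking the configuration into the app is then a single call during startup; the application factory below is an illustrative sketch, not Dify's actual bootstrap code:

```python
from flask import Flask

def create_app() -> Flask:
    app = Flask(__name__)
    app.config['LOG_DIR'] = '/var/log/dify'  # placeholder path
    LogConfig.setup_logging(app)
    return app
```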
4.3 Dashboard Configuration

Finally, we need a visual dashboard. Here is a sample Grafana configuration:
```json
{
  "dashboard": {
    "title": "Dify Application Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(dify_api_requests_total[5m])",
            "legendFormat": "{{method}} {{path}}"
          }
        ]
      },
      {
        "title": "Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(dify_api_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, rate(dify_api_request_duration_seconds_bucket[5m]))",
            "legendFormat": "50th percentile"
          }
        ]
      },
      {
        "title": "Workflow Executions",
        "type": "stat",
        "targets": [
          {
            "expr": "increase(dify_workflow_executions_total[1h])",
            "legendFormat": "Total Executions"
          }
        ]
      },
      {
        "title": "Token Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(dify_token_usage_total[5m])",
            "legendFormat": "{{model_provider}} - {{usage_type}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(dify_api_requests_total{status=~\"4..|5..\"}[5m]) / rate(dify_api_requests_total[5m])",
            "legendFormat": "Error Rate"
          }
        ]
      },
      {
        "title": "System Resources",
        "type": "graph",
        "targets": [
          {
            "expr": "system_cpu_usage_percent",
            "legendFormat": "CPU Usage"
          },
          {
            "expr": "system_memory_usage_percent",
            "legendFormat": "Memory Usage"
          }
        ]
      }
    ]
  }
}
```
5. Alerting Rules: Finding Problems Proactively

5.1 Prometheus Alert Rules

Metrics alone are not enough; you also need intelligent alert rules:
```yaml
# prometheus/alert_rules.yml
groups:
  - name: dify_application_alerts
    rules:
      # API response-time alert
      - alert: HighAPIResponseTime
        expr: histogram_quantile(0.95, rate(dify_api_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
          service: dify-api
        annotations:
          summary: "High API response time detected"
          description: "95th percentile response time is {{ $value }}s for the last 5 minutes"

      # Error-rate alert
      - alert: HighErrorRate
        expr: |
          (
            rate(dify_api_requests_total{status=~"4..|5.."}[5m]) /
            rate(dify_api_requests_total[5m])
          ) > 0.05
        for: 2m
        labels:
          severity: critical
          service: dify-api
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for the last 5 minutes"

      # Workflow execution-failure alert
      - alert: WorkflowExecutionFailures
        expr: increase(dify_workflow_executions_total{status="failed"}[10m]) > 5
        for: 1m
        labels:
          severity: warning
          service: dify-workflow
        annotations:
          summary: "High workflow execution failures"
          description: "{{ $value }} workflow executions failed in the last 10 minutes"

      # Abnormal token-usage alert
      - alert: UnusualTokenUsage
        expr: |
          rate(dify_token_usage_total[1h]) >
          (avg_over_time(rate(dify_token_usage_total[1h])[7d:1h]) * 3)
        for: 10m
        labels:
          severity: warning
          service: dify-api
        annotations:
          summary: "Unusual token usage detected"
          description: "Token usage rate is 3x higher than the 7-day average"

      # System resource alerts
      - alert: HighCPUUsage
        expr: system_cpu_usage_percent > 80
        for: 5m
        labels:
          severity: warning
          service: dify-system
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is {{ $value }}% for the last 5 minutes"

      - alert: HighMemoryUsage
        expr: system_memory_usage_percent > 85
        for: 5m
        labels:
          severity: critical
          service: dify-system
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value }}% for the last 5 minutes"

      # Database connectivity alert
      - alert: DatabaseConnectionFailure
        expr: up{job="dify-health-check"} == 0
        for: 1m
        labels:
          severity: critical
          service: dify-database
        annotations:
          summary: "Database connection failure"
          description: "Unable to connect to database for the last 1 minute"
```
5.2 Alert Notification Channels

Configure multi-channel alert notifications:
```python
# api/core/monitoring/alerting.py
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional

import requests
from flask import current_app

@dataclass
class Alert:
    title: str
    message: str
    severity: str
    service: str
    timestamp: str
    metrics: Optional[Dict] = None

class AlertManager:
    """Alert manager."""

    def __init__(self, config: dict):
        self.config = config
        self.logger = logging.getLogger('alert_manager')

        # Channel configuration
        self.channels = {
            'slack': self._send_slack_alert,
            'email': self._send_email_alert,
            'webhook': self._send_webhook_alert,
            'dingtalk': self._send_dingtalk_alert
        }

    def send_alert(self, alert: Alert, channels: List[str] = None):
        """Send an alert."""
        if channels is None:
            channels = self.config.get('default_channels', ['slack'])

        for channel in channels:
            if channel in self.channels:
                try:
                    self.channels[channel](alert)
                    self.logger.info(f"Alert sent to {channel}: {alert.title}")
                except Exception as e:
                    self.logger.error(f"Failed to send alert to {channel}: {e}")

    def _send_slack_alert(self, alert: Alert):
        """Send a Slack alert."""
        webhook_url = self.config.get('slack', {}).get('webhook_url')
        if not webhook_url:
            return

        color = {
            'critical': '#FF0000',
            'warning': '#FFA500',
            'info': '#00FF00'
        }.get(alert.severity, '#808080')

        payload = {
            'attachments': [{
                'color': color,
                'title': alert.title,
                'text': alert.message,
                'fields': [
                    {'title': 'Severity', 'value': alert.severity, 'short': True},
                    {'title': 'Service', 'value': alert.service, 'short': True},
                    {'title': 'Time', 'value': alert.timestamp, 'short': True}
                ]
            }]
        }

        if alert.metrics:
            payload['attachments'][0]['fields'].extend([
                {'title': k, 'value': str(v), 'short': True}
                for k, v in alert.metrics.items()
            ])

        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()

    def _send_dingtalk_alert(self, alert: Alert):
        """Send a DingTalk alert."""
        webhook_url = self.config.get('dingtalk', {}).get('webhook_url')
        if not webhook_url:
            return

        markdown_text = f"""
## {alert.title}
**Severity**: {alert.severity}
**Service**: {alert.service}
**Time**: {alert.timestamp}
**Details**: {alert.message}
"""
        if alert.metrics:
            markdown_text += "\n**Metrics**:\n"
            for k, v in alert.metrics.items():
                markdown_text += f"- {k}: {v}\n"

        payload = {
            'msgtype': 'markdown',
            'markdown': {
                'title': alert.title,
                'text': markdown_text
            }
        }

        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()

    def _send_email_alert(self, alert: Alert):
        """Send an email alert."""
        # Integrate SMTP or a mail-service API here
        pass

    def _send_webhook_alert(self, alert: Alert):
        """Send a generic webhook alert."""
        webhook_url = self.config.get('webhook', {}).get('url')
        if not webhook_url:
            return

        payload = {
            'title': alert.title,
            'message': alert.message,
            'severity': alert.severity,
            'service': alert.service,
            'timestamp': alert.timestamp,
            'metrics': alert.metrics
        }

        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()

# Global alert manager (note: current_app requires an active Flask
# application context, so this must run during app initialization)
alert_manager = AlertManager(current_app.config.get('ALERTING', {}))
```
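A hypothetical call site, with invented values, looks like this:

```python
from datetime import datetime, timezone

alert_manager.send_alert(
    Alert(
        title='Workflow failure spike',
        message='More than 5 workflow executions failed in the last 10 minutes',
        severity='warning',
        service='dify-workflow',
        timestamp=datetime.now(timezone.utc).isoformat(),
        metrics={'failed_executions': 7},  # illustrative value
    ),
    channels=['slack', 'dingtalk'],
)
```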
6. Lessons from Monitoring Practice

6.1 Principles for Choosing Metrics

After years of practice, I've settled on a few principles for choosing monitoring metrics:
The four golden signals: focus on the four metrics that matter most
- Latency: how long user requests take
- Traffic: how many requests the system handles
- Errors: the fraction of requests that fail
- Saturation: how heavily system resources are used
Business relevance: a metric should tie directly to business value

```python
# A good metric: directly reflects user experience
user_conversation_success_rate = successful_conversations / total_conversations

# A weaker metric: purely technical, loosely coupled to user experience
database_connection_pool_size = current_pool_size
```
Actionability: an alert must be actionable; whoever receives it should know what to do

```python
# An actionable alert
if workflow_execution_failure_rate > 0.1:
    # Clear playbook: check the latest deployment, inspect error logs,
    # roll back if necessary
    ...

# A non-actionable alert
if cpu_usage > 50:
    # Ambiguous: 50% may be perfectly normal, and there is no obvious action
    ...
```
6.2 Logging Best Practices

Structure is king: every log should be structured so it can be searched and analyzed

```python
# Good: structured fields
logger.info("Workflow executed successfully", extra={
    'workflow_id': workflow_id,
    'execution_time': 2.5,
    'node_count': 8,
    'token_usage': 150
})

# Bad: everything baked into the message string
logger.info(f"Workflow {workflow_id} executed in 2.5s with 8 nodes using 150 tokens")
```
Context propagation: use context variables to carry key information

```python
# Set the context when the request starts
current_request_id.set(request_id)
current_tenant_id.set(tenant_id)

# Every subsequent log automatically includes this information
logger.info("Processing request")  # carries request_id and tenant_id
```
Mask sensitive data: never log secrets

```python
def safe_log_user_data(user_data: dict):
    """Return a copy of user data that is safe to log."""
    safe_data = user_data.copy()

    # Mask sensitive fields
    sensitive_fields = ['password', 'api_key', 'token', 'secret']
    for field in sensitive_fields:
        if field in safe_data:
            safe_data[field] = '***masked***'
    return safe_data
```
6.3 Managing Alert Fatigue

Alert fatigue is the enemy of any monitoring system. A few lessons learned:

Tiered alerting: use different notification channels for different severities

```python
alert_routing = {
    'critical': ['phone', 'slack', 'email'],  # notify immediately
    'warning': ['slack', 'email'],            # notify during working hours
    'info': ['email']                         # daily digest
}
```
Alert aggregation: avoid bombarding people with duplicates

```python
import time

class AlertAggregator:
    def __init__(self):
        self.recent_alerts = {}  # {alert_key: last_sent_time}
        self.suppression_window = 300  # suppress duplicate alerts for 5 minutes

    def should_send_alert(self, alert_key: str) -> bool:
        current_time = time.time()
        last_sent = self.recent_alerts.get(alert_key, 0)

        if current_time - last_sent > self.suppression_window:
            self.recent_alerts[alert_key] = current_time
            return True
        return False
```
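To tie the two pieces together, here is a small sketch of deduplicated sending on top of the AlertManager from section 5.2; the key scheme is an assumption:

```python
aggregator = AlertAggregator()

def send_deduplicated(alert: Alert):
    # Suppress identical alerts inside the 5-minute window
    key = f"{alert.service}:{alert.title}"
    if aggregator.should_send_alert(key):
        alert_manager.send_alert(alert)
```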
Adaptive thresholds: derive alert thresholds dynamically from historical data

```python
import numpy as np

def calculate_dynamic_threshold(metric_name: str, window: str = '7d') -> float:
    """Compute a dynamic threshold from historical data."""
    # get_metric_history is assumed to query your metrics backend
    historical_data = get_metric_history(metric_name, window)
    mean = np.mean(historical_data)
    std = np.std(historical_data)

    # 3-sigma rule
    return mean + 3 * std
```
7. Summary and Outlook

Having walked through Dify's monitoring and logging system, we can see what a mature observability stack should include:
- A complete logging architecture: end-to-end log tracing from the start of a request to its finish
- Rich monitoring metrics: covering the business, system, and infrastructure layers
- Smart error tracking: locate problems quickly and shorten time to repair
- Visual dashboards: an at-a-glance view of system health
- Timely alerting: discover problems proactively instead of waiting for user complaints

These designs are valuable not only for AI applications but for any system that needs high availability.
Practical Advice
- Start simple: don't chase perfection on day one; stand up a basic monitoring framework first
- Business first: prioritize the metrics with the biggest business impact
- Keep iterating: a monitoring system needs continuous tuning based on how it actually runs
- Build team consensus: make sure everyone understands what each metric means and how to handle each alert
Next Chapter

In Chapter 14, "Deployment Architecture and DevOps Practices", we'll look at how to connect this monitoring system to the CI/CD pipeline for true DevOps automation. From Docker containerization to Kubernetes cluster deployment, from blue-green deployments to canary releases, we'll build a modern deployment stack together.
Monitoring is a means, not an end. The real goal is a stable, reliable, high-performance AI application. A solid monitoring stack gives us the "eyes" and "ears" for that goal, letting us sense the system's health at all times and find and fix problems early.

Remember: a system without monitoring is like a car driving through the night; you never know what the next bend holds. Let's give Dify the wings of monitoring and let it soar freely in the sky of AI applications!