1. 项目概述:当ChatGPT日志遇上Elastic Stack

最近在搞一个挺有意思的玩意儿,起因是我们团队内部开始大规模使用ChatGPT的API来辅助开发、文档生成和代码审查。用着用着,问题就来了:调用量上去了,成本开始变得不那么透明,有时候响应慢了也不知道是网络问题还是模型负载问题,更关键的是,我们想分析一下大家到底都在用ChatGPT做什么,哪些Prompt效果好,哪些场景的Token消耗最高。光靠OpenAI后台那个简单的仪表盘,信息太有限了,而且数据没法和我们自己的业务系统联动分析。

于是,我就把目光投向了 Elastic Stack (以前叫ELK Stack)。这玩意儿在日志和指标分析领域是绝对的“瑞士军刀”。能不能把ChatGPT API的调用日志,实时地灌进Elasticsearch,然后用Kibana的可视化能力,打造一个专属的、深度定制的ChatGPT应用监控与分析平台?这就是 elastic/chatgpt-log-analysis 这个项目想法的核心。

简单说,它不是一个现成的SaaS服务,而是一套 开源的、可自部署的解决方案框架 。它帮你搭好从日志收集、解析、存储到可视化分析的全链路。你拿到的是蓝图、工具和脚本,需要根据你自己的基础设施(比如你的服务器、你的OpenAI账户)来配置和运行。最终实现的效果是:你团队或产品里每一次调用ChatGPT API的请求,其元数据、请求内容、响应内容、耗时、Token用量、成本等信息,都能被结构化地记录、索引,并变成一张张直观的仪表盘,让你对AI的使用情况了如指掌。

适合谁来搞这个?如果你是团队的技术负责人,关心AI工具的成本与效能;如果你是开发者,想深度优化自己应用的Prompt和调用策略;或者你就是一个数据驱动的AI产品经理,想从交互数据里挖出洞见,那这个项目思路就非常值得你花时间折腾一下。它把黑盒的API调用,变成了可观测、可分析的白盒操作。

2. 核心需求与价值拆解

为什么我们需要对ChatGPT的日志做专门的分析?直接看账单不行吗?账单只告诉你花了多少钱,但“为什么花这么多钱”、“钱花得值不值”、“效率能不能提升”,这些问题账单回答不了。下面我拆解几个最核心的驱动需求。

2.1 成本管控与优化:找到“烧钱”的元凶

OpenAI的API收费模式很清晰,按Token计费,不同模型价格不同。但当你有多个项目、多个团队、多种使用场景时,成本分摊就成了一笔糊涂账。

  • 场景细分 :是代码生成的调用多,还是文案创作的调用多?是某个特定的微服务在疯狂调用,还是某个开发者在做实验?
  • 模型选择分析 :大家都在用昂贵的 gpt-4 ,还是性价比更高的 gpt-3.5-turbo ?有没有场景其实用 gpt-3.5 完全够用,却错误地使用了 gpt-4 ,导致成本飙升?
  • 低效Prompt识别 :有些Prompt可能冗长、重复,或者每次对话都发送大量重复的上下文历史,导致输入Token( prompt_tokens )虚高。通过分析日志,我们能定位到那些“性价比低”的Prompt模式,从而优化它们,用更少的Token获得更好的结果。

没有日志分析,你只能看到总成本这个结果,而无法进行过程干预和优化。有了它,你就能像做性能剖析(Profiling)一样,对AI调用进行“成本剖析”。

2.2 性能监控与SLA保障

对于将ChatGPT API集成到生产环境的应用来说,其响应时间和可用性直接关系到用户体验。

  • 延迟监控 :API调用的 total_tokens response_time 是多少?P99延迟是否在可接受范围内?是否存在偶发的超长响应?
  • 错误率与重试分析 :API返回了哪些非200状态码?是速率限制( 429 )、上下文过长( 400 )还是服务器错误( 5xx )?系统是否设计了合理的重试机制?日志分析能帮你快速定位故障点和瓶颈。
  • 容量规划 :根据历史调用趋势,预测未来的Token消耗和API调用量,为预算和资源规划提供数据支持。

2.3 使用洞察与效果评估

这部分价值更偏产品和业务。

  • 热门功能分析 :用户最常使用AI来做什么?是问答、总结、翻译还是创意生成?这些洞察能指导产品功能的迭代重点。
  • 对话质量评估 :虽然不能直接评估内容质量,但可以通过分析交互模式来间接判断。例如,一个任务是否需要多轮对话( 多轮交互率 )才能完成?用户是否频繁修改或重新生成回答( 重试率 )?这些指标可以反映Prompt设计的有效性和AI回复的满意度。
  • 安全与合规审计 :记录所有的请求和响应(需注意隐私脱敏),可以用于事后审查,确保AI的使用符合公司政策,没有输入或输出敏感信息。

2.4 技术架构的必然选择:为什么是Elastic Stack?

面对海量的、半结构化的日志数据,为什么Elastic Stack是首选?

  1. 强大的实时处理能力 :Logstash或Fluentd可以实时摄取和解析日志。
  2. 高效的检索与分析 :Elasticsearch的倒排索引和聚合查询能力,能让你在亿级日志中快速完成“按项目过滤、按模型分组、按Token排序”这类复杂查询。
  3. 灵活的可视化 :Kibana的仪表盘(Dashboard)、可视化(Visualization)、机器学习(ML)功能,能让数据自己“说话”,无需额外开发报表系统。
  4. 完整的生态 :Beats系列轻量级数据采集器、Index Lifecycle Management(ILM)索引生命周期管理(用于自动滚动、归档、删除旧日志以控制成本),提供了一站式的解决方案。
  5. 开源与可控 :所有数据都在自己掌控的集群中,无需担心第三方服务的数据隐私和合规问题。

所以,这个项目的本质,是 将成熟的日志观测实践,应用到新兴的AI API调用领域 ,填补了在AI运营层面的工具空白。

3. 日志采集与处理架构设计

光有想法不行,得落地。第一步就是设计一个稳健、可扩展的日志采集与处理流水线。这里我分享我们经过实践验证的一套架构。

3.1 整体数据流设计

我们的目标是尽可能无侵入、低延迟地收集日志。核心思路是: 在应用层和OpenAI API之间,插入一个轻量的日志代理(Logging Agent) ,而不是去修改每一个调用ChatGPT的代码。

[你的应用程序] --> (发起HTTP请求) --> [日志代理/侧车] --> (转发请求并记录日志) --> [OpenAI API]
                                      |
                                      V
                               [结构化日志] --> [Logstash/Fluentd] --> [Elasticsearch] <--> [Kibana]

方案选择与理由

  1. 方案A:应用内SDK封装 。在代码里封装一个自定义的 ChatGPTClient ,在每次调用前后记录日志。优点是控制精细,缺点是需要改造所有调用代码,侵入性强,不同语言需要不同实现。
  2. 方案B:HTTP代理侧车 。部署一个独立的HTTP代理服务(比如用Nginx、Node.js或Go写一个),让应用的请求都先经过这个代理,由代理转发到OpenAI并记录日志。 这是我最推荐的方式 ,因为它对应用透明,只需要改变API的Endpoint地址(指向代理),语言无关,且可以集中实现认证、限流、重试等增强功能。
  3. 方案C:网络层抓包 。通过tcpdump或Mitmproxy拦截流量。这种方式太底层,解析复杂,不适合生产环境。

我们选择了 方案B ,并决定用 Go语言 实现这个代理。Go的并发性能好,内存占用低,非常适合做这种高并发的网络代理和日志转发器。

3.2 日志代理的核心实现要点

这个代理(我们内部叫它 ai-logger-proxy )是关键组件,它要做几件事:

  1. 接收请求 :监听一个端口,模拟OpenAI API的接口(主要是 /v1/chat/completions )。
  2. 记录请求 :解析入站请求的Header和Body,提取关键信息: Authorization 头(用于区分用户/项目)、请求体中的 model , messages , max_tokens , temperature 等。
  3. 转发请求 :将请求原样转发给真正的OpenAI API端点( https://api.openai.com/v1/chat/completions ),并加上必要的认证头。
  4. 记录响应 :接收OpenAI的响应,解析响应体,提取关键信息: id , choices[0].message.content , usage.prompt_tokens , usage.completion_tokens , usage.total_tokens ,以及整个请求的耗时。
  5. 结构化并输出日志 :将请求和响应信息合并成一个结构化的JSON日志对象,写入标准输出(stdout)或直接发送到Logstash/Fluentd。

这里有一个 非常重要的细节 Token消耗( usage )只能从OpenAI的响应中获得 。所以代理必须等待API响应完成后,才能记录完整的日志条目。这要求代理是异步的,不会阻塞客户端。

注意:隐私与脱敏 messages 里的内容可能包含敏感信息。在生产环境中, 绝对不能 将原始的prompt和completion全文记录到日志中。必须设计脱敏策略。例如,可以只记录前N个字符的哈希值用于去重分析,或者完全忽略内容,只记录元数据。如果必须记录内容用于分析,则需要确保Elasticsearch集群的访问安全(如加密、基于角色的访问控制RBAC),并可能需要在摄入前进行加密或掩码处理。这是一个严肃的合规考量点。

3.3 日志格式定义

一个良好的、结构化的日志格式是后续分析的基础。我们定义的日志JSON示例如下:

{
  "@timestamp": "2023-10-27T08:45:12.123Z",
  "log.level": "INFO",
  "message": "ChatGPT API call completed",
  "service.name": "ai-logger-proxy",
  "event.duration": 1250,
  "labels": {
    "project": "marketing-copy-generator",
    "team": "growth-hacking",
    "environment": "production"
  },
  "openai": {
    "request": {
      "id": "req_abc123", // 代理生成的唯一请求ID
      "model": "gpt-4",
      "stream": false,
      "temperature": 0.7,
      "max_tokens": 500,
      "message_count": 3,
      "prompt_hash": "a1b2c3d4...", // 对messages内容计算的哈希,用于匿名化追踪相同Prompt
      "user": "user-456" // OpenAI API中的user字段,用于区分终端用户
    },
    "response": {
      "id": "chatcmpl-xyz789", // OpenAI返回的ID
      "created": 1698393112,
      "finish_reason": "stop",
      "usage": {
        "prompt_tokens": 120,
        "completion_tokens": 320,
        "total_tokens": 440
      },
      "has_content": true // 是否包含非空回复,用于错误判断
    },
    "cost_estimate": {
      "input_cost": 0.0036, // 根据模型单价和prompt_tokens计算
      "output_cost": 0.0672, // 根据模型单价和completion_tokens计算
      "total_cost": 0.0708,
      "currency": "USD"
    }
  },
  "http": {
    "request": {
      "method": "POST",
      "headers": {
        "content-type": "application/json"
      }
    },
    "response": {
      "status_code": 200
    }
  },
  "client": {
    "ip": "10.0.1.25",
    "user_agent": "python-requests/2.28.1"
  }
}

字段设计解析

  • @timestamp , log.level , service.name :遵循ECS(Elastic Common Schema)规范,便于与Elastic生态的其他日志集成。
  • labels :用于打标,这是 成本分摊和多维度分析的关键 。这些标签应该在代理配置中通过规则注入(如根据请求来源IP或Header中的特定字段判断)。
  • openai.request.prompt_hash :这是平衡分析与隐私的折中方案。我们不存原文,但存一个哈希值,这样依然可以统计“相同Prompt被请求了多少次”。
  • openai.cost_estimate 这是核心价值字段 。代理需要内置一个模型价格表(例如 {“gpt-4”: {“input”: 0.03, “output”: 0.06}, “gpt-3.5-turbo”: {...}} ),并根据 usage 和模型名称实时计算出本次调用的估算成本。这让你在Kibana里可以直接做成本聚合。
  • event.duration :单位毫秒,用于性能监控。

4. Elastic Stack部署与配置实战

有了日志源,下一步就是搭建处理和分析平台。假设你已经在服务器上部署了Elasticsearch和Kibana(版本建议8.x),这里重点讲Logstash的配置和Elasticsearch的索引模板。

4.1 Logstash管道配置

Logstash的作用是接收代理发来的日志,进行过滤、富化,然后写入Elasticsearch。假设代理将JSON日志写入文件,并由Filebeat收集,或者代理直接通过HTTP发送到Logstash。这里展示一个从HTTP接收的配置。

logstash.conf 核心部分

input {
  http {
    port => 5044
    codec => "json" # 期望接收JSON格式的body
  }
  # 也可以使用 beats input 接收来自Filebeat的日志
  # beats {
  #   port => 5044
  # }
}

filter {
  # 1. 数据质量检查与字段修剪
  if [openai][request][model] {
    # 确保模型名称统一小写
    mutate {
      lowercase => [ "[openai][request][model]" ]
    }
  } else {
    # 如果没有模型字段,添加一个未知标记,防止后续计算错误
    mutate {
      add_field => { "[openai][request][model]" => "unknown" }
    }
  }

  # 2. 成本计算富化 (如果代理没计算,可以在这里补)
  # 注意:这需要维护一个模型价格字典,这里用ruby filter示例
  ruby {
    code => '
      price_map = {
        "gpt-4" => {"input" => 0.03, "output" => 0.06},
        "gpt-4-32k" => {"input" => 0.06, "output" => 0.12},
        "gpt-3.5-turbo" => {"input" => 0.0015, "output" => 0.002},
        "gpt-3.5-turbo-16k" => {"input" => 0.003, "output" => 0.004}
      }
      model = event.get("[openai][request][model]")
      usage = event.get("[openai][response][usage]")
      
      if price_map[model] && usage
        prompt_tokens = usage["prompt_tokens"].to_i
        completion_tokens = usage["completion_tokens"].to_i
        
        input_cost = (prompt_tokens / 1000.0) * price_map[model]["input"]
        output_cost = (completion_tokens / 1000.0) * price_map[model]["output"]
        total_cost = input_cost + output_cost
        
        event.set("[openai][cost_estimate][input_cost]", input_cost.round(6))
        event.set("[openai][cost_estimate][output_cost]", output_cost.round(6))
        event.set("[openai][cost_estimate][total_cost]", total_cost.round(6))
        event.set("[openai][cost_estimate][currency]", "USD")
      end
    '
  }

  # 3. 根据状态码和内容判断请求成功与否
  if [http][response][status_code] != 200 or ![openai][response][has_content] {
    mutate {
      add_tag => [ "openai_error" ]
    }
  }

  # 4. 添加基于时间的字段,便于按小时/天聚合
  date {
    match => [ "@timestamp", "ISO8601" ]
    target => "@timestamp" # 覆盖原字段,确保时区正确
  }
}

output {
  elasticsearch {
    hosts => ["http://your-elasticsearch-host:9200"]
    index => "chatgpt-logs-%{+YYYY.MM.dd}" # 按日滚动索引
    # 使用预定义的索引模板
    template => "/usr/share/logstash/templates/chatgpt-log-template.json"
    template_name => "chatgpt-logs"
    template_overwrite => true
    # 认证信息(如果Elasticsearch开启了安全特性)
    user => "logstash_writer"
    password => "${LOGSTASH_ES_PASSWORD}"
  }
  # 开发时也可以输出到控制台查看
  stdout {
    codec => rubydebug
  }
}

4.2 Elasticsearch索引模板与ILM策略

为了让日志高效存储和查询,必须预先定义索引映射(Mapping)和索引生命周期策略(ILM)。

索引模板 ( chatgpt-log-template.json ) : 这个模板定义了字段的类型(如 keyword 用于精确聚合, text 用于全文搜索, integer float date 等),并设置一些默认配置。

{
  "index_patterns": ["chatgpt-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "chatgpt-logs-policy", // 关联ILM策略
      "index.lifecycle.rollover_alias": "chatgpt-logs"
    },
    "mappings": {
      "dynamic_templates": [
        {
          "strings_as_keywords": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        }
      ],
      "properties": {
        "@timestamp": { "type": "date" },
        "openai.request.model": { "type": "keyword" },
        "openai.request.user": { "type": "keyword" },
        "openai.request.prompt_hash": { "type": "keyword" },
        "openai.response.usage.prompt_tokens": { "type": "integer" },
        "openai.response.usage.completion_tokens": { "type": "integer" },
        "openai.response.usage.total_tokens": { "type": "integer" },
        "openai.cost_estimate.total_cost": { "type": "scaled_float", "scaling_factor": 1000000 },
        "labels.project": { "type": "keyword" },
        "labels.team": { "type": "keyword" },
        "event.duration": { "type": "long" },
        "http.response.status_code": { "type": "integer" }
      }
    }
  }
}

ILM策略配置(可在Kibana界面操作) : 日志数据会增长很快,我们需要自动管理旧数据。

  1. Hot阶段(7天) :索引可读写,承载最新数据。
  2. Warm阶段(可选) :索引只读,可以转移到性能较差的节点。
  3. Cold阶段(30天后) :索引只读,转移到最廉价的存储。
  4. Delete阶段(90天后) :永久删除索引,释放空间。

通过Kibana的 Stack Management > Index Lifecycle Policies 可以图形化地创建这个策略,并将其名称(如 chatgpt-logs-policy )关联到上面的索引模板中。

4.3 Kibana可视化与仪表盘搭建

数据进到Elasticsearch后,就可以在Kibana里大展拳脚了。首先需要创建索引模式(Index Pattern) chatgpt-logs-* 。然后就可以创建可视化了。

核心可视化组件建议

  1. 成本总览面积图 :Y轴为 Sum of openai.cost_estimate.total_cost ,X轴为 @timestamp (按天或小时聚合)。一眼看清成本趋势。
  2. 模型使用量柱状图 :Y轴为 Count ,X轴为 openai.request.model 。看哪个模型最受欢迎。
  3. 项目/团队成本堆叠柱状图 :Y轴为 Sum of openai.cost_estimate.total_cost ,X轴为 @timestamp ,用 labels.project labels.team 拆分。用于成本分摊。
  4. 平均响应时间与P95/P99延迟时序图 :Y轴为 Average of event.duration Percentiles of event.duration (95, 99) ,X轴为时间。监控性能。
  5. 错误率指标 :创建一个 Metric 可视化,展示状态码非200的日志比例。
  6. Token效率散点图 :Y轴为 Average of openai.response.usage.total_tokens ,X轴为 labels.project ,用 openai.request.model 着色。快速发现哪个项目或模型组合的Token使用效率异常。
  7. 数据表 :展示最近的高成本请求详情,包括请求ID、模型、Token用量、估算成本、所属项目等,支持点击排序和过滤。

把这些可视化组件拖拽到一个仪表盘(Dashboard)里,你就拥有了一个实时监控ChatGPT API使用情况的“指挥中心”。

5. 高级分析与运维技巧

基础仪表盘搭建好后,可以玩一些更深入的分析,并解决一些实际运维中的问题。

5.1 利用Elasticsearch聚合进行深度分析

Kibana的可视化背后是强大的Elasticsearch聚合查询。你可以直接在Kibana的 Discover Dev Tools 里写查询。

  • 找出最“烧钱”的Prompt模式

    GET chatgpt-logs-*/_search
    {
      "size": 0,
      "aggs": {
        "group_by_prompt_hash": {
          "terms": {
            "field": "openai.request.prompt_hash",
            "size": 10,
            "order": { "total_cost": "desc" }
          },
          "aggs": {
            "total_cost": { "sum": { "field": "openai.cost_estimate.total_cost" } },
            "avg_tokens": { "avg": { "field": "openai.response.usage.total_tokens" } },
            "sample_request": {
              "top_hits": {
                "size": 1,
                "_source": ["labels.project", "openai.request.model"]
              }
            }
          }
        }
      }
    }
    

    这个查询能找出总成本最高的前10个Prompt哈希,并附带其成本、平均Token用量以及一个样例请求(包含项目和模型),帮你定位需要优化的具体Prompt。

  • 分析各模型的Token产出效率

    GET chatgpt-logs-*/_search
    {
      "size": 0,
      "aggs": {
        "by_model": {
          "terms": { "field": "openai.request.model" },
          "aggs": {
            "avg_input_tokens": { "avg": { "field": "openai.response.usage.prompt_tokens" } },
            "avg_output_tokens": { "avg": { "field": "openai.response.usage.completion_tokens" } },
            "cost_per_output_token": {
              "bucket_script": {
                "buckets_path": {
                  "total_cost": "total_cost",
                  "total_output": "total_output_tokens"
                },
                "script": "params.total_cost / params.total_output * 1000" // 每千个输出Token的成本
              }
            },
            "total_cost": { "sum": { "field": "openai.cost_estimate.total_cost" } },
            "total_output_tokens": { "sum": { "field": "openai.response.usage.completion_tokens" } }
          }
        }
      }
    }
    

    这个聚合能帮你计算每个模型平均消耗多少输入/输出Token,以及更关键的—— 每千个输出Token的成本 。这是一个衡量模型“性价比”的硬指标。

5.2 告警配置:让系统主动找你

监控的终极目标是预防问题。Kibana的告警功能可以做到:

  1. 成本超支告警 :当过去24小时内总成本超过预设阈值(如100美元)时,发送邮件或Slack通知。
  2. 错误率飙升告警 :当最近15分钟内API错误率(非200状态码比例)超过5%时,立即告警,提示可能遇到API故障或配额问题。
  3. 响应时间退化告警 :当P95响应时间连续多个时间窗口超过设定值(如10秒)时告警。

在Kibana的 Stack Management > Rules and Connectors 中配置。告警规则基于一个查询(例如, http.response.status_code >= 400 )和聚合条件(例如, count > 10 )来触发。

5.3 性能优化与运维心得

  • 代理的性能与高可用 :Go写的代理虽然轻量,但在高并发下也可能成为瓶颈。要做好连接池管理、超时设置和优雅降级。可以考虑部署多个代理实例,前面用负载均衡器(如Nginx)分发流量。
  • 日志量控制 :如果调用量巨大,全量日志会对Elasticsearch集群造成压力。可以考虑采样(Sample),例如只记录1/10的请求,或者只记录耗时超过一定阈值、成本超过一定阈值的“异常”请求。
  • Elasticsearch集群规划 :根据日志量预估磁盘空间。 chatgpt-logs-* 索引的字段相对固定且不多,压缩率会很好。使用ILM策略及时删除旧数据是控制存储成本的关键。
  • 标签(Labels)的管理 labels.project labels.team 是分析的维度灵魂。如何准确打标?我们是在代理的配置文件中维护了一个映射表,将请求来源IP段或Header中的特定标识(如 X-Project-ID )映射到项目名和团队名。这要求调用方在请求时带上这些标识。
  • 模型价格更新 :OpenAI的定价可能会调整。代理中的价格表和Logstash的Ruby脚本里的价格表需要同步更新。最好将其做成外部配置文件,便于统一管理。

6. 常见问题与排查实录

在实际搭建和运行过程中,我踩过不少坑。这里记录几个典型问题和解决方法。

6.1 日志采集延迟或丢失

  • 现象 :Kibana里看到的数据比实际调用晚几分钟,或者偶尔丢失一些日志条目。
  • 排查
    1. 检查代理的日志输出缓冲区是否已满?Go程序的标准输出如果没被及时消费,可能会阻塞或丢数据。确保代理将日志直接发送到Logstash的HTTP端口,或者使用异步、非阻塞的方式写文件。
    2. 检查Logstash管道吞吐量。在Logstash监控界面查看 events.in events.out 的速率,以及 pipeline.duration 。如果处理速度跟不上,需要优化Filter插件(比如移除不必要的Ruby计算),或者增加Logstash节点。
    3. 检查Elasticsearch索引是否健康,有无 yellow red 状态。可能是磁盘空间不足或分片分配有问题。

6.2 成本计算字段为null或不准

  • 现象 openai.cost_estimate.total_cost 字段为空,或者计算出的成本与OpenAI账单对不上。
  • 排查
    1. 首先确认OpenAI的响应中是否包含完整的 usage 字段。有些流式(stream)响应或错误响应可能没有。
    2. 检查代理或Logstash中的模型价格表是否完整,是否包含了所有使用的模型(如 gpt-4-turbo-preview 等新模型)。
    3. 核对价格单位。OpenAI的价格是每1000个Token的美元数。计算时是 (token_count / 1000) * price_per_1k 。检查计算逻辑是否有误。
    4. 注意,这里的成本是 估算值 ,与OpenAI官方账单可能因四舍五入或促销略有差异,但趋势和量级应该一致。

6.3 Kibana仪表盘加载缓慢

  • 现象 :打开包含多个复杂可视化的仪表盘时,加载时间很长。
  • 排查与优化
    1. 缩小时间范围 :默认加载“最近15分钟”和加载“最近7天”的数据量天差地别。鼓励用户查看仪表盘时先选择一个合适的时间窗口。
    2. 优化索引映射 :确保用于聚合和过滤的字段(如 model , project )是 keyword 类型,而不是 text 类型。 text 类型会进行分词,聚合效率低。
    3. 使用运行时字段(Runtime Fields)谨慎 :如果有些计算字段是在Kibana中用脚本定义的运行时字段,会极大影响查询性能。尽量在数据摄入时(在代理或Logstash中)就计算好这些字段。
    4. 增加Elasticsearch资源 :如果数据量确实很大(日增数亿条),需要考虑给Elasticsearch集群增加内存和CPU资源,或者对索引进行按时间分片(我们已经做了),甚至考虑将“热”数据(最近几天)和“温冷”数据存储在不同性能的硬件上。

6.4 如何验证数据管道是否通畅?

建立一个简单的端到端测试流程:

  1. 通过你的代理发送一个测试请求到ChatGPT API。
  2. 立即在Kibana的 Discover 页面,索引模式选择 chatgpt-logs-* ,时间范围选择“最近1小时”,搜索 openai.response.id:<你收到的响应ID>
  3. 如果能搜到对应的日志记录,并且字段齐全,说明从代理->Logstash->Elasticsearch->Kibana的整个链路是通的。

这个项目从构思到落地,花了我们小团队大概两周的业余时间。最大的收获不是做出了一个多么炫酷的系统,而是真正让AI的使用从“凭感觉”变成了“看数据”。每次看到仪表盘上异常的成本 spikes,我们都能快速定位到是哪个团队、哪个功能、甚至哪段Prompt引起的,然后有针对性地去优化。这种掌控感,对于任何希望规模化、负责任地使用AI技术的团队来说,都是无价的。如果你也在用ChatGPT API,强烈建议你花点时间搭建一套类似的日志分析系统,它带来的回报远大于投入。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐