ADK 模型抽象与多模型集成:从 Gemini 到 Ollama
1. 模型抽象层设计
如果把智能体比作一辆汽车,那么大语言模型(LLM)就是它的发动机。
没有发动机的汽车只是一个金属壳子,没有 LLM 的智能体也只是一个空架子。LLM 提供了推理能力、语言理解能力、规划能力——这些是智能体“智能”的来源。
但发动机有很多种——有汽油发动机、柴油发动机、电动机、混合动力。同样,LLM 也有很多种——有 Gemini、GPT、Claude、Llama、Mistral……
一个好的智能体框架,不应该绑定到某一个特定的模型上。它应该能够灵活地切换不同的模型,让开发者根据场景选择最合适的。
ADK 就是这样做的。它通过一个清晰的抽象层,将智能体逻辑与具体的模型实现解耦。
1.1 设计思想
ADK 的模型系统遵循以下几个设计原则:
极简接口:LLM 接口只有两个方法,非常简洁。极简接口的好处是容易理解、容易实现、容易测试、不容易出错。很多框架的模型接口设计得很复杂,有各种各样的方法。但本质上,模型做的事情只有一件——生成内容。其他都是细节。ADK 抓住了本质,把接口设计得很简洁。
流式优先:虽然同时支持流式和非流式,但从设计上可以看出流式是一等公民。统一使用迭代器作为返回类型,响应中有专门的 Partial 标志,有 StreamingResponseAggregator 来处理流式聚合。这是一个很现代的设计。现在的 AI 应用几乎都需要流式输出,非流式反而成了特例。
复用 genai 类型:ADK 没有重新定义一套内容类型,而是直接使用 genai 库的类型。这是一个务实的选择。genai 库的类型设计得很好,支持多模态、函数调用、代码执行等各种特性。重新发明轮子既浪费时间,又可能遗漏重要的东西。当然,这也带来了一定的耦合——ADK 模型层和 genai 库绑定了。但考虑到 genai 本身就是跨模型的,这个代价是值得的。
可扩展的响应元数据:LLMResponse 有很多元数据字段,还有一个 CustomMetadata map[string]any 用于自定义扩展。这使得不同的模型可以返回自己特有的元数据,而不需要修改接口。比如 Gemini 可能返回 grounding metadata,Claude 可能返回不同的 usage 信息,你的自定义模型可能返回一些特殊的调试信息,都可以通过 CustomMetadata 来传递。
1.2 模型在智能体中的位置
模型是 LLMAgent 的核心组件。LLMAgent 通过 Flow 来调用模型,Flow 封装了 ReAct 循环(推理-行动-观察)。
模型的职责很单一:就是根据输入生成输出。至于什么时候调用模型、什么时候调用工具、什么时候转移智能体,这些逻辑都在 Flow 和 LLMAgent 中。
这种关注点分离是很好的设计:
- 模型只负责“思考”
- 智能体负责“规划”
- 工具负责“行动”
各自职责清晰,互不干扰。
2. LLM 接口定义
ADK 的模型抽象非常简洁。核心就是一个接口:LLM。
让我们看看 llm.go 中的定义:
// LLM 定义了大语言模型的核心接口
type LLM interface {
// Name 返回模型的名称
Name() string
// GenerateContent 根据输入生成内容,支持流式和非流式两种模式
GenerateContent(ctx context.Context, req *LLMRequest, stream bool) iter.Seq2[*LLMResponse, error]
}
只有两个方法。Name() 方法返回模型的名称,用于标识和日志记录。GenerateContent() 方法是核心方法,根据输入生成内容,支持流式和非流式两种模式。就是这么简单。一个模型只需要能做一件事:根据输入生成输出。这种极简设计的好处是容易理解、容易实现、容易测试。任何想要集成到 ADK 的模型,只需要实现这两个方法即可。
3. LLMRequest 与 LLMResponse
3.1 LLMRequest:请求结构
请求的结构定义如下:
// LLMRequest 封装了发送给 LLM 的请求信息
type LLMRequest struct {
// Model 指定模型名称(可选,可覆盖默认模型)
Model string
// Contents 是对话历史内容列表
Contents []*genai.Content
// Config 是生成配置(温度、top_p 等参数)
Config *genai.GenerateContentConfig
// Tools 是工具集合(内部使用),JSON 序列化时忽略
Tools map[string]any `json:"-"`
}
Model 字段是可选的模型名称,如果设置了会覆盖默认模型,这允许插件在运行时动态切换模型。
Contents 字段是对话历史内容列表,包含了完整的对话上下文,从用户消息到智能体回复到工具响应。
Config 字段是生成配置,包含了温度、top_p 等控制生成行为的参数。
Tools 字段是工具集合,这是内部使用的字段,在 JSON 序列化时会被忽略,因为它包含了运行时的工具信息而不是需要持久化的数据。
可以看到,LLMRequest 使用了 genai.Content 作为内容格式。这是 Google 的 genai 库定义的类型。
你可能会问:为什么要用 Google 的类型?ADK 不是应该和具体模型解耦吗?
这是一个权衡。genai 库的 Content 类型设计得相当完善,支持多模态、函数调用、代码执行等特性。重新发明一套轮子既麻烦又没必要。
而且,genai 库本身就是跨模型的:它不仅支持 Google 的模型,也可以适配其他模型。
3.2 LLMResponse:响应结构
响应的结构是这样的:
// LLMResponse 封装了 LLM 返回的响应信息
type LLMResponse struct {
// Content 是生成的核心内容(文本、函数调用等)
Content *genai.Content
// CitationMetadata 是引用元数据
CitationMetadata *genai.CitationMetadata
// GroundingMetadata 是接地元数据
GroundingMetadata *genai.GroundingMetadata
// UsageMetadata 是用量元数据(token 数等)
UsageMetadata *genai.GenerateContentResponseUsageMetadata
// CustomMetadata 是自定义元数据,用于扩展
CustomMetadata map[string]any
// LogprobsResult 是对数概率结果
LogprobsResult *genai.LogprobsResult
// ModelVersion 是模型版本号
ModelVersion string
// Partial 表示是否为部分响应(流式中间块)
Partial bool
// TurnComplete 表示本轮对话是否已完成
TurnComplete bool
// Interrupted 表示响应是否被中断
Interrupted bool
// ErrorCode 是错误代码
ErrorCode string
// ErrorMessage 是错误消息
ErrorMessage string
// FinishReason 是结束原因
FinishReason genai.FinishReason
// AvgLogprobs 是平均对数概率
AvgLogprobs float64
}
字段很多,但可以分为几类:
核心内容:
Content:生成的内容(文本、函数调用等)
元数据:
CitationMetadata:引用元数据GroundingMetadata:接地元数据UsageMetadata:用量元数据(token 数等)LogprobsResult:对数概率结果ModelVersion:模型版本CustomMetadata:自定义元数据
流式相关:
Partial:是否是部分响应(流式中间块)TurnComplete:本轮是否完成Interrupted:是否被中断
错误相关:
ErrorCode:错误代码ErrorMessage:错误消息FinishReason:结束原因
这个响应结构非常全面,覆盖了各种场景。对于简单的使用场景,你只需要关注 Content 就够了。对于高级场景,还有很多有用的元数据。
4. Gemini 模型实现
理解了接口,我们来看看具体的实现。ADK 内置了 Gemini 模型的实现。
让我们看看 gemini.go 中的实现。
4.1 geminiModel 结构体
// geminiModel 是 Gemini 模型的核心实现结构体
type geminiModel struct {
// client 是 genai 库的客户端,用于与 Gemini API 通信
client *genai.Client
// name 是模型名称,如 "gemini-2.5-flash"
name string
// versionHeaderValue 是预计算的版本请求头值,用于遥测
versionHeaderValue string
}
三个字段:
client:genai 客户端name:模型名称versionHeaderValue:版本请求头(用于遥测)
4.2 创建模型
// NewModel 创建一个新的 Gemini 模型实例
func NewModel(ctx context.Context, modelName string, cfg *genai.ClientConfig) (model.LLM, error) {
// 创建配置副本,避免直接修改调用方传入的配置
if cfg != nil {
// 浅拷贝配置结构体
cfgCopy := *cfg
// 如果 HTTPClient 不为空,也需要拷贝一份
if cfg.HTTPClient != nil {
// 浅拷贝 HTTPClient
clientCopy := *cfg.HTTPClient
// 将拷贝后的 HTTPClient 赋值给配置副本
cfgCopy.HTTPClient = &clientCopy
}
// 使用配置副本
cfg = &cfgCopy
}
// 使用 genai 库创建客户端
client, err := genai.NewClient(ctx, cfg)
if err != nil {
// 创建失败,返回错误
return nil, err
}
// 如果客户端配置了 HTTPClient,添加请求头拦截器
if client.ClientConfig().HTTPClient != nil {
// 用 mergeHeadersInterceptor 包装原有的 Transport
client.ClientConfig().HTTPClient.Transport = &mergeHeadersInterceptor{
// 保留原有的 Transport 作为基础
base: client.ClientConfig().HTTPClient.Transport,
}
}
// 预计算版本请求头,格式为 "google-adk/版本号 gl-go/Go版本"
headerValue := fmt.Sprintf("google-adk/%s gl-go/%s", version.Version,
// 获取 Go 运行时版本并去掉 "go" 前缀
strings.TrimPrefix(runtime.Version(), "go"))
// 返回构造好的 geminiModel 实例
return &geminiModel{
name: modelName,
client: client,
versionHeaderValue: headerValue,
}, nil
}
创建过程做了几件事:
- 复制配置:防御性编程,避免修改调用方的配置和底层 http.Client
- 创建客户端:使用 genai 库创建客户端
- 添加拦截器:添加 HTTP 请求头拦截器,用于合并请求头
- 预计算请求头:避免每次请求都计算,格式为
google-adk/版本号 gl-go/Go版本
注意这里的 mergeHeadersInterceptor。它是一个 HTTP RoundTripper,用于合并和添加请求头。
// mergeHeadersInterceptor 是一个 HTTP RoundTripper 拦截器
type mergeHeadersInterceptor struct {
// base 是基础的 RoundTripper,如果为空则使用默认 Transport
base http.RoundTripper
}
// RoundTrip 实现了 http.RoundTripper 接口
func (h *mergeHeadersInterceptor) RoundTrip(req *http.Request) (*http.Response, error) {
// 遍历需要合并的请求头名称
for _, headerName := range []string{"x-goog-api-client", "user-agent"} {
// 获取该请求头的所有值
if values := req.Header.Values(headerName); len(values) > 0 {
// 用空格连接多个值,合并为一个
req.Header.Set(headerName, strings.Join(values, " "))
}
}
// 如果基础 RoundTripper 为空,使用默认的 HTTP Transport
if h.base == nil {
return http.DefaultTransport.RoundTrip(req)
}
// 否则委托给基础 RoundTripper
return h.base.RoundTrip(req)
}
这个拦截器主要是为了合并多个请求头值,便于 Google 统计和调试。
4.3 GenerateContent 实现
// GenerateContent 是 LLM 接口的核心方法实现
func (m *geminiModel) GenerateContent(ctx context.Context, req *model.LLMRequest, stream bool) iter.Seq2[*model.LLMResponse, error] {
// 确保最后一条消息是用户消息(Gemini API 要求)
m.maybeAppendUserContent(req)
// 如果配置为空,初始化为默认配置
if req.Config == nil {
req.Config = &genai.GenerateContentConfig{}
}
// 如果 HTTP 选项为空,初始化
if req.Config.HTTPOptions == nil {
req.Config.HTTPOptions = &genai.HTTPOptions{}
}
// 如果请求头为空,初始化
if req.Config.HTTPOptions.Headers == nil {
req.Config.HTTPOptions.Headers = make(http.Header)
}
// 添加版本信息请求头
m.addHeaders(req.Config.HTTPOptions.Headers)
// 根据 stream 参数选择流式或非流式实现
if stream {
// 流式模式
return m.generateStream(ctx, req)
}
// 非流式模式:返回一个只产生一次结果的迭代器
return func(yield func(*model.LLMResponse, error) bool) {
// 调用非流式生成方法
resp, err := m.generate(ctx, req)
// 将结果通过 yield 输出
yield(resp, err)
}
}
核心逻辑很简单:
- 追加用户内容:确保最后一条消息是用户消息
- 确保配置:初始化各种配置对象
- 添加请求头:添加版本信息请求头
- 选择模式:根据 stream 参数选择流式或非流式实现
4.4 addHeaders:添加请求头
// addHeaders 向 HTTP 请求头中添加版本信息
func (m *geminiModel) addHeaders(headers http.Header) {
// 设置 x-goog-api-client 请求头,标识 API 客户端
headers.Set("x-goog-api-client", m.versionHeaderValue)
// 设置 user-agent 请求头,标识用户代理
headers.Set("user-agent", m.versionHeaderValue)
}
添加两个请求头:
x-goog-api-client:API 客户端标识user-agent:用户代理
这两个请求头都设置为预计算的版本字符串,格式为 google-adk/版本号 gl-go/Go版本。
4.5 modelName:模型名称选择
// modelName 根据请求中的配置选择实际使用的模型名称
func (m *geminiModel) modelName(req *model.LLMRequest) string {
// 如果请求中指定了模型名称,优先使用请求中的
if req.Model != "" {
return req.Model
}
// 否则使用构造时指定的默认模型名称
return m.name
}
优先使用请求中的 Model 字段(可被 BeforeModelCallback 修改),若没设置则使用构造时指定的模型名称。
这种设计允许插件在运行时动态切换模型。
4.6 maybeAppendUserContent:特殊处理
// maybeAppendUserContent 确保对话内容的最后一条是用户消息
func (m *geminiModel) maybeAppendUserContent(req *model.LLMRequest) {
// 如果内容列表为空,添加一条默认的用户消息
if len(req.Contents) == 0 {
req.Contents = append(req.Contents, genai.NewContentFromText("Handle the requests as specified in the System Instruction.", "user"))
}
// 获取最后一条消息
if last := req.Contents[len(req.Contents)-1]; last != nil && last.Role != "user" {
// 如果最后一条不是用户消息,追加一条提示消息
req.Contents = append(req.Contents, genai.NewContentFromText("Continue processing previous requests as instructed. Exit or provide a summary if no more outputs are needed.", "user"))
}
}
这个方法做了两件事:
- 如果内容为空,添加一条默认的用户消息
- 如果最后一条不是用户消息,追加一条用户消息
为什么要这样做?因为 Gemini API 要求最后一条消息必须是用户消息。但在智能体的场景中,有时候最后一条可能是函数调用响应(model 角色),这时候就需要追加一条用户消息来"唤醒"模型。
这是一个很实用的兼容性处理。
4.7 非流式生成
// generate 执行非流式的内容生成
func (m *geminiModel) generate(ctx context.Context, req *model.LLMRequest) (*model.LLMResponse, error) {
// 调用 genai 客户端的 GenerateContent 方法
resp, err := m.client.Models.GenerateContent(ctx, m.modelName(req), req.Contents, req.Config)
if err != nil {
// 调用失败,包装错误信息后返回
return nil, fmt.Errorf("failed to call model: %w", err)
}
// 检查响应是否为空
if len(resp.Candidates) == 0 {
return nil, fmt.Errorf("empty response")
}
// 使用转换器将 genai 响应格式转换为 ADK 的 LLMResponse 格式
return converters.Genai2LLMResponse(resp), nil
}
非流式的实现很直接:
- 调用 genai 客户端的 GenerateContent 方法
- 检查响应是否为空
- 使用 converters.Genai2LLMResponse 转换响应格式
4.8 流式生成
流式生成稍微复杂一点:
// generateStream 执行流式的内容生成
func (m *geminiModel) generateStream(ctx context.Context, req *model.LLMRequest) iter.Seq2[*model.LLMResponse, error] {
// 创建流式响应聚合器,用于组装部分响应
aggregator := llminternal.NewStreamingResponseAggregator()
// 返回一个迭代器函数
return func(yield func(*model.LLMResponse, error) bool) {
// 遍历 genai 客户端的流式响应
for resp, err := range m.client.Models.GenerateContentStream(ctx, m.modelName(req), req.Contents, req.Config) {
// 如果发生错误,输出错误并结束
if err != nil {
yield(nil, err)
return
}
// 将每个流式响应块交给聚合器处理
for llmResponse, err := range aggregator.ProcessResponse(ctx, resp) {
// 输出聚合后的响应
if !yield(llmResponse, err) {
// 消费者停止接收,结束迭代
return
}
}
}
// 流式结束后,获取聚合器关闭时产生的最终完整响应
if closeResult := aggregator.Close(); closeResult != nil {
yield(closeResult, nil)
}
}
}
这里有一个重要的组件——StreamingResponseAggregator(流式响应聚合器)。
为什么需要聚合器?因为 Gemini API 的流式响应可能是不完整的——函数调用可能被拆分成多个块,文本可能逐字返回。聚合器负责将这些块拼起来,形成完整的响应。
聚合器的工作原理大致是这样的:
聚合器一边接收响应块,一边输出部分响应(用于 UI 实时显示),最后输出完整响应(用于业务逻辑处理)。
4.9 GetGoogleLLMVariant:获取后端类型
// GetGoogleLLMVariant 返回当前模型使用的 Google LLM 后端类型
func (m *geminiModel) GetGoogleLLMVariant() genai.Backend {
// 安全检查:如果接收者或客户端为空,返回未指定类型
if m == nil || m.client == nil {
return genai.BackendUnspecified
}
// 从客户端配置中获取后端类型
return m.client.ClientConfig().Backend
}
这个方法返回当前模型的后端类型(如 Gemini、Vertex AI 等),用于遥测和日志。
4.10 接口实现验证
// 编译时检查:确保 geminiModel 实现了 googlellm.GoogleLLM 接口
var _ googlellm.GoogleLLM = &geminiModel{}
这行代码确保 geminiModel 实现了 googlellm.GoogleLLM 接口。
5. Apigee 模型:企业级 API 网关代理
在企业环境中,直接调用 Gemini API 可能不符合安全和管理要求。Apigee 是 Google 的 API 管理平台,它可以作为 Gemini API 的代理,提供认证、限流、监控等能力。
根据 apigee.go 的实现,Apigee 模型是一个委托模式(Delegate Pattern)的实现:
// apigeeModel 是 Apigee 代理模型的实现
type apigeeModel struct {
// delegate 是委托的底层 Gemini 模型实例
delegate model.LLM
// name 是模型名称
name string
}
// GenerateContent 直接委托给底层模型执行
func (m *apigeeModel) GenerateContent(ctx context.Context, req *model.LLMRequest, stream bool) iter.Seq2[*model.LLMResponse, error] {
// 所有 LLM 逻辑都委托给底层的 Gemini 模型
return m.delegate.GenerateContent(ctx, req, stream)
}
Apigee 模型本身不实现任何 LLM 逻辑,它只是在创建 Gemini 模型时注入了 Apigee 代理的配置。
5.1 模型名称解析
Apigee 模型支持多种命名格式:
// parseModelName 解析模型名称字符串,提取后端类型和 API 版本信息
func parseModelName(modelName string) (*modelInfo, error) {
// 支持的格式:
// 格式一:apigee/{model_id}
// 格式二:apigee/gemini/{model_id}
// 格式三:apigee/vertex_ai/{model_id}
// 格式四:apigee/v1/{model_id}
// 格式五:apigee/vertex_ai/v1/{model_id}
// 格式六:apigee/gemini/v1/{model_id}
}
Apigee 模型支持多种命名格式。最简单的格式是 apigee/{model_id},它默认使用 Gemini API 后端和默认 API 版本。如果想明确指定后端,可以使用 apigee/gemini/{model_id} 或 apigee/vertex_ai/{model_id},前者使用 Gemini API 后端,后者使用 Vertex AI 后端。如果需要指定特定的 API 版本,可以在模型名称中加入版本号,如 apigee/v1/{model_id} 使用 Gemini API v1 版本,apigee/vertex_ai/v1/{model_id} 使用 Vertex AI v1 版本。这种灵活的命名格式让开发者可以根据需求选择合适的后端和版本。
5.2 创建 Apigee 模型
// 创建 Apigee 代理模型,指定代理 URL 和自定义请求头
model, err := apigee.NewModel(ctx, "apigee/gemini-2.5-flash",
// 设置 Apigee 代理的 URL
apigee.WithProxyURL("https://my-apigee-proxy.example.com"),
// 设置自定义请求头
apigee.WithCustomHeaders(http.Header{
// 添加 API Key 请求头
"X-API-Key": []string{"my-api-key"},
}),
)
如果不提供 WithProxyURL,会从 APIGEE_PROXY_URL 环境变量读取。如果是 Vertex AI 后端,还需要设置 GOOGLE_CLOUD_PROJECT 和 GOOGLE_CLOUD_LOCATION 环境变量。
5.3 生成 HTTP 选项
// generateHTTPOptions 生成 HTTP 选项配置
func generateHTTPOptions(proxyURL, apiVersion string, customHeaders http.Header) *genai.HTTPOptions {
// 创建 HTTP 选项,将基础 URL 改为 Apigee 代理 URL
httpOptions := &genai.HTTPOptions{
BaseURL: proxyURL, // 所有 Gemini API 请求都会发送到这个代理 URL
}
// 如果有自定义请求头,注入到 HTTP 选项中
if customHeaders != nil {
httpOptions.Headers = make(http.Header)
for k, v := range customHeaders {
// 遍历自定义请求头并逐项设置
httpOptions.Headers[k] = v
}
}
// 如果指定了 API 版本,设置版本号
if apiVersion != "" {
httpOptions.APIVersion = apiVersion
}
return httpOptions
}
关键在于 BaseURL——所有 Gemini API 请求都会发送到这个代理 URL,而不是 Google 的官方端点。这让企业可以在 Apigee 层面实施安全策略。
5.4 三种模型的对比
三种模型各有其特点和适用场景。Gemini 模型部署在 Google Cloud 或通过 AI Studio 使用,认证方式支持 API Key 和 OAuth,完全支持流式输出和函数调用,适合开发和原型验证场景。Apigee 模型部署在企业 API 网关,认证方式通过 Apigee 自定义配置,流式支持和函数调用功能都委托给底层的 Gemini 模型,适合企业生产环境的安全合规需求。Ollama 模型部署在本地运行,不需要认证,支持流式输出,但函数调用能力取决于具体使用的模型,适合本地开发和需要隐私保护的场景。
选择哪种模型取决于你的具体需求。如果追求最简单的开发体验,使用 Gemini 模型最直接。如果企业有严格的 API 管理要求,使用 Apigee 模型可以统一管控。如果需要本地运行、数据隐私或成本控制,Ollama 模型是不错的选择。
6. Ollama 集成
ADK 的模型接口非常简洁,这意味着集成新的模型很容易。
你只需要做三件事:
- 实现
model.LLM接口 - 处理请求格式转换
- 处理响应格式转换
6.1 集成的关键点
集成新模型时,有几个关键点需要注意:
1. 内容格式转换:ADK 使用 genai.Content 作为内容格式。你的模型可能使用不同的格式(比如 OpenAI 的消息格式),需要进行转换。转换时要注意角色映射(system/user/model/assistant)、多模态内容(文本、图片、音频)、函数调用格式、函数响应格式。
2. 流式支持:如果模型支持流式输出,一定要实现。流式体验对于对话应用来说非常重要。实现流式时,你需要处理服务器发送事件(SSE)或其他流式协议、正确设置 Partial 标志、最后输出完整响应。
3. 工具调用:如果你的模型支持工具调用,要确保工具声明格式正确转换、工具调用解析正确、工具响应格式正确。
4. 错误处理:正确处理各种错误情况,包括网络错误、限流错误、内容过滤、模型错误。
6.2 示例:集成 Ollama
Ollama 是一个流行的本地模型运行工具。很多开发者想在 ADK 中使用 Ollama 运行本地开源模型。
虽然 ADK Go 目前没有内置的 Ollama 实现,但我们可以根据接口自己实现一个。
让我们看看大致的实现思路:
// OllamaModel 是 Ollama 本地模型的实现结构体
type OllamaModel struct {
// baseURL 是 Ollama 服务的地址
baseURL string
// model 是 Ollama 中的模型名称
model string
// client 是 HTTP 客户端
client *http.Client
}
// NewOllamaModel 创建一个新的 Ollama 模型实例
func NewOllamaModel(baseURL, model string) model.LLM {
return &OllamaModel{
// 设置 Ollama 服务地址
baseURL: baseURL,
// 设置模型名称
model: model,
// 使用默认 HTTP 客户端
client: http.DefaultClient,
}
}
// Name 返回模型名称
func (m *OllamaModel) Name() string {
return m.model
}
// GenerateContent 生成内容,将 genai 格式转换为 Ollama 格式后调用 API
func (m *OllamaModel) GenerateContent(ctx context.Context, req *model.LLMRequest, stream bool) iter.Seq2[*model.LLMResponse, error] {
// 步骤一:将 genai.Content 格式转换为 Ollama 的消息格式
messages := convertToOllamaMessages(req.Contents)
// 步骤二:构建 Ollama API 请求
ollamaReq := OllamaChatRequest{
// 模型名称
Model: m.model,
// 转换后的消息列表
Messages: messages,
// 是否流式输出
Stream: stream,
// ... 工具等其他参数
}
// 根据 stream 参数选择流式或非流式模式
if stream {
return m.generateStream(ctx, ollamaReq)
}
return m.generate(ctx, ollamaReq)
}
// generate 执行非流式的内容生成
func (m *OllamaModel) generate(ctx context.Context, req OllamaChatRequest) iter.Seq2[*model.LLMResponse, error] {
return func(yield func(*model.LLMResponse, error) bool) {
// 调用 Ollama API 获取响应
resp, err := m.callOllamaAPI(ctx, req)
if err != nil {
// 出错时输出错误
yield(nil, err)
return
}
// 将 Ollama 响应格式转换为 ADK 的 LLMResponse 格式
llmResp := convertOllamaResponseToLLMResponse(resp)
// 输出转换后的响应
yield(llmResp, nil)
}
}
// generateStream 执行流式的内容生成
func (m *OllamaModel) generateStream(ctx context.Context, req OllamaChatRequest) iter.Seq2[*model.LLMResponse, error] {
return func(yield func(*model.LLMResponse, error) bool) {
// 调用 Ollama 流式 API
// 逐块处理流式响应
// 转换每个块并 yield 输出
// ...
}
}
核心工作就是格式转换——在 ADK 的格式和 Ollama 的格式之间来回转换。
6.3 Ollama 官方文档的参考
根据官方文档,在 Python 版本中,Ollama 是通过 LiteLLM 集成的:
# 使用 LiteLLM 集成 Ollama 模型
root_agent = Agent(
# 模型名称格式为 "ollama_chat/模型名"
model=LiteLlm(model="ollama_chat/gemma3:latest"),
# ...
)
官方文档也提到了一些注意事项:
-
使用
ollama_chat接口,而不是ollama。使用ollama可能会导致无限工具调用循环和忽略上下文的问题。 -
设置
OLLAMA_API_BASE环境变量,确保所有请求都被正确路由。 -
选择支持工具的模型。如果你的智能体依赖工具,要确保模型本身支持工具调用。
-
调整模型模板,避免无限工具调用循环。默认的模板可能会让模型总是调用函数,你需要修改模板让模型学会在合适的时候直接回答。
这些注意事项对于 Go 版本同样适用。
6.4 其他集成方式
除了直接实现 model.LLM 接口,还有其他集成方式:
方式一:通过 LiteLLM:LiteLLM 是一个统一的 LLM 接口库,支持 100+ 种模型。你可以用 LiteLLM 作为中间层,这样就不用一个个模型去适配了。
方式二:通过 OpenAI 兼容接口:很多模型服务都提供 OpenAI 兼容的 API(包括 Ollama)。你可以实现一个通用的 OpenAI 兼容客户端,这样所有兼容 OpenAI 格式的模型都能使用。
方式三:通过 MCP:MCP(Model Context Protocol)是一个开放协议。虽然 MCP 主要用于工具,但也可以用来连接模型服务。
7. 流式模式
7.1 流式 vs 非流式
GenerateContent 方法有一个 stream 参数,用于控制是否流式输出。
无论流式还是非流式,返回类型都是 iter.Seq2[*LLMResponse, error]。这是一个很巧妙的设计:
- 流式模式:迭代器产生多个部分响应,最后是完整响应
- 非流式模式:迭代器只产生一个响应(完整响应)
这样,上层代码不需要关心是流式还是非流式——统一用迭代器处理就行。
统一的接口大大简化了上层代码。智能体不需要写两套逻辑来处理流式和非流式。
7.2 流式响应聚合器
当 LLM 以流式模式返回响应时,响应会被分成多个部分(partial responses)逐步到达。每个部分可能只包含一个文本片段、一个函数调用的部分参数,或者一个思考签名。streamingResponseAggregator 的职责是把这些碎片重新组装成完整的响应。
根据 stream_aggregator.go 的实现,聚合器维护了多个缓冲区:
// streamingResponseAggregator 是流式响应的聚合器
type streamingResponseAggregator struct {
// 元数据缓冲区
usageMetadata *genai.GenerateContentResponseUsageMetadata // 用量元数据
groundingMetadata *genai.GroundingMetadata // 接地元数据
citationMetadata *genai.CitationMetadata // 引用元数据
response *model.LLMResponse // 当前聚合中的响应
// 思考签名缓冲区
currentThoughtSignature []byte // 当前思考内容的签名
// 文本缓冲区
sequence []*genai.Part // 已完成的 Part 序列
currentTextBuffer string // 当前文本缓冲区(累积未完成的文本)
currentTextIsThought bool // 当前文本是否为思考内容
finishReason genai.FinishReason // 结束原因
// 函数调用缓冲区(流式参数的 JSON 路径组装)
currentFunctionName string // 当前函数名
currentFunctionID string // 当前函数 ID
currentFunctionArgs map[string]any // 当前函数参数(通过 JSON 路径逐步组装)
currentFunctionThoughtSignature []byte // 函数调用的思考签名
}
聚合器维护了三个主要缓冲区来处理不同类型的内容。元数据缓冲区累积 usage、grounding、citation 等元数据信息,这些信息在流式响应的最后一块中返回,聚合器会将它们合并成完整的元数据。文本缓冲区累积文本内容,特别需要区分普通文本和思考内容(Thought),因为这两者不能混在同一个 Part 中。函数调用缓冲区累积函数调用信息,支持流式参数的组装,因为 Gemini 在流式模式下可能逐步返回函数调用的参数片段。
7.2.1 文本聚合:缓冲与刷新
当 LLM 流式返回文本时,每个部分只包含一小段文字。聚合器把这些文字累积到 currentTextBuffer 中:
// processTextPart 处理文本类型的 Part
func (s *streamingResponseAggregator) processTextPart(part *genai.Part) {
// 如果当前缓冲区的思考状态与新部分不同,先刷新旧缓冲区
if s.currentTextBuffer != "" && part.Thought != s.currentTextIsThought {
s.flushTextBufferToSequence()
}
// 如果缓冲区为空,记录当前部分的思考状态
if s.currentTextBuffer == "" {
s.currentTextIsThought = part.Thought
}
// 将文本追加到缓冲区
s.currentTextBuffer += part.Text
}
这段代码处理了一个微妙的问题:思考内容(Thought)和普通文本不能混在同一个 Part 中。当 LLM 从思考切换到普通文本(或反过来)时,聚合器会先刷新当前缓冲区,把已积累的文本作为一个完整的 Part 加入序列。
7.2.2 函数调用聚合:流式参数的 JSON 路径组装
这是聚合器中最复杂的部分。Gemini 在流式模式下可能逐步返回函数调用的参数——每个部分只包含参数的一个片段,通过 JSON 路径标识。
// processStreamingFunctionCallPart 处理流式函数调用 Part
func (s *streamingResponseAggregator) processStreamingFunctionCallPart(part *genai.Part) {
// 如果 Part 中有函数名,记录下来
if part.FunctionCall.Name != "" {
s.currentFunctionName = part.FunctionCall.Name
}
// 如果 Part 中有函数 ID,记录下来
if part.FunctionCall.ID != "" {
s.currentFunctionID = part.FunctionCall.ID
}
// 遍历所有部分参数
for _, arg := range part.FunctionCall.PartialArgs {
// 获取 JSON 路径,例如 "$.city"
jsonPath := arg.JsonPath
if jsonPath == "" {
// 路径为空则跳过
continue
}
// 根据 JSON 路径获取值
value, ok := s.getValueFromPartialArg(arg, jsonPath)
if !ok {
// 获取失败则跳过
continue
}
// 根据 JSON 路径将值设置到 currentFunctionArgs 中
s.setValueByJSONPath(jsonPath, value)
}
// 如果 WillContinue 为 true,说明还有更多部分要到来
if part.FunctionCall.WillContinue != nil && *part.FunctionCall.WillContinue {
return
}
// 所有部分已到达,刷新文本缓冲区
s.flushTextBufferToSequence()
// 刷新函数调用到序列
s.flushFunctionCallToSequence()
}
setValueByJSONPath 方法支持嵌套路径(如 $.address.city),它会逐层创建 map 并设置最终值:
// setValueByJSONPath 根据 JSON 路径设置值到函数参数中
func (s *streamingResponseAggregator) setValueByJSONPath(jsonPath string, value any) {
// 如果函数参数 map 为空,先初始化
if s.currentFunctionArgs == nil {
s.currentFunctionArgs = make(map[string]any)
}
// 去掉 "$." 前缀,例如 "$.city" 变成 "city"
path := jsonPath
if strings.HasPrefix(jsonPath, "$.") {
path = jsonPath[2:]
}
// 按点号分割路径,"address.city" 变成 ["address", "city"]
pathParts := strings.Split(path, ".")
// 从根 map 开始遍历
current := s.currentFunctionArgs
// 遍历到倒数第二层,逐层创建或获取嵌套 map
for _, part := range pathParts[:len(pathParts)-1] {
// 尝试获取当前层的值
next, exists := current[part]
nextMap, ok := next.(map[string]any)
// 如果不存在或类型不对,创建新的 map
if !exists || !ok {
nextMap = make(map[string]any)
current[part] = nextMap
}
// 进入下一层
current = nextMap
}
// 获取最后一层的键名
lastKey := pathParts[len(pathParts)-1]
// 处理字符串增量拼接:如果已有值且是字符串,将新片段追加到后面
if existingValue, exists := current[lastKey]; exists {
if str, ok := existingValue.(string); ok {
if strValue, ok := value.(string); ok {
// 增量拼接字符串
value = str + strValue
}
}
}
// 设置最后一层的值
current[lastKey] = value
}
对于字符串值,它还会处理增量拼接——如果路径上已有字符串值,新的片段会追加到现有值后面。
7.2.3 Close:生成最终聚合响应
当所有流式部分处理完毕后,调用 Close() 生成最终的聚合响应:
// Close 关闭聚合器,生成最终的完整响应
func (s *streamingResponseAggregator) Close() *model.LLMResponse {
// 如果响应对象存在
if s.response != nil {
// 刷新剩余的文本缓冲区
s.flushTextBufferToSequence()
// 刷新剩余的函数调用缓冲区
s.flushFunctionCallToSequence()
// 返回完整的聚合响应
return &model.LLMResponse{
Content: &genai.Content{
// 完整的 Part 序列
Parts: s.sequence,
// 角色为模型
Role: genai.RoleModel,
},
// 用量元数据
UsageMetadata: s.usageMetadata,
// 接地元数据
GroundingMetadata: s.groundingMetadata,
// 引用元数据
CitationMetadata: s.citationMetadata,
// 错误代码
ErrorCode: errorCode,
// 错误消息
ErrorMessage: errorMessage,
// 结束原因
FinishReason: s.finishReason,
}
}
return nil
}
这个最终响应包含了所有部分的完整内容。注意它不设置 Partial = true,因为它是聚合后的完整响应。
7.2.4 ProcessResponse:处理单个流式部分
// ProcessResponse 处理单个流式响应部分
func (s *streamingResponseAggregator) ProcessResponse(ctx context.Context, resp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error] {
return func(yield func(*model.LLMResponse, error) bool) {
// 如果响应中没有候选结果,直接返回
if len(resp.Candidates) == 0 {
return
}
// 取第一个候选结果
candidate := resp.Candidates[0]
// 更新元数据累积
s.updateMetadata(candidate, resp.UsageMetadata)
// 如果候选结果中没有内容,直接返回
if candidate.Content == nil {
return
}
// 遍历内容中的每个 Part
for _, part := range candidate.Content.Parts {
// 如果是文本 Part
if part.Text != "" {
s.processTextPart(part)
} else if part.FunctionCall != nil {
// 如果是函数调用 Part
s.processStreamingFunctionCallPart(part)
}
}
// 生成部分响应(用于 UI 实时显示)
partialResp := s.buildPartialResponse()
if partialResp != nil {
yield(partialResp, nil)
}
}
}
7.3 请求转换器:Genai2LLMResponse
ADK 使用转换器将 genai 库的响应格式转换为 ADK 的 LLMResponse 格式。这个转换器在 converters 包中。
// Genai2LLMResponse 将 genai 库的响应格式转换为 ADK 的 LLMResponse 格式
func Genai2LLMResponse(resp *genai.GenerateContentResponse) *model.LLMResponse {
// 如果响应为空或没有候选结果,返回 nil
if resp == nil || len(resp.Candidates) == 0 {
return nil
}
// 取第一个候选结果
candidate := resp.Candidates[0]
// 映射字段到 LLMResponse
return &model.LLMResponse{
// 生成的内容
Content: candidate.Content,
// 引用元数据
CitationMetadata: candidate.CitationMetadata,
// 接地元数据
GroundingMetadata: candidate.GroundingMetadata,
// 用量元数据
UsageMetadata: resp.UsageMetadata,
// 对数概率结果
LogprobsResult: candidate.LogprobsResult,
// 结束原因
FinishReason: candidate.FinishReason,
// ... 其他字段
}
}
转换器的作用是将 genai 库的响应格式转换为 ADK 的 LLMResponse 格式。它首先提取第一个候选响应,因为 genai 可能返回多个候选,而 ADK 只取第一个。然后将 genai 的字段映射到 LLMResponse 的字段,具体映射关系是:Candidates[0].Content 映射到 Content 字段,包含生成的内容;
Candidates[0].CitationMetadata 映射到 CitationMetadata 字段,包含引用元数据;
Candidates[0].GroundingMetadata 映射到 GroundingMetadata 字段,包含接地元数据;
UsageMetadata 映射到 UsageMetadata 字段,包含用量信息;Candidates[0].FinishReason 映射到 FinishReason 字段,包含结束原因。转换器还会处理特殊情况,如空响应和错误响应。
8. 模型配置与选择
8.1 模型选择策略
ADK 支持运行时动态切换模型。
8.1.1 模型名称优先级
// modelName 根据请求中的配置选择实际使用的模型名称
func (m *geminiModel) modelName(req *model.LLMRequest) string {
// 优先使用请求中指定的模型名称
if req.Model != "" {
return req.Model
}
// 否则使用构造时指定的默认模型名称
return m.name
}
优先级:
- 请求中的 Model 字段:可以被插件的
BeforeModelCallback修改 - 构造时指定的模型名称:作为默认值
8.1.2 插件驱动的模型切换
通过插件的 BeforeModelCallback,可以在运行时动态切换模型:
// Plugin 定义了插件结构
type Plugin struct {
// BeforeModelCallback 在模型调用前执行,可以修改请求参数
BeforeModelCallback func(ctx context.Context, req *model.LLMRequest) (*model.LLMRequest, error)
}
使用场景:
- 负载均衡:根据请求特征选择不同的模型
- 成本优化:简单请求用便宜的模型,复杂请求用贵的模型
- A/B 测试:部分用户用新模型,部分用户用旧模型
8.2 生产环境配置最佳实践
8.2.1 Gemini 模型配置
// 基本配置:使用 API Key 认证
model, err := gemini.NewModel(ctx, "gemini-2.5-flash", &genai.ClientConfig{
// 设置 API Key
APIKey: "your-api-key",
})
// 自定义 HTTP 客户端:配置超时和连接池
httpClient := &http.Client{
// 设置请求超时时间为 60 秒
Timeout: 60 * time.Second,
Transport: &http.Transport{
// 最大空闲连接数
MaxIdleConns: 100,
// 空闲连接超时时间
IdleConnTimeout: 90 * time.Second,
// TLS 握手超时时间
TLSHandshakeTimeout: 10 * time.Second,
},
}
// 使用自定义 HTTP 客户端创建模型
model, err := gemini.NewModel(ctx, "gemini-2.5-flash", &genai.ClientConfig{
// 设置 API Key
APIKey: "your-api-key",
// 使用自定义 HTTP 客户端
HTTPClient: httpClient,
})
8.2.2 Apigee 模型配置
// 设置环境变量
// Apigee 代理 URL
os.Setenv("APIGEE_PROXY_URL", "https://my-apigee-proxy.example.com")
// Google Cloud 项目 ID
os.Setenv("GOOGLE_CLOUD_PROJECT", "my-project")
// Google Cloud 区域
os.Setenv("GOOGLE_CLOUD_LOCATION", "us-central1")
// 创建 Apigee 代理模型
model, err := apigee.NewModel(ctx, "apigee/vertex_ai/gemini-2.5-flash",
// 设置自定义请求头
apigee.WithCustomHeaders(http.Header{
// 添加 API Key 请求头
"X-API-Key": []string{"my-api-key"},
}),
)
8.2.3 流式模式的注意事项
在流式模式下,需要注意:
- 取消上下文:用户中断时,及时取消 context,避免浪费资源
- 缓冲区清理:确保流式响应结束后清理缓冲区
- 错误处理:流式过程中的错误需要及时处理
8.3 完整的模型架构
8.3.1 数据流分析
- Gemini 路径:LLMAgent -> Flow -> geminiModel -> genai.Client -> Gemini API
- Apigee 路径:LLMAgent -> Flow -> apigeeModel -> geminiModel -> genai.Client(带代理配置)-> Apigee -> Gemini API
- Ollama 路径:LLMAgent -> Flow -> OllamaModel -> Ollama Server -> 本地模型
9. 总结
ADK 的模型系统虽然代码不多,但设计得很精巧。通过本文的分析,我们了解了:
-
极简接口设计:LLM 接口只有两个方法,简洁而强大
-
统一的流式抽象:流式和非流式统一用迭代器处理,简化上层逻辑
-
Gemini 实现细节:请求头拦截、内容补全、流式聚合器等
-
Apigee 代理:企业级 API 网关代理,支持认证、限流、监控
-
流式响应聚合器:处理流式文本和函数调用参数的组装
-
模型选择策略:运行时动态切换模型,支持插件驱动
-
如何集成新模型:只需要实现 LLM 接口,做好格式转换
-
设计思想:极简接口、流式优先、复用 genai 类型、可扩展元数据
模型是智能体的发动机。一个好的模型抽象层,让开发者可以根据场景灵活选择最合适的模型,而不需要修改业务逻辑。
ADK 在这方面做得相当不错。它的接口简洁,实现清晰,扩展容易。
更多推荐


所有评论(0)