I. Background

        In LLM applications, models such as GPT-4 often take several seconds to tens of seconds to generate a response. With a traditional synchronous request, the user stares at a blank page while waiting, which makes for a poor experience. This article uses Spring WebFlux reactive programming together with SSE (Server-Sent Events) server push to achieve a typewriter-style, token-by-token streaming output, and integrates AI capabilities through the LangChain4j framework, noticeably improving the user experience.

II. Architecture Design

1. Technology Selection

Component | Choice | Advantages
Reactive framework | Spring WebFlux | Non-blocking IO with backpressure support
Streaming protocol | SSE | Simple to use, automatic reconnection built in
AI integration framework | LangChain4j 0.36.2 | Standardized LLM interface, supports streaming callbacks
Vector database | Milvus 2.5.1 | Low-latency similarity search

2. Overall Design

The browser posts a chat request to an SSE endpoint exposed by a WebFlux controller; the service layer bridges the LLM's streaming token callbacks into a Reactor sink, which the controller returns as a Flux; for document-based chats, relevant text segments are first retrieved from Milvus and folded into the prompt.

3. Component Relationships

WebFlux controller → chat service (Sinks/Flux) → LangChain4j (OpenAI streaming model + all-MiniLM-L6-v2 embeddings) → Milvus vector store.

III. Core Implementation in Detail

1. Dependency Configuration (pom.xml)

<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.5.1</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-milvus</artifactId>
    <version>0.36.2</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-embeddings-all-minilm-l6-v2</artifactId>
    <version>0.36.2</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai</artifactId>
    <version>0.36.2</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
    <version>0.36.2</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-reactor</artifactId>
    <version>0.36.2</version>
</dependency>
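One dependency the list above takes for granted: Flux and the SSE support come from the reactive web stack itself. Assuming Spring Boot's dependency management supplies the version, something like the following is also required:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>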

2. Streaming Endpoint Design (Controller Layer)

Note that the endpoint's Content-Type must be text/event-stream (MediaType.TEXT_EVENT_STREAM_VALUE).

@PostMapping(value = "/stream-chat", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat(@RequestBody ChatReq request) {
    return chatService.streamGenerate(request)
            .timeout(Duration.ofSeconds(30))  // guard against zombie connections
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // retry on transient network errors
}
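Because the browser's EventSource API only supports GET, a POST-based SSE endpoint like this one is consumed with fetch on the front end or, on the JVM side, with WebClient. A minimal consumption sketch, assuming the service runs on localhost:8080 (URL and request contents are placeholders):

// Consume the SSE stream with Spring WebClient (sketch)
WebClient client = WebClient.create("http://localhost:8080");
client.post()
        .uri("/stream-chat")
        .contentType(MediaType.APPLICATION_JSON)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .bodyValue(new ChatReq())           // fill in chatId/type/question as needed
        .retrieve()
        .bodyToFlux(String.class)           // one element per SSE data frame
        .subscribe(System.out::print);      // print tokens as they arrive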
@ApiModel(value = "Chat request")
public class ChatReq {

    @ApiModelProperty(value = "Chat id")
    private Long chatId;

    @ApiModelProperty(value = "Chat type")
    private Integer type;

    @ApiModelProperty(value = "Question")
    private String question;

    @ApiModelProperty(value = "External (reference document) ids")
    private List<Long> externalIds;

    // @Min/@Max are not defined for floating-point types; use the Decimal* variants
    @ApiModelProperty(value = "Vector retrieval threshold", example = "0.5")
    @DecimalMin("0")
    @DecimalMax("1")
    private Double retrievalThreshold;

    @ApiModelProperty(value = "Number of vector matches to return", example = "5")
    @Min(value = 1)
    private Integer topK;

    // ... remaining fields (e.g. temperature and hasHistory, used below) and getters/setters omitted

}

3. Core Streaming Logic (Service Layer)

1) Main Request

public Flux<ApiResponse<String>> chat(ChatReq chatReq) {
        // Create a sink that bridges the LLM callbacks into a Flux
        Sinks.Many<ApiResponse<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
        // Flag used to stop the producing side once the client cancels
        AtomicBoolean isCancelled = new AtomicBoolean(false);
        // Distinguish new vs. existing conversations
        if (isNewChat(chatReq.getChatId())) { // new conversation; related business logic omitted
            chatReq.setHasHistory(false);
            chatModelHandle(chatReq, sink, isCancelled);
        } else { // existing conversation
            // look up the chat type and history by chatId
            chatReq.setHasHistory(true);
            chatModelHandle(chatReq, sink, isCancelled);
        }
        return sink.asFlux().doOnCancel(() -> {
            log.info("Stopping stream processing");
            isCancelled.set(true);  // set the cancellation flag
            sink.tryEmitComplete(); // complete the stream
        });
    }
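Two details worth noting here: tryEmitNext is invoked from the LLM client's callback thread rather than a Reactor thread, and it signals failure through its return value instead of throwing. A defensive helper, as a sketch (safeEmit is a hypothetical name; imports: reactor.core.publisher.Sinks, java.util.concurrent.atomic.AtomicBoolean):

// tryEmitNext reports failure via Sinks.EmitResult rather than throwing,
// so the result should be checked instead of being silently discarded
static <T> void safeEmit(Sinks.Many<T> sink, T value, AtomicBoolean cancelled) {
    Sinks.EmitResult result = sink.tryEmitNext(value);
    if (result.isFailure()) {
        // e.g. FAIL_OVERFLOW or FAIL_TERMINATED: stop producing further tokens
        cancelled.set(true);
    }
}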

2) Building the Request Parameters

If the conversation has history, fetch it (each round's question and answer).

Wrap it into ChatMessages: each question becomes a UserMessage and each answer an AiMessage, as sketched below.
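A minimal sketch of that mapping, assuming a hypothetical ChatRecord entity and DAO (the persistence layer is elided in the original):

private List<ChatMessage> getHistory(Long chatId) {
    List<ChatMessage> history = new ArrayList<>();
    // chatRecordMapper is a hypothetical DAO returning past rounds in chronological order
    for (ChatRecord record : chatRecordMapper.findByChatId(chatId)) {
        history.add(UserMessage.from(record.getQuestion())); // what the user asked
        history.add(AiMessage.from(record.getAnswer()));     // what the model answered
    }
    return history;
}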

private void chatModelHandle(ChatReq chatReq, Sinks.Many<ApiResponse<String>> sink, AtomicBoolean isCancelled) {
        List<ChatMessage> history = new ArrayList<>();
        if (chatReq.getHasHistory()) {
            // Assemble the history: questions as UserMessage, answers as AiMessage
            history = getHistory(chatReq.getChatId());
        }
        Integer chatType = chatReq.getType();
        // Reference document ids
        List<Long> externalIds = chatReq.getExternalIds();
        ChatStreamingResponseHandler handler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);
        StreamingChatLanguageModel model = ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature());
        // Dispatch on the chat type
        if (ChatType.NORMAL.getCode().equals(chatType)) { // plain chat
            if (chatReq.getHasHistory()) {
                history.add(UserMessage.from(chatReq.getQuestion()));
                model.generate(history, handler); // generate(List<ChatMessage>, handler) overload
            } else {
                model.generate(chatReq.getQuestion(), handler); // generate(String, handler) overload
            }
        } else if (ChatType.DOCUMENT_DB.getCode().equals(chatType)) { // document-based (RAG) chat
            Prompt prompt = geneRagPrompt(chatReq);
            if (chatReq.getHasHistory()) {
                history.add(UserMessage.from(prompt.text()));
                model.generate(history, handler);
            } else {
                model.generate(prompt.text(), handler);
            }
        } else {
            throw new BizException("Feature not yet implemented");
        }
    }


3) Vector Retrieval Optimization

If reference documents are attached to the request, retrieve the matching text:

search the vector store, filtered by the reference document ids and the retrieval threshold, for the top-K matching segments.

    private List<PPid> search(ChatReq chatReq, MilvusClientV2 client, MilvusConfig config, EmbeddingModel model) {

        // Embed the question and search, restricted to the given reference document ids
        TextSegment segment = TextSegment.from(chatReq.getQuestion());
        Embedding queryEmbedding = model.embed(segment).content();
        SearchResp searchResp = client.search(SearchReq.builder()
                .collectionName(config.getCollectionName())
                .data(Collections.singletonList(new FloatVec(queryEmbedding.vector())))
                // externalIds is a List<Long>; its toString() ("[1, 2, ...]") matches the
                // bracketed array literal that ARRAY_CONTAINS_ANY expects
                .filter(String.format("ARRAY_CONTAINS_ANY(documentIdList, %s)", chatReq.getExternalIds()))
                .topK(chatReq.getTopK() == null ? config.getTopK() : chatReq.getTopK())
                .outputFields(Arrays.asList("pid", "documentId"))
                .build());
        // Drop results whose score falls below the threshold
        // (assumes a similarity metric where higher is better, e.g. COSINE/IP; invert for L2)
        List<SearchResp.SearchResult> searchResults = searchResp.getSearchResults().get(0);
        Double minScore = chatReq.getRetrievalThreshold() == null ? config.getMinScore() : chatReq.getRetrievalThreshold();
        return searchResults.stream()
                .filter(item -> item.getScore() >= minScore)
                .sorted((item1, item2) -> Double.compare(item2.getScore(), item1.getScore()))
                .map(item -> new PPid(
                        (Long) item.getEntity().get("documentId"),
                        (Long) item.getEntity().get("pid")
                ))
                .toList();
    }

After obtaining the reference segment ids, fetch the corresponding text and assemble it into the prompt template.

private Prompt genePrompt(String context) {
        ...
}
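The template body is elided in the original; a minimal sketch using LangChain4j's PromptTemplate (the template wording and the two-argument signature are assumptions, not the article's actual prompt):

private Prompt genePrompt(String context, String question) {
    // {{context}} and {{question}} are the variables PromptTemplate substitutes
    PromptTemplate template = PromptTemplate.from(
            "Answer the question using only the following context:\n"
                    + "{{context}}\n\n"
                    + "Question: {{question}}");
    return template.apply(Map.of("context", context, "question", question));
}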


4) Connecting to the LLM Client

public static StreamingChatLanguageModel getStreamingChatLanguageModel(Double temperature) {
        ChatModelConfig config = ChatConfig.getInstance().getChatModelConfig();
        return OpenAiStreamingChatModel.builder()
                .baseUrl(config.getBaseUrl())
                .modelName(config.getModelName())
                .apiKey(config.getApiKey())
                .maxTokens(config.getMaxTokens())
                .temperature(temperature) // per-request sampling temperature, as passed by the call sites above
                .timeout(Duration.ofSeconds(config.getTimeout()))
                .build();
}

5) Handling the Model's Output


@Slf4j
@Data
@NoArgsConstructor
public class ChatStreamingResponseHandler implements StreamingResponseHandler<AiMessage> {

    private Sinks.Many<ApiResponse<String>> sink;
    private ChatReq chatReq;
    private AtomicBoolean isCancelled;

    public ChatStreamingResponseHandler(Sinks.Many<ApiResponse<String>> sink, ChatReq chatReq, AtomicBoolean isCancelled) {
        this.sink = sink;
        this.chatReq = chatReq;
        this.isCancelled = isCancelled;
    }

    @Override
    public void onNext(String answer) {
        // Do not emit anything once the client has cancelled
        if (isCancelled.get()) {
            return;
        }
        sink.tryEmitNext(BaseController.success(answer));
    }

    @Override
    public void onComplete(Response<AiMessage> response) {
        if (!isCancelled.get()) {
            sink.tryEmitNext(BaseController.success("[END]")); // end-of-stream marker
            sink.tryEmitComplete();
        }
        // business-specific post-processing omitted
    }

    @Override
    public void onError(Throwable error) {
        if (!isCancelled.get()) {
            sink.tryEmitError(error);
        }
        // business-specific error handling omitted
    }

}

IV. Results

V. Troubleshooting Common Issues

Symptom | Solution
SSE connection drops unexpectedly | Check that the Nginx proxy read timeout is greater than 5 minutes
Garbled Chinese output | Add charset=UTF-8 to the Content-Type (see the snippet below)
High latency on the first response | Warm up the vector retrieval (embedding) model at startup
Memory leaks | Check that reactive operators release their resources
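For the charset fix, the declaration can go directly into the produces attribute; a sketch:

// Declare the charset explicitly so multi-byte tokens are not mangled by the default encoding
@PostMapping(value = "/stream-chat", produces = "text/event-stream;charset=UTF-8")
public Flux<String> streamChat(@RequestBody ChatReq request) {
    return chatService.streamGenerate(request);
}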

VI. Closing Remarks

The above is only a brief walkthrough of the implementation steps; feel free to leave a comment for a deeper discussion.

There is still plenty of the experience to polish. Taking Doubao as a reference, for example:

1. Support manually stopping a response.

2. Automatically stop the streaming output when the page is refreshed or closed, and resume it after reconnecting.

3. Batch several tokens into a single SSE frame to reduce the number of frames (see the sketch after this list).
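For the third item, Reactor's bufferTimeout gives a simple batching layer: it flushes a frame either when enough tokens have accumulated or when a time window elapses, whichever comes first. A sketch against the service's token Flux (batch size and window are assumptions to tune):

// Emit one SSE frame per 10 tokens or per 100 ms, whichever comes first
Flux<String> batched = tokenFlux
        .bufferTimeout(10, Duration.ofMillis(100)) // yields a List<String> per window
        .map(tokens -> String.join("", tokens));   // rejoin into a single frame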
