
AI文本SSE流式输出(JAVA版)
当使用ChatGPT或者Deepseek时,模型的回复不是一次性生成整个回答的,而是逐字逐句地生成。这是因为语言模型需要在每个时间步骤预测下一个最合适的单词或字符。如果等待整个回复生成后再输出到网页,会导致用户长时间等待,极大降低用户体验。本文的目的就是通过SSE(Server-Sent Events)流式输出技术,向前端提供所需的参数本文基于JDK8、硅基流动API做的输出。
前言
当使用ChatGPT或者Deepseek时,模型的回复不是一次性生成整个回答的,而是逐字逐句地生成。这是因为语言模型需要在每个时间步骤预测下一个最合适的单词或字符。如果等待整个回复生成后再输出到网页,会导致用户长时间等待,极大降低用户体验。
本文的目的就是通过SSE(Server-Sent Events)流式输出技术,向前端提供所需的实时参数
本文基于JDK8、硅基流动API做的输出
SSE技术
SSE (Server-SentEvents) 技术是一种用于实现服务器主动推送数据给客户端的通信协议。相比传统的请求-响应模式,SSE提供了一种持久连接,允许服务器随时向客户端发送事件和数据,实现了实时性的消息传递。
SSE的工作原理非常简单直观。客户端通过与服务器建立一条持久化的HTTP 连接,然后服务器使用该连接将数据以事件流(eventstream)的形式发送给客户端。这些事件流由多个事件(event)组成,每个事件包含一个标识符、类型和数据字段。客户端通过监听事件流来获取最新的数据,并在接收到事件后进行处理。
与WebSocket 技术相比,SSE使用的是基于 HTTP的长轮询机制,而不需要建立全双工的网络连接。这使得SSE 更容易在现有的基础设施上部署,无需特殊的代理或中间件支持。另外,SSE能够与现有的 Web技术(如 AJAX 和RESTful API)很好地集成,同时也更适合传输较少频繁更新的数据。
SSE 的优点包括:
实时性:SSE 允许服务器主动将数据推送给客户端,实现实时更新和通知。
简单易用:SSE 基于标准的 HTTP 协议,无需额外的库或协议转换。
可靠性:SSE 使用 HTTP 连接,兼容性好,并能通过处理连接断开和错误情况来确保数据传输的可靠性。
轻量级:与 WebSocket 相比,SSE 不需要建立全双工连接,减少了通信的开销和服务器负载。
然而,SSE也有一些限制。由于 SSE基于 HTTP长轮询机制,每个请求都需要建立和维护一个持久化连接,这可能导致较高的资源消耗。此外,SSE适用于单向通信,即服务器向客户端发送数据,而客户端无法向服务器发送消息。
综上所述,SSE技术提供了一种简单、实时的服务器推送数据给客户端的方法,适用于需要实现实时更新和通知的应用场景。它在Web 开发中具有广泛的应用,可用于构建聊天应用、实时监控系统等,并为开发人员带来便利和灵活性。
后端代码
使用硅基流动的api,入参有个属性为 stream 要设置为 true
逻辑类,参数讲解:AI改文(小说推文Java版)
// Results 就是用于返回的
public Results test(List<Messages> messagesList, String userId) throws IOException {
// 本来 messagesList 和 userId 为入参的 为了方便观看 就写死了
messagesList = Lists.newArrayList(new Messages().setRole("user").setContent("你好"));
// 这个是用 webSocket 做的转发
userId = "123";
// 判断用户是否存在
if (!webSocket.userExistence(userId)) {
return Results.success("该用户不存在");
}
// api 所需要的入参 stream 要为 true (默认为 false)
SiliconFlowChatDto siliconFlowChatDto = new SiliconFlowChatDto()
.setModel("deepseek-ai/DeepSeek-R1-Distill-Qwen-32B")
.setStream(true)
.setMessages(messagesList);
BufferedReader reader = null;
try {
URL url = new URL("https://api.siliconflow.cn/v1/chat/completions");
// 建立链接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 以下参数不需要改动
connection.setRequestMethod("POST");
connection.setRequestProperty("Accept", "text/event-stream");
connection.setRequestProperty("Content-type", "application/json; charset=UTF-8");
connection.setRequestProperty("Cache-Control", "no-cache");
connection.setRequestProperty("Connection", "keep-alive");
// 秘钥 就不展示了 不清楚的可以去看我上一篇文章 格式为 "Bearer 秘钥"
connection.setRequestProperty("authorization", siliconFlowChat.getPassword());
// 允许输入和输出
connection.setDoInput(true);
connection.setDoOutput(true);
// 设置超时为0,表示无限制
connection.setConnectTimeout(0);
connection.setReadTimeout(0);
// 传参
String params = JSON.toJSONString(siliconFlowChatDto);
// 写入POST数据
DataOutputStream out = new DataOutputStream(connection.getOutputStream());
out.write(params.getBytes(StandardCharsets.UTF_8));
out.flush();
out.close();
// 读取SSE事件
reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
String line;
// 这里就是处理参数了 参数如下(我只截取了部分)
//data: {"id":"019540337f44336edcc7a0f4196cf877","object":"chat.completion.chunk","created":1740538871,"model":"deepseek-ai/DeepSeek-R1","choices":[{"index":0,"delta":{"content":"技术的关键","reasoning_content":null,"role":"assistant"},"finish_reason":null,"content_filter_results":{"hate":{"filtered":false},"self_harm":{"filtered":false},"sexual":{"filtered":false},"violence":{"filtered":false}}}],"system_fingerprint":"","usage":{"prompt_tokens":26,"completion_tokens":1170,"total_tokens":1196}}
//
//data: {"id":"019540337f44336edcc7a0f4196cf877","object":"chat.completion.chunk","created":1740538871,"model":"deepseek-ai/DeepSeek-R1","choices":[{"index":0,"delta":{"content":"。","reasoning_content":null,"role":"assistant"},"finish_reason":null,"content_filter_results":{"hate":{"filtered":false},"self_harm":{"filtered":false},"sexual":{"filtered":false},"violence":{"filtered":false}}}],"system_fingerprint":"","usage":{"prompt_tokens":26,"completion_tokens":1171,"total_tokens":1197}}
//
//data: {"id":"019540337f44336edcc7a0f4196cf877","object":"chat.completion.chunk","created":1740538871,"model":"deepseek-ai/DeepSeek-R1","choices":[{"index":0,"delta":{"content":"","reasoning_content":null,"role":"assistant"},"finish_reason":"stop","content_filter_results":{"hate":{"filtered":false},"self_harm":{"filtered":false},"sexual":{"filtered":false},"violence":{"filtered":false}}}],"system_fingerprint":"","usage":{"prompt_tokens":26,"completion_tokens":1171,"total_tokens":1197}}
//
//data: [DONE]
while ((line = reader.readLine()) != null) {
// 为空的不管他
if (StringUtils.isBlank(line)) {
continue;
}
// 包含 [DONE] 的 为结束标记
if (line.contains("[DONE]")) {
break;
}
// 先去掉字符串 "data: " 其他的字符串作为json反序列化处理
String chatStr = line.replaceAll("data:", "");
SiliconFlowChatVo json = JSONObject.parseObject(chatStr, SiliconFlowChatVo.class);
SiliconFlowChatVo.Choices choices = json.getChoices().get(0);
SiliconFlowChatVo.Delta delta = choices.getDelta();
webSocket.sendOneMessage(userId, Results.successStr(delta));
}
reader.close();
// 断开链接
connection.disconnect();
return Results.success();
} catch (Exception e) {
log.error("硅基流动 SSE流式 报错", e);
return Results.failed(e.getMessage());
} finally {
IoUtil.close(reader);
}
}
WebSocket:springboot整合websocket,超简单入门
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8080/websocket/userId;
public class WebSocket {
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 用户ID
*/
private String userId;
// concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
// 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
// 注:底下WebSocket是当前类名
private static final CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:{}", webSockets.size());
} catch (Exception e) {
log.error("onOpen 报错", e);
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:{}", webSockets.size());
} catch (Exception e) {
log.error("onClose 报错", e);
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:{}", message);
}
/**
* 发送错误时的处理
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage(), error);
}
/**
* 此为广播消息
*
* @param message
*/
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
for (WebSocket webSocket : webSockets) {
try {
if (webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
log.error("sendAllMessage 报错", e);
}
}
}
/**
* 此为单点消息
*
* @param userId
* @param message
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息: 发给用户【{}】:【{}】", userId, message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error("sendOneMessage 报错", e);
}
}
}
/**
* 此为单点消息(多人)
*
* @param userIds
* @param message
*/
public void sendMoreMessage(String[] userIds, String message) {
for (String userId : userIds) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息(多人): 发给用户【{}】:【{}】", userId, message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error("sendMoreMessage 报错", e);
}
}
}
}
/**
* 用户是否存在
*/
public Boolean userExistence(String userId) {
return sessionPool.containsKey(userId);
}
}
测试(我这里就不用自己写的前端测试了),使用webSocket的在线测试工具来测试WebSocket在线测试工具
用户为 123
后端日志
测试工具
更多推荐
所有评论(0)