SSE接口实战:从ChatGPT流式回复到股票行情,用Java和Vue手把手搭建你的第一个流式应用
本文详细介绍了如何使用SSE(Server-Sent Events)技术构建流式应用,从ChatGPT流式回复到实时股票行情展示。通过Java和Vue.js的实战代码示例,展示了SSE在单向数据推送场景下的优势,包括原生断线重连、简单协议和浏览器原生支持。文章还提供了生产环境中的SSE最佳实践,帮助开发者高效实现低延迟的实时数据推送。
SSE接口实战:从ChatGPT流式回复到股票行情,用Java和Vue手把手搭建你的第一个流式应用
最近在开发一个需要实时展示股票行情的项目时,我遇到了一个技术选型难题:如何在保证低延迟的同时,又能简化客户端的实现复杂度?经过一番调研和对比,我最终选择了Server-Sent Events(SSE)技术方案。与WebSocket相比,SSE在单向数据推送场景下有着独特的优势,特别是在金融数据展示、AI对话流式回复等场景中表现尤为出色。
1. 为什么选择SSE而不是WebSocket?
在实时通信领域,WebSocket无疑是知名度更高的技术,但SSE在某些特定场景下其实更适合。去年我在开发一个类ChatGPT应用时,就深刻体会到了这一点。
SSE的核心优势:
- 原生支持断线重连:客户端自动尝试重新连接
- 更简单的协议:基于HTTP,不需要额外的协议升级
- 内置事件类型:支持自定义事件类型(message, error等)
- 浏览器原生支持:现代浏览器都内置了EventSource API
与WebSocket的对比:
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 服务器→客户端单向 | 双向通信 |
| 协议基础 | HTTP | 独立的ws协议 |
| 断线重连 | 内置支持 | 需要手动实现 |
| 浏览器支持 | 原生EventSource API | 原生WebSocket API |
| 适合场景 | 实时通知、数据推送 | 即时聊天、游戏 |
提示:如果你的应用只需要服务器向客户端推送数据(如股票行情、新闻推送、进度更新),SSE通常是更简单高效的选择。
2. Java后端实现:构建高效的SSE服务
使用Spring Boot搭建SSE服务非常简单。下面是我在实际项目中优化过的实现方案:
@RestController
@RequestMapping("/api/sse")
public class StockSseController {
private static final Logger logger = LoggerFactory.getLogger(StockSseController.class);
private final Set<SseEmitter> emitters = Collections.newSetFromMap(new ConcurrentHashMap<>());
@GetMapping("/stocks")
public SseEmitter streamStockPrices() {
SseEmitter emitter = new SseEmitter(60_000L); // 60秒超时
emitters.add(emitter);
emitter.onCompletion(() -> {
logger.info("Emitter completed: {}", emitter);
emitters.remove(emitter);
});
emitter.onTimeout(() -> {
logger.info("Emitter timed out: {}", emitter);
emitters.remove(emitter);
});
emitter.onError((ex) -> {
logger.error("Emitter error: {}", emitter, ex);
emitters.remove(emitter);
});
return emitter;
}
@Scheduled(fixedRate = 1000)
public void sendStockUpdates() {
List<SseEmitter> deadEmitters = new ArrayList<>();
emitters.forEach(emitter -> {
try {
StockPrice price = generateRandomStockPrice();
emitter.send(SseEmitter.event()
.id(String.valueOf(System.currentTimeMillis()))
.name("stockUpdate")
.data(price, MediaType.APPLICATION_JSON));
} catch (Exception e) {
deadEmitters.add(emitter);
}
});
emitters.removeAll(deadEmitters);
}
private StockPrice generateRandomStockPrice() {
// 模拟生成股票价格
return new StockPrice("AAPL",
150 + Math.random() * 10,
System.currentTimeMillis());
}
}
这段代码实现了几个关键优化点:
- 使用
ConcurrentHashMap存储emitters,确保线程安全 - 为emitter设置了合理的60秒超时时间
- 添加了完整的生命周期回调处理
- 使用Spring的
@Scheduled定时推送数据
实际项目中的经验:
- 对于生产环境,建议添加心跳机制,定期发送空消息保持连接
- 可以考虑使用Redis Pub/Sub来跨服务实例同步emitters
- 对于大量连接场景,需要注意JVM内存管理
3. Vue.js前端:优雅处理流式数据
前端实现SSE接收同样简单。下面是我在金融项目中使用的增强版Vue组件:
<template>
<div class="stock-container">
<h2>实时股票行情</h2>
<div class="stock-grid">
<div v-for="stock in stocks" :key="stock.symbol" class="stock-card">
<div class="symbol">{{ stock.symbol }}</div>
<div class="price" :class="{up: stock.isUp, down: !stock.isUp}">
{{ stock.price.toFixed(2) }}
</div>
<div class="change">{{ stock.change >= 0 ? '+' : '' }}{{ stock.change.toFixed(2) }}</div>
</div>
</div>
<div v-if="error" class="error-message">
连接异常: {{ error }} ({{ retryCount }}/3)
</div>
</div>
</template>
<script>
export default {
data() {
return {
stocks: [],
error: null,
retryCount: 0,
eventSource: null
};
},
mounted() {
this.connectSSE();
},
beforeUnmount() {
this.closeSSE();
},
methods: {
connectSSE() {
this.closeSSE();
this.eventSource = new EventSource('/api/sse/stocks');
this.eventSource.addEventListener('stockUpdate', (event) => {
const data = JSON.parse(event.data);
this.updateStock(data);
this.error = null;
this.retryCount = 0;
});
this.eventSource.addEventListener('error', (event) => {
if (event.target.readyState === EventSource.CLOSED) {
this.error = '连接已关闭';
} else {
this.error = '连接异常';
}
if (this.retryCount < 3) {
setTimeout(() => {
this.retryCount++;
this.connectSSE();
}, 1000 * this.retryCount);
}
});
},
updateStock(newStock) {
const index = this.stocks.findIndex(s => s.symbol === newStock.symbol);
if (index >= 0) {
const oldPrice = this.stocks[index].price;
newStock.isUp = newStock.price >= oldPrice;
newStock.change = newStock.price - oldPrice;
this.stocks.splice(index, 1, newStock);
} else {
newStock.isUp = true;
newStock.change = 0;
this.stocks.push(newStock);
}
},
closeSSE() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
};
</script>
这个组件实现了几个实用功能:
- 自动重连机制(最多重试3次)
- 股票价格变化方向指示(涨/跌)
- 组件卸载时自动关闭连接
- 价格变化动画效果(通过CSS类)
4. 高级应用:SSE在AI对话场景的实践
ChatGPT的流式回复体验令人印象深刻,用SSE同样可以实现类似效果。下面分享我在开发AI客服系统时的关键实现:
后端关键代码:
@GetMapping("/ai/chat")
public SseEmitter chatStream(@RequestParam String question) {
SseEmitter emitter = new SseEmitter(180_000L); // 3分钟超时
executorService.submit(() -> {
try {
// 模拟AI分次生成回复
String fullResponse = aiService.generateResponse(question);
int chunkSize = 5;
for (int i = 0; i < fullResponse.length(); i += chunkSize) {
String chunk = fullResponse.substring(i,
Math.min(i + chunkSize, fullResponse.length()));
emitter.send(SseEmitter.event()
.data(chunk)
.id(String.valueOf(i))
.comment("部分响应"));
Thread.sleep(100); // 模拟处理延迟
}
emitter.send(SseEmitter.event()
.name("complete")
.data("[[END]]"));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
前端处理流式文本的关键技巧:
// 在Vue组件中
this.eventSource.addEventListener('message', (event) => {
if (event.data === '[[END]]') {
this.isLoading = false;
return;
}
this.responseText += event.data;
this.$nextTick(() => {
// 自动滚动到底部
const container = this.$refs.chatContainer;
container.scrollTop = container.scrollHeight;
});
});
性能优化建议:
- 使用
TextEncoder/TextDecoder处理非ASCII字符 - 对于长对话,考虑分页加载历史消息
- 添加打字机动画效果提升用户体验
- 实现客户端缓存减少重复请求
5. 生产环境中的SSE最佳实践
在实际部署SSE服务时,我总结了以下经验教训:
连接管理:
- 配置合理的超时时间(通常30-120秒)
- 实现心跳机制(每15-30秒发送注释消息)
- 使用Nginx等代理时,调整相关超时设置:
proxy_read_timeout 300s; proxy_connect_timeout 75s;
错误处理:
- 客户端应监听error事件并实现指数退避重连
- 服务端应记录连接异常情况
- 考虑添加连接状态监控
安全考虑:
- 使用HTTPS加密通信
- 实现认证机制(如JWT)
- 限制每个客户端的连接数
- 添加CORS配置(如果跨域)
扩展性方案:
// 使用Redis发布订阅实现多实例同步
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener((message, pattern) -> {
String stockJson = new String(message.getBody());
StockPrice price = objectMapper.readValue(stockJson, StockPrice.class);
broadcastStockUpdate(price);
}, new ChannelTopic("stockUpdates"));
return container;
}
在最近的一个项目中,我们使用SSE处理了日均100万+的连接,峰值时约2万并发连接。通过合理的JVM调优和Nginx配置,单个4核8G的服务器实例就能轻松应对这样的负载。
更多推荐



所有评论(0)