java sdk 流式回答 deepseek
deepseek中提供了多种问答方式,一种是直接回答,另一种是流式回答。文档地址: DeepSeek API 文档。控制器处理流式返回时通过outputStream刷数据,超过30s会导致超时报错
·
文章目录
概要
调用deepseek流式问答,封装接口以流式返回
deepseek中提供了多种问答方式,一种是直接回答,另一种是流式回答。文档地址: DeepSeek API 文档。
如何将流式的答案以流式返回给前端呢?
- 问答核心
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zhipu.oapi.service.v4.model.ChatMessage;
import com.zhipu.oapi.service.v4.model.ChatMessageRole;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Flux;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Configuration
@Slf4j
public class DeepseekClient implements BaseClient {
@Value("${deepseek.api.key}")
private String apiKey;
@Value("${deepseek.api.base-url}")
private String baseUrl;
private static final String DEEPSEEK_SPLIT = "$@$";
@Autowired
private RestTemplate restTemplate;
@Autowired
private ObjectMapper mapper;
@Autowired
@Lazy
private ChatService chatService;
@Override
@SneakyThrows
public String chat(ChatDto chatDto, String history, String param) {
List<ChatMessage> messages = new ArrayList<>();
ChatMessage chatMessage = new ChatMessage(ChatMessageRole.USER.value(), chatDto.getQuestion());
// messages.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), "教学专家"));
messages.add(chatMessage);
Map<String, Object> newHashMap = MapUtil.newHashMap();
newHashMap.put("messages", messages);
newHashMap.put("model", "deepseek-reasoner");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(apiKey);
HttpEntity<Map<String, Object>> entity = new HttpEntity<>(newHashMap, headers);
log.info("deepseekClient请求入参:{}", mapper.writeValueAsString(entity));
ObjectNode chatResponse = restTemplate.postForObject(Objects.requireNonNull(baseUrl), entity, ObjectNode.class);
DeepSeekModelData modelData = JSONUtil.toBean(JSONUtil.toJsonStr(mapper.writeValueAsString(chatResponse)), DeepSeekModelData.class);
log.info("deepseekClient请求返回response:{}", JSONUtil.toJsonStr(modelData));
Assert.isTrue(ObjectUtil.isNotNull(modelData) && CollUtil.isNotEmpty(modelData.getChoices()), "请求失败");
Object reasoningContent = modelData.getChoices().get(0).getMessage().getReasoningContent();
Object answer = modelData.getChoices().get(0).getMessage().getContent();
return reasoningContent.toString() + DEEPSEEK_SPLIT + answer.toString();
}
/**
* 流式对话
*
* @param chatDto 入参
* @param history 历史
* @param param 参数
* @param conversation 会话
* @return 回答
*/
@Override
@SneakyThrows
public Flux<ServerSentEvent<String>> chatStream(ChatDto chatDto, String history, String param, Conversation conversation) {
List<ChatMessage> messages = new ArrayList<>();
ChatMessage chatMessage = new ChatMessage(ChatMessageRole.USER.value(), chatDto.getQuestion());
messages.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), ObjectUtil.isNull(param) ? "教学专家" : param));
messages.add(chatMessage);
Map<String, Object> newHashMap = MapUtil.newHashMap();
newHashMap.put("messages", messages);
newHashMap.put("model", "deepseek-reasoner");
newHashMap.put("stream", Boolean.TRUE);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(apiKey);
AtomicBoolean isContent = new AtomicBoolean(false);
StringBuffer answer = new StringBuffer();
Flux<ServerSentEvent<String>> result =Flux.create(sink -> {
log.info("开始请求");
restTemplate.execute(Objects.requireNonNull(baseUrl), HttpMethod.POST, request -> {
request.getHeaders().addAll(headers);
request.getBody().write(mapper.writeValueAsString(newHashMap).getBytes(StandardCharsets.UTF_8));
}, response -> {
try (InputStream inputStream = response.getBody();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (StrUtil.isBlank(line) || ": keep-alive".equals(line)) {
continue;
}
// 打印当前流内容
if (StrUtil.isNotBlank(line) && line.contains("data:")) {
String jsonStr = line.split("data:")[1];
if (jsonStr.contains("[DONE]")) {
continue;
}
DeepSeekModelData modelData = JSONUtil.toBean(jsonStr, DeepSeekModelData.class);
DeepSeekModelData.ChoicesDTO.MessageDTO delta = modelData.getChoices().get(0).getDelta();
sink.next(ServerSentEvent.builder(JSONUtil.toJsonStr(delta)).build());
if (!isContent.get()) {
if (ObjectUtil.isNotNull(delta.getReasoningContent())) {
answer.append(delta.getReasoningContent());
}
}
if (ObjectUtil.isNull(delta.getReasoningContent())) {
if (isContent.compareAndSet(false, true)) {
answer.append(DEEPSEEK_SPLIT);
}
}
if (isContent.get()) {
answer.append(delta.getContent());
}
}
}
} catch (IOException e) {
log.error("读取流异常:{}", e.getMessage());
e.printStackTrace();
}
return null;
});
log.info("结束请求,记录回答");
if (StrUtil.isBlank(answer.toString())) {
sink.next(ServerSentEvent.builder("error").build());
}
sink.complete();
// 记录回答
chatService.saveAnswer(answer.toString(), conversation);
});
return result;
}
@Override
public List<? extends ChatMessage> getHistoryList(String history) {
if (StrUtil.isBlank(history)) {
return new ArrayList<>();
}
List<Message> list = JSONUtil.toList(JSONUtil.parseArray(history), Message.class);
return list.stream().map(message -> new ChatMessage(ChatMessageRole.USER.value(), message.getContent())).collect(Collectors.toList());
}
}
/**
 * Response model for a chat-completion payload. Used both for the complete response
 * (each choice carries {@code message}) and for a single streamed chunk (each choice
 * carries {@code delta}). Field-to-JSON-name bindings are given by the
 * {@code @JsonProperty} annotations.
 */
// Tolerate unknown properties at the root as well, consistent with every nested DTO below.
@JsonIgnoreProperties(ignoreUnknown = true)
class DeepSeekModelData {
    @JsonProperty("id")
    private String id;
    @JsonProperty("created")
    private Integer created;
    @JsonProperty("model")
    private String model;
    @JsonProperty("system_fingerprint")
    private String systemFingerprint;
    @JsonProperty("object")
    private String object;
    @JsonProperty("usage")
    private UsageDTO usage;
    @JsonProperty("choices")
    private List<ChoicesDTO> choices;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Integer getCreated() {
        return created;
    }

    public void setCreated(Integer created) {
        this.created = created;
    }

    public String getModel() {
        return model;
    }

    public void setModel(String model) {
        this.model = model;
    }

    public String getSystemFingerprint() {
        return systemFingerprint;
    }

    public void setSystemFingerprint(String systemFingerprint) {
        this.systemFingerprint = systemFingerprint;
    }

    public String getObject() {
        return object;
    }

    public void setObject(String object) {
        this.object = object;
    }

    public UsageDTO getUsage() {
        return usage;
    }

    public void setUsage(UsageDTO usage) {
        this.usage = usage;
    }

    public List<ChoicesDTO> getChoices() {
        return choices;
    }

    public void setChoices(List<ChoicesDTO> choices) {
        this.choices = choices;
    }

    /** Token counters bound to the {@code usage} property. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class UsageDTO {
        @JsonProperty("completion_tokens")
        private Integer completionTokens;
        @JsonProperty("prompt_tokens")
        private Integer promptTokens;
        @JsonProperty("prompt_cache_hit_tokens")
        private Integer promptCacheHitTokens;
        @JsonProperty("prompt_cache_miss_tokens")
        private Integer promptCacheMissTokens;
        @JsonProperty("total_tokens")
        private Integer totalTokens;
        @JsonProperty("completion_tokens_details")
        private CompletionTokensDetailsDTO completionTokensDetails;

        public Integer getCompletionTokens() {
            return completionTokens;
        }

        public void setCompletionTokens(Integer completionTokens) {
            this.completionTokens = completionTokens;
        }

        public Integer getPromptTokens() {
            return promptTokens;
        }

        public void setPromptTokens(Integer promptTokens) {
            this.promptTokens = promptTokens;
        }

        public Integer getPromptCacheHitTokens() {
            return promptCacheHitTokens;
        }

        public void setPromptCacheHitTokens(Integer promptCacheHitTokens) {
            this.promptCacheHitTokens = promptCacheHitTokens;
        }

        public Integer getPromptCacheMissTokens() {
            return promptCacheMissTokens;
        }

        public void setPromptCacheMissTokens(Integer promptCacheMissTokens) {
            this.promptCacheMissTokens = promptCacheMissTokens;
        }

        public Integer getTotalTokens() {
            return totalTokens;
        }

        public void setTotalTokens(Integer totalTokens) {
            this.totalTokens = totalTokens;
        }

        public CompletionTokensDetailsDTO getCompletionTokensDetails() {
            return completionTokensDetails;
        }

        public void setCompletionTokensDetails(CompletionTokensDetailsDTO completionTokensDetails) {
            this.completionTokensDetails = completionTokensDetails;
        }

        /** Breakdown bound to the {@code completion_tokens_details} property. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class CompletionTokensDetailsDTO {
            @JsonProperty("reasoning_tokens")
            private Integer reasoningTokens;

            public Integer getReasoningTokens() {
                return reasoningTokens;
            }

            public void setReasoningTokens(Integer reasoningTokens) {
                this.reasoningTokens = reasoningTokens;
            }
        }
    }

    /**
     * One entry of the {@code choices} array. The client reads {@code message} from a
     * complete response and {@code delta} from a streamed chunk.
     */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class ChoicesDTO {
        @JsonProperty("finish_reason")
        private String finishReason;
        @JsonProperty("index")
        private Integer index;
        @JsonProperty("message")
        private MessageDTO message;
        @JsonProperty("logprobs")
        private LogprobsDTO logprobs;
        @JsonProperty("delta")
        private MessageDTO delta;

        public String getFinishReason() {
            return finishReason;
        }

        public void setFinishReason(String finishReason) {
            this.finishReason = finishReason;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public MessageDTO getMessage() {
            return message;
        }

        public void setMessage(MessageDTO message) {
            this.message = message;
        }

        public LogprobsDTO getLogprobs() {
            return logprobs;
        }

        public void setLogprobs(LogprobsDTO logprobs) {
            this.logprobs = logprobs;
        }

        public MessageDTO getDelta() {
            return delta;
        }

        public void setDelta(MessageDTO delta) {
            this.delta = delta;
        }

        /** Message body shared by the {@code message} and {@code delta} properties; any field may be null. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class MessageDTO {
            @JsonProperty("content")
            private String content;
            @JsonProperty("reasoning_content")
            private String reasoningContent;
            @JsonProperty("role")
            private String role;
            @JsonProperty("tool_calls")
            private List<ToolCallsDTO> toolCalls;

            public String getContent() {
                return content;
            }

            public void setContent(String content) {
                this.content = content;
            }

            public String getReasoningContent() {
                return reasoningContent;
            }

            public void setReasoningContent(String reasoningContent) {
                this.reasoningContent = reasoningContent;
            }

            public String getRole() {
                return role;
            }

            public void setRole(String role) {
                this.role = role;
            }

            public List<ToolCallsDTO> getToolCalls() {
                return toolCalls;
            }

            public void setToolCalls(List<ToolCallsDTO> toolCalls) {
                this.toolCalls = toolCalls;
            }

            /** One entry of the {@code tool_calls} array. */
            @JsonIgnoreProperties(ignoreUnknown = true)
            public static class ToolCallsDTO {
                @JsonProperty("id")
                private String id;
                @JsonProperty("type")
                private String type;
                @JsonProperty("function")
                private FunctionDTO function;

                public String getId() {
                    return id;
                }

                public void setId(String id) {
                    this.id = id;
                }

                public String getType() {
                    return type;
                }

                public void setType(String type) {
                    this.type = type;
                }

                public FunctionDTO getFunction() {
                    return function;
                }

                public void setFunction(FunctionDTO function) {
                    this.function = function;
                }

                /** Bound to the {@code function} property of a tool call. */
                @JsonIgnoreProperties(ignoreUnknown = true)
                public static class FunctionDTO {
                    @JsonProperty("name")
                    private String name;
                    @JsonProperty("arguments")
                    private String arguments;

                    public String getName() {
                        return name;
                    }

                    public void setName(String name) {
                        this.name = name;
                    }

                    public String getArguments() {
                        return arguments;
                    }

                    public void setArguments(String arguments) {
                        this.arguments = arguments;
                    }
                }
            }
        }

        /** Bound to the {@code logprobs} property of a choice. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class LogprobsDTO {
            @JsonProperty("content")
            private List<ContentDTO> content;

            public List<ContentDTO> getContent() {
                return content;
            }

            public void setContent(List<ContentDTO> content) {
                this.content = content;
            }

            /** One entry of the {@code logprobs.content} array. */
            @JsonIgnoreProperties(ignoreUnknown = true)
            public static class ContentDTO {
                @JsonProperty("token")
                private String token;
                // NOTE(review): log-probabilities are presumably fractional; an Integer would lose
                // the fraction. Kept as Integer to preserve the getter signature - confirm against
                // the API and widen if callers allow.
                @JsonProperty("logprob")
                private Integer logprob;
                @JsonProperty("bytes")
                private List<Integer> bytes;
                @JsonProperty("top_logprobs")
                private List<TopLogprobsDTO> topLogprobs;

                public String getToken() {
                    return token;
                }

                public void setToken(String token) {
                    this.token = token;
                }

                public Integer getLogprob() {
                    return logprob;
                }

                public void setLogprob(Integer logprob) {
                    this.logprob = logprob;
                }

                public List<Integer> getBytes() {
                    return bytes;
                }

                public void setBytes(List<Integer> bytes) {
                    this.bytes = bytes;
                }

                public List<TopLogprobsDTO> getTopLogprobs() {
                    return topLogprobs;
                }

                public void setTopLogprobs(List<TopLogprobsDTO> topLogprobs) {
                    this.topLogprobs = topLogprobs;
                }

                /** One entry of the {@code top_logprobs} array. */
                @JsonIgnoreProperties(ignoreUnknown = true)
                public static class TopLogprobsDTO {
                    @JsonProperty("token")
                    private String token;
                    // NOTE(review): same Integer-vs-fractional concern as ContentDTO.logprob - confirm.
                    @JsonProperty("logprob")
                    private Integer logprob;
                    @JsonProperty("bytes")
                    private List<Integer> bytes;

                    public String getToken() {
                        return token;
                    }

                    public void setToken(String token) {
                        this.token = token;
                    }

                    public Integer getLogprob() {
                        return logprob;
                    }

                    public void setLogprob(Integer logprob) {
                        this.logprob = logprob;
                    }

                    public List<Integer> getBytes() {
                        return bytes;
                    }

                    public void setBytes(List<Integer> bytes) {
                        this.bytes = bytes;
                    }
                }
            }
        }
    }
}
- 控制器处理
@Autowired
private ChatService chatService;
/**
 * Streams the chat answer to the HTTP client as Server-Sent Events.
 * <p>
 * The returned body writes one {@code data: ...} frame per event and flushes after each
 * frame. It blocks until the flux terminates so the response stays open for the whole
 * stream. If no event arrives within two minutes, or the flux fails, a single
 * {@code "error"} frame is written instead.
 *
 * @param chatDto validated chat request
 * @return a body that writes the event stream to the response output stream
 */
@Operation(summary = "流式对话")
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public StreamingResponseBody chatStream(@Valid @RequestBody ChatDto chatDto) {
    // Call the service directly: running it via supplyAsync and immediately calling get()
    // blocked this thread anyway and wrapped every failure in an ExecutionException.
    Flux<ServerSentEvent<String>> serverSentEventFlux = chatService.chatStream(chatDto);
    return outputStream -> {
        serverSentEventFlux
                .timeout(Duration.ofMinutes(2))
                .onErrorReturn(ServerSentEvent.builder("error").build())
                .doOnNext(serverSentEvent -> {
                    try {
                        String data = serverSentEvent.data();
                        String finalData;
                        if (StrUtil.startWith(data, "data:")) {
                            finalData = data + "\n\n";
                        } else {
                            finalData = "data: " + data + "\n\n";
                        }
                        // Encode explicitly as UTF-8; the platform default charset may differ
                        // and would garble non-ASCII text.
                        outputStream.write(finalData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        outputStream.flush();
                    } catch (IOException e) {
                        log.error("输出流异常:{}", e.getMessage());
                        throw new RuntimeException(e);
                    }
                })
                // Block until the stream terminates so the response is not closed early,
                // whether or not the source emits on the subscribing thread.
                .blockLast();
    };
}
- 流式返回,通过outputStream刷数据,超过30s 导致超时报错
spring:
  mvc:
    async:
      request-timeout: 180000
添加参数,设置outputStream 异步处理数据的超时时间
# deepseek 配置
deepseek:
  api:
    key: sk-48**************f43a9c99326c37
    base-url: https://api.deepseek.com/chat/completions
至此实现了流式问答数据的返回。
- nginx 配置: 关闭响应缓存,即可支持流式返回
location / {
proxy_pass http://10.0.56.10;
proxy_buffering off; # 关闭响应缓存
}
小结
- 文章中利用了部分zhipu中的对象,由于都是标准sdk,所以都是通用的
- 文章中会对消息进行存储,不需要的小伙伴自行移除
更多推荐
所有评论(0)