概要

调用 DeepSeek 的流式问答接口,并将其封装成对外的流式接口返回给前端。

DeepSeek 提供了多种问答方式:一种是直接(非流式)返回完整回答,另一种是流式返回回答。文档地址: DeepSeek API 文档

如何将流式的答案以流式返回给前端呢?

  • 问答核心

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zhipu.oapi.service.v4.model.ChatMessage;
import com.zhipu.oapi.service.v4.model.ChatMessageRole;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Flux;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Configuration
@Slf4j
public class DeepseekClient implements BaseClient {

    @Value("${deepseek.api.key}")
    private String apiKey;

    @Value("${deepseek.api.base-url}")
    private String baseUrl;

    private static final String DEEPSEEK_SPLIT = "$@$";

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private ObjectMapper mapper;

    @Autowired
    @Lazy
    private ChatService chatService;

    @Override
    @SneakyThrows
    public String chat(ChatDto chatDto, String history, String param) {
        List<ChatMessage> messages = new ArrayList<>();
        ChatMessage chatMessage = new ChatMessage(ChatMessageRole.USER.value(), chatDto.getQuestion());
//        messages.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), "教学专家"));
        messages.add(chatMessage);
        Map<String, Object> newHashMap = MapUtil.newHashMap();
        newHashMap.put("messages", messages);
        newHashMap.put("model", "deepseek-reasoner");
        // 设置请求头
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setBearerAuth(apiKey);
        HttpEntity<Map<String, Object>> entity = new HttpEntity<>(newHashMap, headers);
        log.info("deepseekClient请求入参:{}", mapper.writeValueAsString(entity));
        ObjectNode chatResponse = restTemplate.postForObject(Objects.requireNonNull(baseUrl), entity, ObjectNode.class);
        DeepSeekModelData modelData = JSONUtil.toBean(JSONUtil.toJsonStr(mapper.writeValueAsString(chatResponse)), DeepSeekModelData.class);
        log.info("deepseekClient请求返回response:{}", JSONUtil.toJsonStr(modelData));
        Assert.isTrue(ObjectUtil.isNotNull(modelData) && CollUtil.isNotEmpty(modelData.getChoices()), "请求失败");
        Object reasoningContent = modelData.getChoices().get(0).getMessage().getReasoningContent();
        Object answer = modelData.getChoices().get(0).getMessage().getContent();
        return reasoningContent.toString() + DEEPSEEK_SPLIT + answer.toString();
    }

    /**
     * 流式对话
     *
     * @param chatDto      入参
     * @param history      历史
     * @param param        参数
     * @param conversation 会话
     * @return 回答
     */
    @Override
    @SneakyThrows
    public Flux<ServerSentEvent<String>> chatStream(ChatDto chatDto, String history, String param, Conversation conversation) {
        List<ChatMessage> messages = new ArrayList<>();
        ChatMessage chatMessage = new ChatMessage(ChatMessageRole.USER.value(), chatDto.getQuestion());
        messages.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), ObjectUtil.isNull(param) ? "教学专家" : param));
        messages.add(chatMessage);
        Map<String, Object> newHashMap = MapUtil.newHashMap();
        newHashMap.put("messages", messages);
        newHashMap.put("model", "deepseek-reasoner");
        newHashMap.put("stream", Boolean.TRUE);
        // 设置请求头
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setBearerAuth(apiKey);

        AtomicBoolean isContent = new AtomicBoolean(false);

        StringBuffer answer = new StringBuffer();
        Flux<ServerSentEvent<String>> result =Flux.create(sink -> {
            log.info("开始请求");
            restTemplate.execute(Objects.requireNonNull(baseUrl), HttpMethod.POST, request -> {
                request.getHeaders().addAll(headers);
                request.getBody().write(mapper.writeValueAsString(newHashMap).getBytes(StandardCharsets.UTF_8));
            }, response -> {
                try (InputStream inputStream = response.getBody();
                     BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        if (StrUtil.isBlank(line) || ": keep-alive".equals(line)) {
                            continue;
                        }
                        // 打印当前流内容
                        if (StrUtil.isNotBlank(line) && line.contains("data:")) {
                            String jsonStr = line.split("data:")[1];
                            if (jsonStr.contains("[DONE]")) {
                                continue;
                            }
                            DeepSeekModelData modelData = JSONUtil.toBean(jsonStr, DeepSeekModelData.class);
                            DeepSeekModelData.ChoicesDTO.MessageDTO delta = modelData.getChoices().get(0).getDelta();
                            sink.next(ServerSentEvent.builder(JSONUtil.toJsonStr(delta)).build());
                            if (!isContent.get()) {
                                if (ObjectUtil.isNotNull(delta.getReasoningContent())) {
                                    answer.append(delta.getReasoningContent());
                                }
                            }
                            if (ObjectUtil.isNull(delta.getReasoningContent())) {
                                if (isContent.compareAndSet(false, true)) {
                                    answer.append(DEEPSEEK_SPLIT);
                                }
                            }
                            if (isContent.get()) {
                                answer.append(delta.getContent());
                            }
                        }
                    }
                } catch (IOException e) {
                    log.error("读取流异常:{}", e.getMessage());
                    e.printStackTrace();
                }
                return null;
            });
            log.info("结束请求,记录回答");
            if (StrUtil.isBlank(answer.toString())) {
                sink.next(ServerSentEvent.builder("error").build());
            }
            sink.complete();
            // 记录回答
            chatService.saveAnswer(answer.toString(), conversation);
        });
        return result;
    }

    @Override
    public List<? extends ChatMessage> getHistoryList(String history) {
        if (StrUtil.isBlank(history)) {
            return new ArrayList<>();
        }
        List<Message> list = JSONUtil.toList(JSONUtil.parseArray(history), Message.class);
        return list.stream().map(message -> new ChatMessage(ChatMessageRole.USER.value(), message.getContent())).collect(Collectors.toList());
    }
}

/**
 * Response model for the DeepSeek chat-completions API. It is used both for full (non-streaming)
 * responses and for individual streaming chunks.
 *
 * <p>{@code choices[i].message} is read for full responses and {@code choices[i].delta} for
 * streaming chunks (see {@code DeepseekClient}). Unknown properties are ignored at every level so
 * new API fields do not break Jackson deserialization.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class DeepSeekModelData {

    @JsonProperty("id")
    private String id;
    /** Creation time as returned by the API (presumably Unix seconds — TODO confirm). */
    @JsonProperty("created")
    private Integer created;
    @JsonProperty("model")
    private String model;
    @JsonProperty("system_fingerprint")
    private String systemFingerprint;
    @JsonProperty("object")
    private String object;
    @JsonProperty("usage")
    private UsageDTO usage;
    @JsonProperty("choices")
    private List<ChoicesDTO> choices;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Integer getCreated() {
        return created;
    }

    public void setCreated(Integer created) {
        this.created = created;
    }

    public String getModel() {
        return model;
    }

    public void setModel(String model) {
        this.model = model;
    }

    public String getSystemFingerprint() {
        return systemFingerprint;
    }

    public void setSystemFingerprint(String systemFingerprint) {
        this.systemFingerprint = systemFingerprint;
    }

    public String getObject() {
        return object;
    }

    public void setObject(String object) {
        this.object = object;
    }

    public UsageDTO getUsage() {
        return usage;
    }

    public void setUsage(UsageDTO usage) {
        this.usage = usage;
    }

    public List<ChoicesDTO> getChoices() {
        return choices;
    }

    public void setChoices(List<ChoicesDTO> choices) {
        this.choices = choices;
    }

    /** Token accounting for the request/response. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class UsageDTO {
        @JsonProperty("completion_tokens")
        private Integer completionTokens;
        @JsonProperty("prompt_tokens")
        private Integer promptTokens;
        @JsonProperty("prompt_cache_hit_tokens")
        private Integer promptCacheHitTokens;
        @JsonProperty("prompt_cache_miss_tokens")
        private Integer promptCacheMissTokens;
        @JsonProperty("total_tokens")
        private Integer totalTokens;
        @JsonProperty("completion_tokens_details")
        private CompletionTokensDetailsDTO completionTokensDetails;

        public Integer getCompletionTokens() {
            return completionTokens;
        }

        public void setCompletionTokens(Integer completionTokens) {
            this.completionTokens = completionTokens;
        }

        public Integer getPromptTokens() {
            return promptTokens;
        }

        public void setPromptTokens(Integer promptTokens) {
            this.promptTokens = promptTokens;
        }

        public Integer getPromptCacheHitTokens() {
            return promptCacheHitTokens;
        }

        public void setPromptCacheHitTokens(Integer promptCacheHitTokens) {
            this.promptCacheHitTokens = promptCacheHitTokens;
        }

        public Integer getPromptCacheMissTokens() {
            return promptCacheMissTokens;
        }

        public void setPromptCacheMissTokens(Integer promptCacheMissTokens) {
            this.promptCacheMissTokens = promptCacheMissTokens;
        }

        public Integer getTotalTokens() {
            return totalTokens;
        }

        public void setTotalTokens(Integer totalTokens) {
            this.totalTokens = totalTokens;
        }

        public CompletionTokensDetailsDTO getCompletionTokensDetails() {
            return completionTokensDetails;
        }

        public void setCompletionTokensDetails(CompletionTokensDetailsDTO completionTokensDetails) {
            this.completionTokensDetails = completionTokensDetails;
        }

        /** Breakdown of completion tokens. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class CompletionTokensDetailsDTO {
            @JsonProperty("reasoning_tokens")
            private Integer reasoningTokens;

            public Integer getReasoningTokens() {
                return reasoningTokens;
            }

            public void setReasoningTokens(Integer reasoningTokens) {
                this.reasoningTokens = reasoningTokens;
            }
        }
    }

    /** One candidate completion. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class ChoicesDTO {
        @JsonProperty("finish_reason")
        private String finishReason;
        @JsonProperty("index")
        private Integer index;
        /** Complete message; read by the non-streaming client path. */
        @JsonProperty("message")
        private MessageDTO message;
        @JsonProperty("logprobs")
        private LogprobsDTO logprobs;

        /** Incremental message fragment; read by the streaming client path. */
        @JsonProperty("delta")
        private MessageDTO delta;

        public String getFinishReason() {
            return finishReason;
        }

        public void setFinishReason(String finishReason) {
            this.finishReason = finishReason;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public MessageDTO getMessage() {
            return message;
        }

        public void setMessage(MessageDTO message) {
            this.message = message;
        }

        public LogprobsDTO getLogprobs() {
            return logprobs;
        }

        public void setLogprobs(LogprobsDTO logprobs) {
            this.logprobs = logprobs;
        }

        public MessageDTO getDelta() {
            return delta;
        }

        public void setDelta(MessageDTO delta) {
            this.delta = delta;
        }

        /** Message (or delta) body: answer text, reasoning text, role and optional tool calls. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class MessageDTO {
            @JsonProperty("content")
            private String content;
            @JsonProperty("reasoning_content")
            private String reasoningContent;
            @JsonProperty("role")
            private String role;
            @JsonProperty("tool_calls")
            private List<ToolCallsDTO> toolCalls;

            public String getContent() {
                return content;
            }

            public void setContent(String content) {
                this.content = content;
            }

            public String getReasoningContent() {
                return reasoningContent;
            }

            public void setReasoningContent(String reasoningContent) {
                this.reasoningContent = reasoningContent;
            }

            public String getRole() {
                return role;
            }

            public void setRole(String role) {
                this.role = role;
            }

            public List<ToolCallsDTO> getToolCalls() {
                return toolCalls;
            }

            public void setToolCalls(List<ToolCallsDTO> toolCalls) {
                this.toolCalls = toolCalls;
            }

            /** A tool/function call requested by the model. */
            @JsonIgnoreProperties(ignoreUnknown = true)
            public static class ToolCallsDTO {
                @JsonProperty("id")
                private String id;
                @JsonProperty("type")
                private String type;
                @JsonProperty("function")
                private FunctionDTO function;

                public String getId() {
                    return id;
                }

                public void setId(String id) {
                    this.id = id;
                }

                public String getType() {
                    return type;
                }

                public void setType(String type) {
                    this.type = type;
                }

                public FunctionDTO getFunction() {
                    return function;
                }

                public void setFunction(FunctionDTO function) {
                    this.function = function;
                }

                /** Function name and its raw arguments string. */
                @JsonIgnoreProperties(ignoreUnknown = true)
                public static class FunctionDTO {
                    @JsonProperty("name")
                    private String name;
                    @JsonProperty("arguments")
                    private String arguments;

                    public String getName() {
                        return name;
                    }

                    public void setName(String name) {
                        this.name = name;
                    }

                    public String getArguments() {
                        return arguments;
                    }

                    public void setArguments(String arguments) {
                        this.arguments = arguments;
                    }
                }
            }
        }

        /** Per-token log-probability information. */
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class LogprobsDTO {
            @JsonProperty("content")
            private List<ContentDTO> content;

            public List<ContentDTO> getContent() {
                return content;
            }

            public void setContent(List<ContentDTO> content) {
                this.content = content;
            }

            @JsonIgnoreProperties(ignoreUnknown = true)
            public static class ContentDTO {
                @JsonProperty("token")
                private String token;
                // NOTE(review): log-probabilities are presumably fractional; Integer would truncate them — confirm.
                @JsonProperty("logprob")
                private Integer logprob;
                @JsonProperty("bytes")
                private List<Integer> bytes;
                @JsonProperty("top_logprobs")
                private List<TopLogprobsDTO> topLogprobs;

                public String getToken() {
                    return token;
                }

                public void setToken(String token) {
                    this.token = token;
                }

                public Integer getLogprob() {
                    return logprob;
                }

                public void setLogprob(Integer logprob) {
                    this.logprob = logprob;
                }

                public List<Integer> getBytes() {
                    return bytes;
                }

                public void setBytes(List<Integer> bytes) {
                    this.bytes = bytes;
                }

                public List<TopLogprobsDTO> getTopLogprobs() {
                    return topLogprobs;
                }

                public void setTopLogprobs(List<TopLogprobsDTO> topLogprobs) {
                    this.topLogprobs = topLogprobs;
                }

                @JsonIgnoreProperties(ignoreUnknown = true)
                public static class TopLogprobsDTO {
                    @JsonProperty("token")
                    private String token;
                    // NOTE(review): same Integer-vs-fractional concern as ContentDTO.logprob — confirm.
                    @JsonProperty("logprob")
                    private Integer logprob;
                    @JsonProperty("bytes")
                    private List<Integer> bytes;

                    public String getToken() {
                        return token;
                    }

                    public void setToken(String token) {
                        this.token = token;
                    }

                    public Integer getLogprob() {
                        return logprob;
                    }

                    public void setLogprob(Integer logprob) {
                        this.logprob = logprob;
                    }

                    public List<Integer> getBytes() {
                        return bytes;
                    }

                    public void setBytes(List<Integer> bytes) {
                        this.bytes = bytes;
                    }
                }
            }
        }
    }
}

   
  • 控制器处理
   // Service used by chatStream to obtain the Flux of server-sent events for a question.
   @Autowired
    private ChatService chatService;

    /**
     * Streams a chat answer to the client as Server-Sent Events.
     *
     * <p>Each event from {@code chatService.chatStream} is written as a {@code data: ...\n\n} frame
     * and flushed immediately. Payloads that already start with {@code data:} are not prefixed again.
     * If the stream errors or exceeds 2 minutes, a single {@code error} event is written.
     *
     * @param chatDto validated chat request
     * @return body writer that pushes SSE frames to the response output stream
     */
    @SneakyThrows
    @Operation(summary = "流式对话")
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public StreamingResponseBody chatStream(@Valid @RequestBody ChatDto chatDto) {
        CompletableFuture<Flux<ServerSentEvent<String>>> completedFuture =
                CompletableFuture.supplyAsync(() -> chatService.chatStream(chatDto));
        Flux<ServerSentEvent<String>> serverSentEventFlux = completedFuture.get();
        // NOTE(review): timeout() fires on a Reactor timer thread, so the "error" fallback may be written
        // concurrently with the producing thread; consider guarding writes if this becomes an issue.
        return outputStream -> serverSentEventFlux
                .timeout(Duration.ofMinutes(2))
                .onErrorReturn(ServerSentEvent.builder("error").build())
                .subscribe(serverSentEvent -> {
                    String data = serverSentEvent.data();
                    String frame = (StrUtil.startWith(data, "data:") ? data : "data: " + data) + "\n\n";
                    try {
                        // SSE is UTF-8; the no-arg getBytes() would use the platform charset and garble Chinese text.
                        outputStream.write(frame.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        outputStream.flush();
                    } catch (IOException e) {
                        log.error("输出流异常:{}", e.getMessage(), e);
                        throw new RuntimeException(e);
                    }
                });
    }
  • 流式返回时通过 outputStream 持续刷出数据;Spring MVC 异步请求在未配置超时时使用 Servlet 容器的默认值(Tomcat 为 30 秒),回答时间超过该时长就会超时报错,因此需要配置:
spring:
  mvc:
    async:
      # Async request timeout in milliseconds (180000 ms = 3 min). Streamed answers that take
      # longer than this fail with a timeout, so keep it above the longest expected answer.
      request-timeout: 180000

添加该参数,以设置异步请求(即通过 outputStream 持续写出数据)的超时时间,单位为毫秒。

# DeepSeek configuration
deepseek:
  api:
    # Supply the key via the DEEPSEEK_API_KEY environment variable; never commit real keys to config files.
    key: ${DEEPSEEK_API_KEY}
    base-url: https://api.deepseek.com/chat/completions

至此实现了流式问答数据的返回。

  • nginx 配置:关闭代理响应缓冲(proxy_buffering off)即可支持流式输出
location / {
    # proxy_pass requires a full URL including the scheme; a bare IP fails with "invalid URL prefix".
    proxy_pass http://10.0.56.10;
    proxy_buffering off;      # disable response buffering so each SSE chunk is forwarded immediately
    proxy_read_timeout 180s;  # default is 60s; match spring.mvc.async.request-timeout for long answers
}

小结

  1. 文章中复用了智谱(zhipu)SDK 中的部分对象(如 ChatMessage、ChatMessageRole);由于两者的消息格式都是标准格式,这些对象可以通用。
  2. 文章中的代码会对回答进行存储(chatService.saveAnswer),不需要的小伙伴可以自行移除。
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐