ChatGPT has been popular lately, and I was curious how its streamed text is transmitted, so I dug into it and reproduced the mechanism.

The backend uses LangChain + FastAPI, together with sse_starlette, an extension package for Starlette, to return the response as Server-Sent Events (SSE).

First, define a LangChain CallbackHandler:

import queue
import sys
from typing import Any, Dict

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult

class StreamingCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        # Thread-safe queue that hands tokens from the LLM callbacks
        # over to the SSE response generator.
        self.tokens = queue.Queue()
        self.stream_end_flag = False
        super().__init__()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Called once per streamed token; buffer it and echo to stdout.
        self.tokens.put(token)
        sys.stdout.write(token)
        sys.stdout.flush()

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        # Push a sentinel so the consumer knows the stream is finished.
        self.tokens.put(StopIteration)
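To see why this works without LangChain or FastAPI, here is a standalone sketch of the same producer/consumer pattern: one thread plays the role of the streaming LLM callbacks, while a generator drains the queue until it sees the StopIteration sentinel, exactly like the SSE response generator will (the function names here are illustrative, not part of the code above):

```python
import queue
import threading

def produce(tokens: queue.Queue) -> None:
    # Stands in for the LLM side: on_llm_new_token puts tokens,
    # on_chain_end puts the StopIteration sentinel.
    for tok in ["Hello", ", ", "world"]:
        tokens.put(tok)
    tokens.put(StopIteration)

def consume(tokens: queue.Queue):
    # Stands in for the response side: drain until the sentinel arrives.
    while True:
        tok = tokens.get()  # blocks until the producer puts something
        if tok is StopIteration:
            return
        yield tok

q = queue.Queue()
t = threading.Thread(target=produce, args=(q,))
t.start()
streamed = "".join(consume(q))
t.join()
```

The queue is the only shared state, so the LLM can run on the event loop (or another thread) while the response generator blocks safely in a worker thread.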
Next, expose the handler through a FastAPI endpoint (the handler above is saved as stream_callback.py):

import asyncio
from typing import Annotated

from fastapi import Body, FastAPI
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from sse_starlette.sse import EventSourceResponse

from stream_callback import StreamingCallbackHandler

app = FastAPI()


@app.post('/simpleChat', response_class=EventSourceResponse)
async def simple_chat(data: Annotated[dict, Body()]):
    app_input = data.get('appInput')
    callback_handler = StreamingCallbackHandler()
    chat_prompt = PromptTemplate(
        input_variables=['human_input'],
        template='''{human_input}'''
    )
    chain = LLMChain(
        llm=OpenAI(
            temperature=0.8,
            request_timeout=setting.REQUEST_TIMEOUT,  # setting: the project's own config module
            max_retries=1,
            max_tokens=2048,
            streaming=True,  # must be enabled, or no tokens reach the callback
        ),
        prompt=chat_prompt
    )
    # Schedule the chain on the current event loop. We are already inside
    # the loop's thread, so create_task (not run_coroutine_threadsafe) is
    # the right tool here.
    asyncio.create_task(
        chain.aapply([{'human_input': app_input}], callbacks=[callback_handler])
    )

    def resp():
        # A blocking queue.get is fine here: Starlette iterates sync
        # generators in a thread pool.
        while True:
            tk = callback_handler.tokens.get()
            if tk is StopIteration:
                # Since PEP 479, raising StopIteration inside a generator
                # is a RuntimeError, so end the stream with return instead.
                return
            yield tk

    return EventSourceResponse(resp())
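On the wire, each token the generator yields is framed by EventSourceResponse as one Server-Sent Event. A rough stdlib-only sketch of that framing (the two helpers are illustrative, not part of sse_starlette's API):

```python
def to_sse(tokens):
    # Each token becomes one event: a "data:" line terminated by a blank line.
    return "".join(f"data: {t}\n\n" for t in tokens)

def parse_sse(raw: str):
    # Minimal client-side view: collect the payload of every data line,
    # which is what fetch-event-source hands to onmessage as ev.data.
    return [line[len("data: "):] for line in raw.splitlines()
            if line.startswith("data: ")]

wire = to_sse(["Hel", "lo"])
events = parse_sse(wire)
```

This is why streaming feels instant on the client: each token is a complete, parseable event, so the browser can render it as soon as it arrives rather than waiting for the whole body.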


The frontend uses Vue. Since the native SSE API (EventSource) does not support POST requests, the @microsoft/fetch-event-source package is used to issue the request as a POST.

npm install @microsoft/fetch-event-source  # install via npm

The component then looks like this:
<template>
    <div>
        <span>{{ content }}</span>
    </div>
    <div>
        <el-form :model="form">
            <el-form-item>
                <el-input v-model="form.appInput" />
                <el-button type="primary" @click="submitChat"/>
            </el-form-item>
        </el-form>
    </div>
</template>

<script setup lang='ts'>
import { reactive, ref } from "vue"
import { fetchEventSource } from "@microsoft/fetch-event-source"

const form = reactive({
  appInput: ''
});
const content = ref<string>('')
const submitChat = () => {
    if (form.appInput !== '') {
        content.value = ''
        fetchEventSource('/api/v1/simpleChat', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({
                // The key must match what the backend reads: data.get('appInput')
                appInput: form.appInput,
            }),
            onmessage(ev) {
                // Each SSE event carries one token; append it to the page.
                content.value += ev.data
            }
        })
    }
}

</script>
