0 Prerequisites

  • Ollama with the reasoning model deepseek-r1:8b deployed
  • Ollama with the embedding model quentinz/bge-large-zh-v1.5:latest deployed
  • Miniconda installed

1 Milvus Environment Setup

Build the Milvus service by following this post: milvus实战-基于Ollama+bge-large-zh搭建嵌入模型,fastAPI提供http服务将PDF文件写入milvus向量库 (CSDN blog).

Once the service is built, call its HTTP endpoint to upload a PDF. After the file is stored in Milvus, the pdf_documents collection holds data, and the queries in the sections below run against the content ingested per that post.

Building the knowledge base separately from the Q&A system is the right architecture for large-scale deployment: it avoids rebuilding the vector store on every question, which saves time during Q&A and makes the system more responsive.

2 Development Environment Setup

2.1 Create the Python environment

Create a dedicated Python environment with conda. Keeping each project's environment independent prevents package conflicts between projects and makes per-project dependencies easier to manage.

conda create -n LangchainDemo python=3.10

2.2 Create the project in PyCharm

  • Interpreter type: Custom environment
  • Environment: Select existing
  • Type: Conda
  • Environment: the environment created in the previous step

2.3 Activate the Python environment

conda activate LangchainDemo

2.4 Install project dependencies

Install the packages the project needs, including langchain, the Ollama integration, and pymilvus (the client library the Milvus vector store relies on).

pip install langchain
pip install langchain-community
pip install langchain-core
pip install langchain-experimental
pip install langchain-ollama
pip install pymilvus
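
For reproducibility you can also record the dependencies in a requirements.txt (an unpinned sketch; pin versions as your project requires) and install it in one step:

langchain
langchain-community
langchain-core
langchain-experimental
langchain-ollama
pymilvus

pip install -r requirements.txt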

3 Implementation

3.1 Import dependencies

import asyncio

from langchain_community.vectorstores import Milvus
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_ollama import OllamaEmbeddings, ChatOllama

3.2 Build the embedding model

ollama_baseurl = "http://localhost:11434"

embeddings_model = OllamaEmbeddings(
    model="quentinz/bge-large-zh-v1.5:latest",
    base_url=ollama_baseurl)
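
As a quick sanity check (a minimal sketch; it assumes the Ollama service is reachable at the base URL above), embed a test string and inspect the vector size; bge-large-zh-v1.5 produces 1024-dimensional vectors:

# Sanity check (illustrative): embed a test query and inspect the vector
vec = embeddings_model.embed_query("测试文本")
print(len(vec))  # expect 1024 for bge-large-zh-v1.5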

3.3 Configure the Milvus connection

# Configure the Milvus connection parameters
collection_name = "pdf_documents"
milvus_collection = Milvus(
    embedding_function=embeddings_model,
    collection_name=collection_name,
    connection_args={"host": "localhost",
                     "port": "19530"},
    consistency_level="Strong",  # pick the consistency level your use case needs
    # name of the field that actually stores the text content
    text_field="content"
)
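
Before wiring up the full chain, it is worth issuing a one-off similarity search against the collection (a minimal sketch; the query string is arbitrary). If text_field does not match the collection schema, this fails immediately with the KeyError discussed in Appendix 2:

# Smoke test (illustrative): query the collection directly
docs = milvus_collection.similarity_search("测试", k=1)
print(docs[0].page_content[:100])  # preview the top hit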

3.4 Initialize the LLM

# Initialize the LLM
model_name = "deepseek-r1:8b"
chat_llm = ChatOllama(
    temperature=0.0,
    model=model_name,
    base_url=ollama_baseurl
)
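
A one-line smoke test (a sketch; it assumes deepseek-r1:8b has been pulled into Ollama) confirms the model responds. Note that deepseek-r1 typically wraps its reasoning in <think>…</think> tags in the raw output:

# Smoke test (illustrative): send a trivial message to the model
reply = chat_llm.invoke("你好")
print(reply.content)  # raw output; may include <think>...</think> reasoning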

3.5 Create the similarity retriever

# Create the similarity retriever (top-k nearest chunks)
retriever = milvus_collection.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)
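
The retriever is itself a Runnable, so you can exercise it on its own to see which chunks will be stuffed into the prompt (a sketch using the same question as section 3.8):

# Check (illustrative): inspect the top-3 chunks the retriever returns
for i, doc in enumerate(retriever.invoke("爱奇艺选用了哪些数据库?")):
    print(i, doc.page_content[:80])  # preview each retrieved chunk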

3.6 Build the prompt

# Build the prompt (kept in Chinese to match the Chinese corpus and embeddings).
# Gloss: "Answer directly from the context, without any reasoning process;
# if you do not know the answer, output 不知道 (I don't know)."
prompt = ChatPromptTemplate.from_template(
    """
    请根据上下文直接输出最终答案,不要包含任何推理过程,如果不知道答案,请输出不知道
    上下文: {context}
    问题: {question}
    答案:
    """
)
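
To see exactly what the model will receive, render the template with placeholder values (a sketch; the placeholders are made up):

# Render the template with dummy values (illustrative)
print(prompt.format(context="(检索到的文档片段)", question="(用户问题)"))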

3.7 Build the LangChain chain

# Build the request chain
setup_retriever = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})

chain = setup_retriever | prompt | chat_llm | StrOutputParser()
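
RunnableParallel runs the retriever and the passthrough side by side, producing exactly the dict the prompt expects. You can verify the wiring before attaching the model (a sketch):

# Wiring check (illustrative): a plain question is mapped to
# {"context": [Document, ...], "question": "..."}
inputs = setup_retriever.invoke("爱奇艺选用了哪些数据库?")
print(inputs["question"])      # the original question, passed through
print(len(inputs["context"]))  # number of retrieved documents (k=3)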

3.8 Stream the result

async def stream_data():
    # Stream the answer chunk by chunk as the model generates it
    async for chunk in chain.astream("爱奇艺选用了哪些数据库?"):
        print(chunk, flush=True, end="")

if __name__ == "__main__":
    asyncio.run(stream_data())
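
Because deepseek-r1 is a reasoning model, its raw output may still wrap a chain of thought in <think>…</think> tags even with the prompt above. A simple post-processing sketch (it assumes the tags appear verbatim in the text) strips them from a non-streaming answer:

import re

# Cleanup (illustrative): drop <think>...</think> reasoning blocks
def strip_think(text: str) -> str:
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

# answer = strip_think(chain.invoke("爱奇艺选用了哪些数据库?"))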

4 Test

Run the script in PyCharm; the answer to the test question streams to the console.

Appendix 1: Complete code

import asyncio

from langchain_community.vectorstores import Milvus
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_ollama import OllamaEmbeddings, ChatOllama

ollama_baseurl = "http://localhost:11434"

embeddings_model = OllamaEmbeddings(
    model="quentinz/bge-large-zh-v1.5:latest",
    base_url=ollama_baseurl)


# Configure the Milvus connection parameters
collection_name = "pdf_documents"
milvus_collection = Milvus(
    embedding_function=embeddings_model,
    collection_name=collection_name,
    connection_args={"host": "localhost",
                     "port": "19530"},
    consistency_level="Strong",  # pick the consistency level your use case needs
    # name of the field that actually stores the text content
    text_field="content"
)


# Initialize the LLM
model_name = "deepseek-r1:8b"
chat_llm = ChatOllama(
    temperature=0.0,
    model=model_name,
    base_url=ollama_baseurl
)

# Create the similarity retriever (top-k nearest chunks)
retriever = milvus_collection.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)


# Build the prompt (kept in Chinese to match the Chinese corpus and embeddings).
# Gloss: "Answer directly from the context, without any reasoning process;
# if you do not know the answer, output 不知道 (I don't know)."
prompt = ChatPromptTemplate.from_template(
    """
    请根据上下文直接输出最终答案,不要包含任何推理过程,如果不知道答案,请输出不知道
    上下文: {context}
    问题: {question}
    答案:
    """
)


# Build the request chain
setup_retriever = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})

chain = setup_retriever | prompt | chat_llm | StrOutputParser()
# print(chain.invoke("爱奇艺选用了哪些数据库?"))  # synchronous alternative to streaming

async def stream_data():
    # Stream the answer chunk by chunk as the model generates it
    async for chunk in chain.astream("爱奇艺选用了哪些数据库?"):
        print(chunk, flush=True, end="")

if __name__ == "__main__":
    asyncio.run(stream_data())

Appendix 2: Troubleshooting

If text_field is not set when constructing the Milvus vector store, the first query fails with a KeyError, because the wrapper looks for a field named text by default:
Traceback (most recent call last):
  File "D:\user\PycharmProject\LangchainDemoProject\LangchainMilvusDemo.py", line 67, in <module>
    loop.run_until_complete(stream_data())
  File "E:\dev\conda\envs\LangchainDemo\lib\asyncio\base_events.py", line 649, in run_until_complete
    return future.result()
  File "D:\user\PycharmProject\LangchainDemoProject\LangchainMilvusDemo.py", line 61, in stream_data
    async for chunk in chain.astream("爱奇艺选用了哪些数据库?"):
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3439, in astream
    async for chunk in self.atransform(input_aiter(), config, **kwargs):
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3422, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 2313, in _atransform_stream_with_config
    chunk = cast(Output, await py_anext(iterator))
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3392, in _atransform
    async for output in final_pipeline:
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\output_parsers\transform.py", line 85, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 2266, in _atransform_stream_with_config
    final_input: Optional[Input] = await py_anext(input_for_tracing, None)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\utils\aiter.py", line 74, in anext_impl
    return await __anext__(iterator)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\utils\aiter.py", line 123, in tee_peer
    item = await iterator.__anext__()
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 1455, in atransform
    async for ichunk in input:
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 1455, in atransform
    async for ichunk in input:
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3923, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 2313, in _atransform_stream_with_config
    chunk = cast(Output, await py_anext(iterator))
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3910, in _atransform
    chunk = AddableDict({step_name: task.result()})
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 3893, in get_next_chunk
    return await py_anext(generator)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 1473, in atransform
    async for output in self.astream(final, config, **kwargs):
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\base.py", line 1020, in astream
    yield await self.ainvoke(input, config, **kwargs)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\retrievers.py", line 322, in ainvoke
    result = await self._aget_relevant_documents(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\vectorstores\base.py", line 1098, in _aget_relevant_documents
    docs = await self.vectorstore.asimilarity_search(query, **_kwargs)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\vectorstores\base.py", line 645, in asimilarity_search
    return await run_in_executor(None, self.similarity_search, query, k=k, **kwargs)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\config.py", line 588, in run_in_executor
    return await asyncio.get_running_loop().run_in_executor(
  File "E:\dev\conda\envs\LangchainDemo\lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_core\runnables\config.py", line 579, in wrapper
    return func(*args, **kwargs)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_community\vectorstores\milvus.py", line 663, in similarity_search
    res = self.similarity_search_with_score(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_community\vectorstores\milvus.py", line 736, in similarity_search_with_score
    res = self.similarity_search_with_score_by_vector(
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_community\vectorstores\milvus.py", line 795, in similarity_search_with_score_by_vector
    doc = self._parse_document(data)
  File "E:\dev\conda\envs\LangchainDemo\lib\site-packages\langchain_community\vectorstores\milvus.py", line 1026, in _parse_document
    page_content=data.pop(self._text_field),
KeyError: 'text'

When configuring the Milvus connection you must set text_field, and its value must match the content field defined for the collection in the referenced post; in this walkthrough that field is content, hence text_field="content" above.

Note that collection_name must also match the referenced post; otherwise retrieval searches the wrong collection and the answers will be wrong.
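
If you are unsure what the content field is called, inspect the collection schema directly with pymilvus (a sketch; it assumes Milvus is running on localhost:19530):

from pymilvus import connections, Collection

# Schema check (illustrative): list the collection's fields so the
# correct text_field can be passed to the Milvus vector store
connections.connect(host="localhost", port="19530")
for field in Collection("pdf_documents").schema.fields:
    print(field.name, field.dtype)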
