笔记langchain-知识向量库构建
实现自kaggle。
参考自Github:llm-universe/notebook/C3 搭建知识库/C3.ipynb at main · datawhalechina/llm-universe
实现自kaggle。
1 向量及向量知识库
1.1 词向量与向量
在机器学习和自然语言处理(NLP)中,词向量(word embedding)是一种以单词为单位将每个单词转化为实数向量的技术。这些实数向量可以被计算机更好地理解和处理。词向量背后的主要想理念是相似或相关的对象在向量空间中的距离应该很近。在RAG(Retrieval Augmented Generation,检索增强生成)方面向量的优势主要有两点:
- 向量比文字更适合检索。当我们在数据库检索时,如果数据库存储的是文字,主要通过检索关键词(词法搜索)等方法找到相对匹配的数据,匹配的程度取决于数据库中的文档中是否含有查询句中的关键词;而向量中包含了原文本的语义信息,可以通过计算问题与数据库中数据的点积、余弦距离、欧几里得距离等指标,直接获取问题与数据在语义层面上的相似度;
- 向量比其它媒介的综合信息能力更强,当传统数据库存储文字、声音、图像、视频等多种媒介时,很难去将上述多种媒介构建起关联与跨模态的查询方法;但是向量却可以通过多种向量模型将多种数据映射成统一的向量形式。
在搭建 RAG 系统时,我们往往可以通过使用向量模型来构建向量,我们可以选择:
- 使用各个公司的 Embedding API;
- 在本地使用向量模型将数据构建为向量。
1.2 向量数据库
向量数据库是用于高效计算和管理大量向量数据的解决方案。向量数据库是一种专门用于存储和检索向量数据(embedding)的数据库系统。它与传统的基于关系模型的数据库不同,它主要关注的是向量数据的特性和相似性。
在向量数据库中,数据被表示为向量形式,每个向量代表一个数据项。这些向量可以是数字、文本、图像或其他类型的数据。向量数据库使用高效的索引和查询算法来加速向量数据的存储和检索过程。向量数据库中的数据以向量作为基本单位,对向量进行存储、处理及检索。向量数据库通过计算与目标向量的余弦距离、点积等获取与目标向量的相似度。当处理大量甚至海量的向量数据时,向量数据库索引和查询算法的效率明显高于传统数据库。
- Chroma:是一个轻量级向量数据库,拥有丰富的功能和简单的 API,具有简单、易用、轻量的优点,但功能相对简单且不支持GPU加速,适合初学者使用。
- Weaviate:是一个开源向量数据库。除了支持相似度搜索和最大边际相关性(MMR,Maximal Marginal Relevance)搜索外还可以支持结合多种搜索算法(基于词法搜索、向量搜索)的混合搜索,从而提高搜索结果的相关性和准确性。
- Qdrant:Qdrant使用 Rust 语言开发,有极高的检索效率和RPS(Requests Per Second),支持本地运行、部署在本地服务器及Qdrant云三种部署模式。且可以通过为页面内容和元数据制定不同的键来复用数据。
2 使用Embedding API
此处使用的是智谱API,
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
for filename in filenames:
print(os.path.join(dirname, filename))
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session
!pip install zhipuai
from zhipuai import ZhipuAI
client = ZhipuAI(api_key="") # 请填写您自己的APIKey
response = client.chat.completions.create(
model="glm-4-plus", # 请填写您要调用的模型名称
messages=[
{"role": "user", "content": "作为一名营销专家,请为我的产品创作一个吸引人的口号"},
{"role": "assistant", "content": "当然,要创作一个吸引人的口号,请告诉我一些关于您产品的信息"},
{"role": "user", "content": "智谱AI开放平台"},
{"role": "assistant", "content": "点燃未来,智谱AI绘制无限,让创新触手可及!"},
{"role": "user", "content": "创作一个更精准且吸引人的口号"}
],
)
print(response)
import os
from zhipuai import ZhipuAI
def zhipu_embedding(text: str):
client = ZhipuAI(api_key='')
response = client.embeddings.create(
model="embedding-3",
input=text,
)
return response
text = '要生成 embedding 的输入文本,字符串形式。'
response = zhipu_embedding(text=text)
3 数据处理
3.1 源文档选取
此处选用一些风格迁移论文作为示例。上传后放置在/kaggle/input。都放一起即可。

3.2 数据读取
篇幅考虑,并未放置输出
!pip install langchain_community
!pip install pymupdf
from langchain_community.document_loaders import PyMuPDFLoader
# 创建一个 PyMuPDFLoader Class 实例,输入为待加载的 pdf 文档路径
loader = PyMuPDFLoader("/kaggle/input/clb-paper/1-s2.0-S0925231225008987-main.pdf")
# 调用 PyMuPDFLoader Class 的函数 load 对 pdf 文件进行加载
pdf_pages = loader.load()
print(f"载入后的变量类型为:{type(pdf_pages)},", f"该 PDF 一共包含 {len(pdf_pages)} 页")
pdf_page = pdf_pages[1]
print(f"每一个元素的类型:{type(pdf_page)}.",
f"该文档的描述性数据:{pdf_page.metadata}",
f"查看该文档的内容:\n{pdf_page.page_content}",
sep="\n------\n")
3.3 数据清洗
我们期望知识库的数据尽量是有序的、优质的、精简的,因此我们要删除低质量的、甚至影响理解的文本数据。使用正则表达式匹配并删除掉\n。
#数据清洗
import re
pattern = re.compile(r'[^\u4e00-\u9fff](\n)[^\u4e00-\u9fff]', re.DOTALL)
pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('\n', ''), pdf_page.page_content)
print(pdf_page.page_content)
pdf_page.page_content = pdf_page.page_content.replace('•', '')
pdf_page.page_content = pdf_page.page_content.replace(' ', '')
print(pdf_page.page_content)
3.4 文档分割
#文档分割
# 由于单个文档的长度往往会超过模型支持的上下文,导致检索得到的知识太长超出模型的处理能力,因此,在构建向量知识库的过程中,我们往往需要对文档进行分割,将单个文档按长度或者按固定的规则分割成若干个 chunk,然后将每个 chunk 转化为词向量,存储到向量数据库中。
# 在检索时,我们会以 chunk 作为检索的元单位,也就是每一次检索到 k 个 chunk 作为模型可以参考来回答用户问题的知识,这个 k 是我们可以自由设定的。
# Langchain 中文本分割器都根据 chunk_size (块大小)和 chunk_overlap (块与块之间的重叠大小)进行分割。
# chunk_size 指每个块包含的字符或 Token (如单词、句子等)的数量
# chunk_overlap 指两个块之间共享的字符数量,用于保持上下文的连贯性,避免分割丢失上下文信息
# Langchain 提供多种文档分割方式,区别在怎么确定块与块之间的边界、块由哪些字符/token组成、以及如何测量块大小
# RecursiveCharacterTextSplitter(): 按字符串分割文本,递归地尝试按不同的分隔符进行分割文本。
# CharacterTextSplitter(): 按字符来分割文本。
# MarkdownHeaderTextSplitter(): 基于指定的标题来分割markdown 文件。
# TokenTextSplitter(): 按token来分割文本。
# SentenceTransformersTokenTextSplitter(): 按token来分割文本
# Language(): 用于 CPP、Python、Ruby、Markdown 等。
# NLTKTextSplitter(): 使用 NLTK(自然语言工具包)按句子分割文本。
# SpacyTextSplitter(): 使用 Spacy按句子的切割文本。
#导入文本分割器
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 知识库中单段文本长度
CHUNK_SIZE = 500
# 知识库中相邻文本重合长度
OVERLAP_SIZE = 50
# 使用递归字符文本分割器
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=OVERLAP_SIZE
)
text_splitter.split_text(pdf_page.page_content[0:1000])
split_docs = text_splitter.split_documents(pdf_pages)
print(f"切分后的文件数量:{len(split_docs)}")
print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in split_docs])}")
4 搭建并使用向量数据库
4.1 前序配置
即前面的处理
4.2 构建Chroma向量库
#搭建并使用向量数据库
import os
from dotenv import load_dotenv, find_dotenv
# 读取本地/项目的环境变量。
# find_dotenv()寻找并定位.env文件的路径
# load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())
# 获取folder_path下所有文件路径,储存在file_paths里
file_paths = []
folder_path = '/kaggle/input'
for root, dirs, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
print(file_paths[:6])
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import UnstructuredMarkdownLoader
# 遍历文件路径并把实例化的loader存放在loaders里
loaders = []
for file_path in file_paths:
file_type = file_path.split('.')[-1]
if file_type == 'pdf':
loaders.append(PyMuPDFLoader(file_path))
elif file_type == 'md':
loaders.append(UnstructuredMarkdownLoader(file_path))
# 下载文件并存储到text
texts = []
for loader in loaders: texts.extend(loader.load())
# text = texts[1]
# print(f"每一个元素的类型:{type(text)}.",
# f"该文档的描述性数据:{text.metadata}",
# f"查看该文档的内容:\n{text.page_content[0:]}",
# sep="\n------\n")
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 切分文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, chunk_overlap=50)
split_docs = text_splitter.split_documents(texts)
from typing import List
from langchain_core.embeddings import Embeddings
class ZhipuAIEmbeddings(Embeddings):
"""`Zhipuai Embeddings` embedding models."""
def __init__(self):
"""
实例化ZhipuAI为values["client"]
Args:
values (Dict): 包含配置信息的字典,必须包含 client 的字段.
Returns:
values (Dict): 包含配置信息的字典。如果环境中有zhipuai库,则将返回实例化的ZhipuAI类;否则将报错 'ModuleNotFoundError: No module named 'zhipuai''.
"""
from zhipuai import ZhipuAI
self.client = ZhipuAI(api_key='')
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""
生成输入文本列表的 embedding.
Args:
texts (List[str]): 要生成 embedding 的文本列表.
Returns:
List[List[float]]: 输入列表中每个文档的 embedding 列表。每个 embedding 都表示为一个浮点值列表。
"""
result = []
for i in range(0, len(texts), 64):
embeddings = self.client.embeddings.create(
model="embedding-3",
input=texts[i:i+64]
)
result.extend([embeddings.embedding for embeddings in embeddings.data])
return result
def embed_query(self, text: str) -> List[float]:
"""
生成输入文本的 embedding.
Args:
texts (str): 要生成 embedding 的文本.
Return:
embeddings (List[float]): 输入文本的 embedding,一个浮点数值列表.
"""
return self.embed_documents([text])[0]
embedding = ZhipuAIEmbeddings()
此处chromadb langchain版本不完全相配,其中一些包不适配,在修改后,会出现warning,代码跑通后,并未进行后续处理。
!pip uninstall -y chromadb langchain opentelemetry-api opentelemetry-sdk
!pip install chromadb==0.4.13 langchain==0.0.334 opentelemetry-api==1.19.0 opentelemetry-sdk==1.19.0
验证
!python -c "from chromadb.api import API; print('API导入成功')"
检查当前Pydantic版本
!pip show pydantic | grep Version
!pip uninstall -y pydantic
!pip install pydantic==2.6.3
验证导入
!python -c "from pydantic import model_validator; print('Pydantic v2安装成功')"
!python -c "from langchain_community.vectorstores import Chroma; print('LangChain导入成功')"
4.3 向量检索
#构建Chroma向量库
#Langchain 集成了超过 30 个不同的向量存储库。我们选择 Chroma 是因为它轻量级且数据存储在内存中,这使得它非常容易启动和开始使用。
# 定义持久化路径
persist_directory = '/kaggle/working/vector_db/chroma'
from langchain_community.vectorstores import Chroma
vectordb = Chroma.from_documents(
documents=split_docs,
embedding=embedding,
persist_directory=persist_directory # 允许我们将persist_directory目录保存到磁盘上
)
print(f"向量库中存储的数量:{vectordb._collection.count()}")
question="什么是风格迁移"
sim_docs = vectordb.similarity_search(question,k=10)
print(f"检索到的内容数:{len(sim_docs)}")
for i, sim_doc in enumerate(sim_docs):
print(f"检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")
mmr_docs = vectordb.max_marginal_relevance_search(question,k=10)
for i, sim_doc in enumerate(mmr_docs):
print(f"MMR 检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")
更多推荐


所有评论(0)