OneRAG Series [please give it a Star ⭐⭐⭐⭐⭐]

  1. Building a RAG from Scratch - OneRAG (Part 1) - Zhihu

  2. Building a RAG from Scratch - OneRAG (Part 2) - Zhihu

https://github.com/Hlufies/OneRAG.git

Building a RAG from Scratch - OneRAG (Part 2)

Following the previous chapter, where we built the NativeRAG framework, today we continue extending it. The new features are:

  • Local deployment with Ollama + Deepseek

  • Multi-format file processing with automated handling

  • A SemanticChunker extension that uses spaCy and NLTK for semantic splitting

Ollama + Deepseek

Deepseek is one of the hottest language models in China right now, so let's try it while the hype lasts. Here we deploy it locally with Ollama.

1. First, start Ollama in a terminal:

ollama serve

 

2. Then add a DeepseekOllamaGenerator interface under the Generator module. Here we use the 70B deepseek-r1 model, wired into LLMChain for prompt templating. (If the model has not been pulled yet, run ollama pull deepseek-r1:70b first to download it.)

from typing import List

from langchain_community.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain


class DeepseekOllamaGenerator:
    def __init__(self):
        super().__init__()

    def generate(self, query: str, retrievalChunks: List[str]) -> str:
        # Connect to the locally served deepseek-r1 70B model
        llm = Ollama(model="deepseek-r1:70b")

        # Concatenate the retrieved chunks into a single reference context
        context = ""
        for i, chunk in enumerate(retrievalChunks):
            context += f"reference information {i+1}: \n{chunk}\n\n"

        # Prompt (in Chinese): "Answer the question based on the reference documents"
        template_prompt = "根据参考文档回答问题{query}\n\n{context}"

        # Build the RAG prompt template
        QA_PROMPT = PromptTemplate(
            input_variables=["query", "context"],
            template=template_prompt
        )

        analysis_chain = LLMChain(
            llm=llm,
            prompt=QA_PROMPT,
            verbose=True
        )

        try:
            # Run the chain with the query and the assembled context
            response = analysis_chain.invoke({
                "context": context,
                "query": query,
            })
            # LLMChain.invoke returns a dict; the generated answer is under "text"
            return response["text"]
        except Exception as e:
            raise ValueError(f"DeepseekOllamaGenerator generate error: {e}")
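
For reference, here is a minimal usage sketch; the query and retrieved chunks below are made up for illustration:

# Minimal usage sketch -- the query and chunks are made-up examples
generator = DeepseekOllamaGenerator()
retrieval_chunks = [
    "OneRAG is a RAG framework written from scratch.",   # hypothetical retrieved chunk
    "It supports local model serving through Ollama.",   # hypothetical retrieved chunk
]
answer = generator.generate("What is OneRAG?", retrieval_chunks)
print(answer)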

3. Prompt template and Deepseek output

Extending multi-format file processing with automated handling

Here we mainly integrate the document_loaders from langchain_community (they already feel mature enough that there is no need to write our own for now). A simple extension-to-loader mapping is all it takes to automate the process, as shown below.

from pathlib import Path

from langchain_community.document_loaders import (
    PyPDFLoader, 
    PDFPlumberLoader,
    TextLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
    UnstructuredExcelLoader,
    CSVLoader,
    UnstructuredMarkdownLoader,
    UnstructuredXMLLoader,
    UnstructuredHTMLLoader,
)

class AutoProcessor(DataProcessor):
    def process(self, file_path):
        # Map each file extension to its langchain_community loader and kwargs
        DOCUMENT_LOADER_MAPPING = {
            ".pdf": (PDFPlumberLoader, {}),
            ".txt": (TextLoader, {"encoding": "utf8"}),
            ".doc": (UnstructuredWordDocumentLoader, {}),
            ".docx": (UnstructuredWordDocumentLoader, {}),
            ".ppt": (UnstructuredPowerPointLoader, {}),
            ".pptx": (UnstructuredPowerPointLoader, {}),
            ".xlsx": (UnstructuredExcelLoader, {}),
            ".csv": (CSVLoader, {}),
            ".md": (UnstructuredMarkdownLoader, {}),
            ".xml": (UnstructuredXMLLoader, {}),
            ".html": (UnstructuredHTMLLoader, {}),
        }

        ext = Path(file_path).suffix.lower()
        loader_tuple = DOCUMENT_LOADER_MAPPING.get(ext)

        if loader_tuple:
            loader_class, args = loader_tuple
            loader = loader_class(file_path, **args)
            # Load the documents and keep only the raw page content
            documents = [doc.page_content for doc in loader.load()]
            # clean_text is OneRAG's own text-cleaning helper
            text = ''
            for document in documents:
                text += clean_text(document)
            return text
        else:
            raise ValueError(f"no matching loader for extension: {ext}")
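
A quick usage sketch; the file path below is hypothetical, and DataProcessor / clean_text come from the existing OneRAG code:

# Usage sketch -- the path is hypothetical
processor = AutoProcessor()
text = processor.process("data/sample_report.pdf")
print(text[:200])  # preview the cleaned, concatenated text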

Extending the SemanticChunker with spaCy and NLTK

Semantic chunking feels like a more principled approach, and here we mainly use spaCy and NLTK. One thing that puzzles me at this point: chunk_size and chunk_overlap are essentially hyperparameters, so is there a more automatic way to choose them? Could the decision be handed entirely to a large model? (A speculative sketch of that idea follows after the two chunkers below.)

import spacy
from typing import List


class SemanticSpacyChunker(Chunker):
    """Semantic text splitter built on spaCy."""
    def __init__(
        self,
        model_name: str = "zh_core_web_sm",  # switchable between Chinese/English models, e.g. en_core_web_sm
        chunk_size: int = 512,
        chunk_overlap: int = 64,
        use_sentence: bool = True  # whether to split on sentence boundaries
    ):
        self.nlp = spacy.load(model_name)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.use_sentence = use_sentence

    def split_text(self, text: str) -> List[str]:
        """Core splitting logic."""
        doc = self.nlp(text)

        if self.use_sentence:
            sentences = [sent.text for sent in doc.sents]
        else:
            sentences = [token.text for token in doc if not token.is_punct]

        # Dynamically merge sentences/tokens into chunks
        current_chunk = []
        current_length = 0
        chunks = []

        for sent in sentences:
            sent_length = len(sent)

            # Check whether adding this sentence would exceed the chunk size
            if current_length + sent_length > self.chunk_size:
                if current_chunk:
                    # Join with an empty string (suitable for Chinese text)
                    chunks.append("".join(current_chunk))

                    # Compute the character overlap precisely
                    overlap_buffer = []
                    overlap_length = 0
                    # Walk backwards to find the overlap boundary
                    for s in reversed(current_chunk):
                        if overlap_length + len(s) > self.chunk_overlap:
                            break
                        overlap_buffer.append(s)
                        overlap_length += len(s)
                    # Restore the original order
                    current_chunk = list(reversed(overlap_buffer))
                    current_length = overlap_length

            current_chunk.append(sent)
            current_length += sent_length

        # Flush the remaining content
        if current_chunk:
            chunks.append("".join(current_chunk))
        return chunks

    def chunk(self, docs: str) -> List[str]:
        """Document-level entry point."""
        chunks = self.split_text(docs)
        return chunks
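
A quick sketch of the spaCy chunker in use; it assumes the zh_core_web_sm model has been installed (python -m spacy download zh_core_web_sm), and the sample text is made up:

# Usage sketch -- requires: python -m spacy download zh_core_web_sm
chunker = SemanticSpacyChunker(chunk_size=256, chunk_overlap=32)
chunks = chunker.chunk("第一句话。第二句话。第三句话。")  # made-up sample text
for i, c in enumerate(chunks):
    print(i, len(c), c)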
The NLTK-based variant follows the same merging logic, but uses jieba for Chinese sentence splitting and NLTK's sent_tokenize for other languages:

import jieba
import nltk
from nltk.tokenize import sent_tokenize
from typing import List


class SemanticNLTKChunker(Chunker):
    """Semantic chunker built on NLTK, with support for mixed Chinese/English text."""
    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        language: str = "chinese",
        use_jieba: bool = True
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.language = language
        self.use_jieba = use_jieba

        # Initialize the Chinese tokenizer
        if self.language == "chinese" and self.use_jieba:
            jieba.initialize()
        if self.language != "chinese":
            # The punkt sentence tokenizer data only needs to be downloaded once
            nltk.download('punkt_tab', quiet=True)

    def _chinese_sentence_split(self, text: str) -> List[str]:
        """Sentence splitting based on jieba tokenization."""
        if not self.use_jieba:
            return [text]

        delimiters = {'。', '!', '?', ';', '…'}
        sentences = []
        buffer = []

        for word in jieba.cut(text):
            buffer.append(word)
            if word in delimiters:
                sentences.append(''.join(buffer))
                buffer = []

        if buffer:  # handle a trailing sentence without end punctuation
            sentences.append(''.join(buffer))
        return sentences

    def split_text(self, text: str) -> List[str]:
        """Multilingual sentence splitting."""
        if self.language == "chinese":
            sentences = self._chinese_sentence_split(text)
        else:
            sentences = sent_tokenize(text, language=self.language)

        # Dynamically merge sentences while keeping a character-level overlap
        chunks = []
        current_chunk = []
        current_length = 0

        for sent in sentences:
            sent_len = len(sent)

            # Chunk boundary reached
            if current_length + sent_len > self.chunk_size:
                if current_chunk:
                    chunks.append("".join(current_chunk))

                    # Compute the overlap carried into the next chunk
                    overlap_buffer = []
                    overlap_length = 0
                    for s in reversed(current_chunk):
                        if overlap_length + len(s) > self.chunk_overlap:
                            break
                        overlap_buffer.append(s)
                        overlap_length += len(s)

                    current_chunk = list(reversed(overlap_buffer))
                    current_length = overlap_length

            current_chunk.append(sent)
            current_length += sent_len

        # Flush the remaining content
        if current_chunk:
            chunks.append("".join(current_chunk))
        return chunks

    def chunk(self, docs: str) -> List[str]:
        chunks = self.split_text(docs)
        return chunks
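
Back to the chunk_size / chunk_overlap question raised earlier: one speculative direction is to let the LLM itself propose the parameters from a sample of the document before chunking. The sketch below only illustrates that idea and is not something OneRAG implements; the prompt wording, the suggest_chunk_params helper, and the JSON parsing are assumptions, and it reuses the locally served deepseek-r1 model from the Generator section.

import json
from langchain_community.llms import Ollama

def suggest_chunk_params(sample_text: str) -> dict:
    """Speculative sketch: ask the LLM to propose chunk_size / chunk_overlap
    for a document sample. Prompt and parsing are illustrative only."""
    llm = Ollama(model="deepseek-r1:70b")
    prompt = (
        "You are configuring a RAG text splitter. Based on the document sample below, "
        "suggest reasonable values for chunk_size and chunk_overlap (in characters). "
        'Reply with JSON only, e.g. {"chunk_size": 512, "chunk_overlap": 64}.\n\n'
        f"Sample:\n{sample_text[:2000]}"
    )
    raw = llm.invoke(prompt)
    try:
        # Extract the JSON object from the reply (the model may add extra text)
        return json.loads(raw[raw.find("{"): raw.rfind("}") + 1])
    except ValueError:
        # Fall back to the manual defaults if the reply cannot be parsed
        return {"chunk_size": 512, "chunk_overlap": 64}

# params = suggest_chunk_params(document_text)
# chunker = SemanticNLTKChunker(**params)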
