【Agent开发】第六阶段:RAG 深度优化实战 —— 父子索引与上下文窗口优化 – pd的AI Agent开发笔记

文章目录


前置环境:当前环境是基于WSL2 + Ubuntu 24.04 + Docker Desktop构建的云原生开发平台,所有服务(MySQL、Redis、Qwen)均以独立容器形式运行并通过Docker Compose统一编排。如何配置请参考我的博客 WSL2 + Ubuntu 24.04 + Docker Desktop 配置双内核环境 并且补充了milvus相关的配置,如何配置请参考我的博客 【Agent开发】第三阶段:RAG 实战 —— 赋予 Agent “外脑”。 并且引入了ES检索,并且配置了ES服务,ES部分的配置请查看我的博客 【Agent开发】第五阶段:RAG 深度优化实战 —— 从“可用”到“卓越”

核心目标:打破“小 Chunk 检索准但上下文缺失,大 Chunk 上下文全但检索噪声大”的僵局。
演进路线:Advanced RAG → Context-Aware RAG。
关键概念:Small-to-Big Retrieval (小查大返)。

第三部分:父子索引与上下文窗口—— 解决“检索准”与“上下文全”的矛盾 (Parent-Child Indexing)

1. 痛点分析:为什么需要父子索引?

❌ 当前困境
在之前的架构中,我们通常选择一个固定的 Chunk Size(例如 500 字):

  • 如果 Chunk 太小 (200 字):
    • ✅ 优点:向量嵌入非常精准,能匹配到具体的细节。
    • ❌ 缺点:丢失上下文。LLM 拿到片段后不知道“他”是谁,“这个公司”指哪家,导致回答断章取义。
  • 如果 Chunk 太大 (2000 字):
    • ✅ 优点:上下文完整,LLM 能理解全局。
    • ❌ 缺点:向量被大量无关信息稀释,检索精度下降,容易匹配到错误文档。

✅ 解决方案:父子索引 (Parent-Child Indexing)
核心思想:“用小块检索,用大块生成”。

  1. 子块 (Child Chunks):小粒度分块(如 200 字),用于生成向量并建立索引。负责“被找到”。
  2. 父块 (Parent Chunks):大粒度分块(如 1000 字)或原始文档。负责“被阅读”。
  3. 映射关系:每个子块记录其所属的父块 ID。
  4. 检索流程:
    • 用户查询 → 匹配 子块。
    • 命中子块 → 通过 ID 找回对应的 父块。
    • 将 父块 发送给 LLM 生成答案。

架构设计:数据模型与流程

我们需要在现有的 Milvus 结构中增加 parent_id 字段,并调整入库和检索逻辑。

📂 新增/修改文件结构

src/rag/
    ├── chunkers.py              # 👈 更新:增加父子分块策略
    ├── ingestion.py             # 👈 更新:建立父子映射并入库
    ├── strategies/
    │   └── retrievers/
    │       └── vector_text.py   # 👈 更新:支持“查子返父”逻辑
    └── pipeline.py              # (无需大改,接口保持一致)

🗄️ Milvus Schema 变更
需要在 Collection 中增加一个标量字段 parent_id (VarChar),用于反向查找。

3. 实战编码

🛠️ 第一步:实现父子分块策略 (src/rag/chunkers.py)

我们将创建一个特殊的 Chunker,它先生成大块(父),再把大块切分成小块(子),并建立关联。

class BaseChunker(ABC):
    """分块器基类"""
    def split_documents(self, docs: List[Document]) -> List[Document]:
        raise NotImplementedError

    @abstractmethod
    def split_documents(self, docs: List[Document]) -> List[Document]:
        """
        输入:原始 Document 列表
        输出:分块后的 Document 列表 (每个 Document 的 metadata 可能包含增强信息)
        """
        pass
class RecursiveChunker(BaseChunker):
    # 已有逻辑

class FixedChunker(BaseChunker):
    # 已有逻辑

class ParentChildChunker(BaseChunker):
    """
    父子分块策略
    生成两组数据:
    1. Child Chunks: 小尺寸,用于向量化检索
    2. Parent Chunks: 大尺寸 (或原文),用于提供给 LLM
    3. 返回所有小块 (Child),但在 metadata 中注入 parent_id 和 parent_text
    """
    def __init__(self, parent_size: int = 500, child_size: int = 50, overlap: int = 50,child_overlap: int = 10, separators: List[str] = None):
        if separators is None:
            separators = ["\n\n", "\n", "。", "!", "?", " ", ""]
            
        logger.info(f"✂️ 初始化父子分块器:Parent={parent_size}, Child={child_size}, Overlap={overlap}")
        
        self.parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=overlap,
            length_function=len,
            separators=separators
        )
        
        self.child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=overlap,
            length_function=len,
            separators=child_overlap
        )

    def split_documents(self, docs: List[Document]) -> List[Document]:
        """
        兼容标准接口:输入 Document 列表,返回 Document 列表 (子块)
        """
        all_child_docs = []
        
        for doc in docs:
            # 1. 切分父块
            parent_docs = self.parent_splitter.split_documents([doc])
            
            for p_doc in parent_docs:
                # 为每个父块生成唯一 ID
                parent_id = str(uuid.uuid4())
                parent_text = p_doc.page_content
                
                # 2. 将父块切分为子块
                child_docs = self.child_splitter.split_documents([p_doc])
                
                for c_doc in child_docs:
                    # 3. 注入父子关系元数据
                    # 复制原有 metadata,避免修改原始对象
                    new_metadata = {
                        **c_doc.metadata,
                        "parent_id": parent_id,
                        "parent_text": parent_text, # 关键:存入大块文本
                        "is_child": True,
                        "chunk_type": "child"
                    }
                    
                    # 创建新的 Document 对象
                    child_doc = Document(
                        page_content=c_doc.page_content, # 小子块内容 (用于向量化)
                        metadata=new_metadata
                    )
                    all_child_docs.append(child_doc)
        
        logger.info(f"✅ 父子分块完成:输入 {len(docs)} 文档 -> 生成 {len(all_child_docs)} 个子块 (关联 {len(parent_docs)} 个父块)")
        return all_child_docs

🔄 第二步:更新入库逻辑 (src/rag/ingestion.py)

修改 DataIngestion 类,使其支持动态加载不同的 Chunker,并保持 process_file 逻辑不变(因为接口统一了)。

    def process_file(self, file_path: str, category: str = "general"):
        """处理单个文件:加载 -> 分块 -> 增强 -> 入库"""
        # 1. 加载文档
        docs = self.load_document(file_path) 
        if not docs:
            return

        all_chunks = []
        
        # 2. 分块 (统一调用 split_documents,无需关心内部是简单还是父子)
        # 输入:List[Document], 输出:List[Document]
        for doc in docs:
            splits = self.text_splitter.split_documents([doc])
            all_chunks.extend(splits)
        
        logger.info(f"✂️ 分块完成,共生成 {len(all_chunks)} 个 chunks")

        # 3. 入库
        success_count = 0
        for i, chunk in enumerate(all_chunks):
            text = chunk.page_content
            
            # 跳过过短的块
            if len(text.strip()) < 5:
                continue

            # 元数据增强(可选:为了速度,生产环境可异步或批量处理)
            enhanced_meta = self.enhance_metadata(text, chunk.metadata.get("source", ""))
            summary_str = enhanced_meta.get("summary", "")
            questions_str = enhanced_meta.get("questions", "")

            # 合并元数据:保留分块器产生的 metadata (如 parent_id, parent_text)
            final_metadata = {
                **chunk.metadata,  # 👈 关键:保留 parent_id 和 parent_text
                "source": os.path.basename(file_path),
                "page": chunk.metadata.get("page", 0),
                "category": category,
                "summary": summary_str,
                "questions": questions_str
            }
            
            # 生成唯一 ID
            doc_id = f"{os.path.basename(file_path)}_{i}_{uuid.uuid4().hex[:6]}"
            
            # A. 存入 Milvus (向量化的是 chunk.page_content 即小子块)
            try:
                self.milvus.insert_data(
                    id=doc_id,
                    text=text,
                    metadata=final_metadata # metadata 里现在包含了 parent_text
                )
                success_count += 1
            except Exception as e:
                logger.error(f"❌ Milvus 插入失败:{e}")

            # B. 存入 Elasticsearch (关键词库) - 双管齐下
            if self.es.is_available():
                if questions_str:
                    self.es.indexing_question(
                        doc_id=doc_id,
                        questions=questions_str,
                        summary=summary_str,
                        text=text,
                        metadata=final_metadata
                    )
                
                if summary_str:
                    self.es.indexing_summary(
                        doc_id=doc_id,
                        summary=summary_str,
                        text=text,
                        metadata=final_metadata
                    )

        logger.info(f"✅ 文件 {file_path} 处理完毕,成功入库 {success_count}/{len(all_chunks)} 条记录")

🔍 第三步:更新检索策略 (src/rag/strategies/retrievers)

  • vector_text.py
  • vector_rewritten.py

确保检索时能利用 parent_text 实现“查子返父”。这两个文件的改法类似

# src/rag/strategies/retrievers/vector_rewritten.py
from src.rag.strategies.base import BaseRetrievalStrategy, SearchResult
from src.core.milvus_client import get_milvus_client
from src.rag.rewriter import rewriter_instance # 复用之前的重写器
from typing import List, Optional
import logging

logger = logging.getLogger(__name__)

class VectorRewrittenRetriever(BaseRetrievalStrategy):
    """
    插件 2: 变体向量检索
    策略:先让 LLM 重写查询 (Query Rewriting),再用重写后的句子进行向量搜索。
    """
    def __init__(self):
        self.milvus = get_milvus_client()
        self.rewriter = rewriter_instance

        logger.info(f"🔌 [Plugin] 加载变体向量检索插件 (Rewritten)")

    def search(self, query: str, top_k: int, filter_expr: Optional[str] = None, **kwargs) -> List[SearchResult]:
        # 1. 生成变体查询
        try:
            rewritten_query = self.rewriter.rewrite(query)
            if rewritten_query == query:
                logger.debug("⚠️ [Vector-Rewritten] 重写后无变化,跳过此路以避免重复")
                return []
            logger.info(f"🔄 [Vector-Rewritten] 变体查询:{rewritten_query}")
        except Exception as e:
            logger.error(f"❌ [Vector-Rewritten] 重写失败:{e}")
            return []
        
        hits = self.milvus.search(
            query=query,
            top_k=top_k,
            filter_expr=filter_expr,
            output_fields=["text", "metadata"]
        )

        results = []
        for hit in hits:
            meta = hit['metadata'] or {}
            original_text = hit['text']
            
            final_text = original_text
            source_tag = os.path.splitext(os.path.basename(__file__))[0]
            # 👇 核心逻辑:查子返父
            if meta.get("parent_text"):
                final_text = meta["parent_text"]
                source_tag = source_tag + "_parent"
                # 可选:在 metadata 中记录原始子块文本,方便调试
                meta['_matched_child_text'] = original_text

            results.append(SearchResult(
                    text=final_text, # 返回大块文本给 LLM
                    score=hit['score'], # 分数基于小子块匹配 (精准)
                    metadata=meta,
                    source_field=source_tag
                ))

        return results
  • es_summaries.py
  • es_questions.py

这两个文件的改法也很类似

class ESQuestionsRetriever(BaseRetrievalStrategy):
    """
    插件 3: ES 关键词检索 (Questions 字段)
    """
    def __init__(self):
        self.es = es_client_instance
        self.index_name = settings.db.es_index_questions
        if self.es.is_available():
            logger.info(f"🔌 [Plugin] ES 检索插件{self.index_name}已就绪")
        else:
            logger.warning("🔌 [Plugin] ES 不可用,此插件将自动跳过")

    def search(self, query: str, top_k: int, filter_expr: Optional[str] = None, **kwargs) -> List[SearchResult]:
        if not self.es.is_available():
            return []
            
        # 调用封装好的搜索方法
        # 注意:ES 原生不支持复杂的 JSON 过滤表达式 (如 Milvus 语法),这里暂不实现 filter_expr
        # 如果需要,可以在 ES 查询中添加 term 过滤
        hits = self.es.search_questions(query, top_k=top_k)
        
        results = []
        for hit in hits:
            meta = hit['metadata'] or {}
            original_text = hit['text']

            final_text = original_text
            source_tag = os.path.splitext(os.path.basename(__file__))[0]
            # 👇 核心逻辑:查子返父
            if meta.get("parent_text"):
                final_text = meta["parent_text"]
                source_tag = source_tag + "_parent"
                # 可选:在 metadata 中记录原始子块文本,方便调试
                meta['_matched_child_text'] = original_text

            results.append(SearchResult(
                text=final_text,
                score=hit['score'],
                metadata=meta,
                source_field=source_tag
            ))
        return results

4. 数据入库与测试

另起一个milvus的索引,另起两个es索引(为了不影响之前的数据切片方式), 修改环境变量。

# Database
MILVUS_COLLECTION=knowledge_parent_child
es_index_questions=knowledge_questions_parent_child
es_index_summaries=knowledge_summaries_parent_child

# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child

先执行,数据入库操作, 由于父子索引方式切出的切片数量很多,每个都要跑一遍元数据增强,可能会很慢

python -m src.test.test_ingestion

测试代码

通过控制变量法(保持代码不变,仅切换 .env 中的 CHUNK_STRATEGY),分别运行两次测试,最后对比两份报告,可以清晰地量化 Parent-Child 策略相对于 Recursive 策略在单路召回(只用原始vector_text主路)下的提升效果。

# src/test/recall_chunk_strategy.py
# 用法: python -m src.test.recall_chunk_strategy
import asyncio
import logging
import statistics
from typing import List, Dict, Any
from src.rag.pipeline import pipeline_instance
from src.core.config import settings
from src.rag.strategies.base import SearchResult

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("ChunkStrategyEval")

# ==========================================
# 1. 测试数据集 (与主测试集保持一致,确保可比性)
# ==========================================

TEST_CASES = [
    {
        "category": "精确关键词匹配 (Easy)",
        "questions": [
            "根据《奇葩星球有限公司员工差旅与报销标准手册》,乘坐高铁的票价超过多少元将被视为“奢侈出行”?",
            "在《小模型训练奇谭》中,推荐的“摸鱼Transformer”模型有多少参数?",
            "奇葩星球公司禁止报销哪种咖啡?",
        ],
        "keywords": [
            ["高铁", "奢侈出行", "10元"],
            ["摸鱼Transformer", "参数", "42000"],
            ["禁止报销", "咖啡", "星巴克"]
        ]
    },
    {
        "category": "语义理解与推理 (Medium)",
        "questions": [
            "如果一名员工想通过步行出差获得最高荣誉,他每走一公里能拿到多少补贴?",
            "在预算极其有限的情况下,《小模型训练奇谭》认为真正的智能取决于什么?",
            "《小模型训练奇谭》推荐哪些非传统的数据来源用于训练小模型?",
        ],
        "keywords": [
            ["步行", "补贴", "50元"],
            ["智能", "脑洞密度", "参数量"],
            ["数据来源", "微信群", "分手短信", "流浪猫"]
        ]
    },
    {
        "category": "多跳推理 (Hard)",
        "questions": [
            "一名员工因工作压力在出差途中哭泣,根据公司规定,他如何能将情绪转化为实际的经济补偿?",
            "《小模型训练奇谭》中提到的“穷人版分布式训练”是如何利用社交工具实现的?",
        ],
        "keywords": [
            ["哭泣", "眼泪", "情感天平", "补贴"],
            ["穷人版分布式训练", "微信群", "梯度", "红包"]
        ]
    },
    {
        "category": "区分相似概念 (Confusion)",
        "questions": [
            "“泡面”在《奇葩星球有限公司员工差旅与报销标准手册》和《小模型训练奇谭》中分别扮演什么角色?",
            "两份文件都提到了“老板”,它们各自的态度或处理方式有何核心区别?",
        ],
        "keywords": [
            ["泡面", "觉醒补贴", "泡面RNN", "电热水壶"],
            ["老板", "黑暗料理", "画饼", "老板探测器"]
        ]
    }
]

# ==========================================
# 2. 评估逻辑
# ==========================================

def check_hit(results: List[SearchResult], keywords: List[str]) -> Dict[str, Any]:
    """检查检索结果是否命中"""
    for rank, res in enumerate(results):
        text = res.text.lower()
        is_hit = False
        for kw in keywords:
            if len(kw) > 2 and kw.lower() in text:
                is_hit = True
                break
        
        if is_hit:
            return {
                "hit": True,
                "rank": rank + 1,
                "source_plugin": res.source_field,
                "snippet": text[:100] + "...",
                "score": res.score,
                "text_length": len(res.text), # 👈 新增:记录返回文本长度
                "is_parent_mode": res.source_field.endswith("_parent") or res.metadata.get('_retrieval_mode') == 'child_to_parent' # 👈 新增:检测是否触发父子模式
            }
    
    return {
        "hit": False,
        "rank": -1,
        "source_plugin": "None",
        "snippet": "No relevant chunk found.",
        "score": 0,
        "text_length": 0,
        "is_parent_mode": False
    }

async def run_single_test(question: str, keywords: List[str], top_k: int = 5) -> Dict[str, Any]:
    """运行单个问题的测试"""
    logger.info(f"\n❓ 测试问题:{question}")
    
    try:
        results = await pipeline_instance.run(query=question, top_k=top_k)
        
        if not results:
            logger.warning("⚠️ 无检索结果")
            return {"question": question, "hit": False, "rank": -1, "total_results": 0, "text_lengths": [], "parent_mode_count": 0}
        
        eval_result = check_hit(results, keywords)
        
        # 统计本次查询的文本长度分布和父子模式触发情况
        text_lengths = [len(r.text) for r in results]
        parent_mode_count = sum(1 for r in results if r.source_field.endswith("_parent") or r.metadata.get('_retrieval_mode') == 'child_to_parent')
        
        logger.info(f"✅ 评估结果:Hit={eval_result['hit']}, Rank={eval_result['rank']}, Source={eval_result['source_plugin']}")
        logger.debug(f"   片段:{eval_result['snippet']} | 平均长度:{statistics.mean(text_lengths):.0f}")
        
        return {
            "question": question,
            "hit": eval_result["hit"],
            "rank": eval_result["rank"],
            "source_plugin": eval_result["source_plugin"],
            "total_results": len(results),
            "top_score": results[0].score if results else 0,
            "text_lengths": text_lengths,       # 👈 新增
            "parent_mode_count": parent_mode_count # 👈 新增
        }
        
    except Exception as e:
        logger.error(f"❌ 执行出错:{e}")
        import traceback
        traceback.print_exc()
        return {"question": question, "hit": False, "rank": -1, "error": str(e), "text_lengths": [], "parent_mode_count": 0}

async def main():
    print("="*80)
    print("🧪 RAG 分块策略对比测试 (单路召回)")
    print("="*80)
    
    # 打印当前关键配置
    strategy = settings.rag_offline.chunk_strategy
    use_parent = getattr(settings.rag_online, 'use_parent_context', False)
    hybrid = settings.search.enable_hybrid_search
    
    print(f"⚙️ 当前配置:")
    print(f"   - 分块策略 (CHUNK_STRATEGY): {strategy}")
    print(f"   - 启用父子上下文 (USE_PARENT_CONTEXT): {use_parent}")
    print(f"   - 混合检索 (ENABLE_HYBRID_SEARCH): {hybrid} (建议为 False 以测试单路)")
    print("="*80)
    
    if hybrid:
        logger.warning("⚠️ 检测到混合检索已开启。为了纯粹对比分块策略,建议在 .env 中设置 SEARCH_ENABLE_HYBRID_SEARCH=False")
        exit(1)

    all_results = []
    category_stats = {}
    
    # 用于统计全局的文本长度和父子模式使用情况
    all_text_lengths = []
    total_parent_mode_hits = 0

    for case in TEST_CASES:
        cat_name = case["category"]
        logger.info(f"\n--- 类别:{cat_name} ---")
        category_stats[cat_name] = {"total": 0, "hit": 0, "ranks": []}
        
        for q, kws in zip(case["questions"], case["keywords"]):
            res = await run_single_test(q, kws, top_k=5)
            res["category"] = cat_name
            all_results.append(res)
            
            category_stats[cat_name]["total"] += 1
            if res.get("hit"):
                category_stats[cat_name]["hit"] += 1
                category_stats[cat_name]["ranks"].append(res["rank"])
            
            # 累积统计
            if res.get("text_lengths"):
                all_text_lengths.extend(res["text_lengths"])
            if res.get("parent_mode_count"):
                total_parent_mode_hits += res["parent_mode_count"]

    # ==========================================
    # 3. 生成报告
    # ==========================================
    print("\n" + "="*80)
    print("📊 评估报告总结")
    print("="*80)
    
    total_q = len(all_results)
    total_hit = sum(1 for r in all_results if r.get("hit"))
    overall_hit_rate = total_hit / total_q if total_q > 0 else 0
    
    # 计算 MRR
    reciprocal_ranks = [1.0/r["rank"] for r in all_results if r.get("hit") and r["rank"] > 0]
    mrr = sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0.0
    
    # 计算文本长度统计
    avg_text_len = statistics.mean(all_text_lengths) if all_text_lengths else 0
    median_text_len = statistics.median(all_text_lengths) if all_text_lengths else 0
    
    print(f"\n🏆 总体表现:")
    print(f"   - 总问题数:{total_q}")
    print(f"   - 命中数 (Hit@5): {total_hit}")
    print(f"   - 命中率 (Hit Rate): {overall_hit_rate:.2%}")
    print(f"   - 平均倒数排名 (MRR): {mrr:.4f}")
    
    print(f"\n📏 上下文窗口分析:")
    print(f"   - 返回片段平均长度:{avg_text_len:.0f} 字")
    print(f"   - 返回片段中位长度:{median_text_len:.0f} 字")
    if strategy == "parent_child" and use_parent:
        print(f"   - 触发'查子返父'次数:{total_parent_mode_hits} 次 (共 {total_q * 5} 个结果)")
        print(f"   - 父子模式覆盖率:{(total_parent_mode_hits / (total_q * 5)) * 100:.1f}%")
    else:
        print(f"   - 父子模式:未启用或非 Parent-Child 策略")

    print(f"\n📈 分类别表现:")
    print(f"{'类别':<30} | {'命中率':<10} | {'MRR':<10} | {'平均排名':<10}")
    print("-" * 70)
    
    for cat, stats in category_stats.items():
        hit_rate = stats["hit"] / stats["total"] if stats["total"] > 0 else 0
        ranks = stats["ranks"]
        cat_mrr = sum([1.0/r for r in ranks]) / len(ranks) if ranks else 0
        avg_rank = sum(ranks) / len(ranks) if ranks else 0
        
        print(f"{cat:<30} | {hit_rate:>6.2%}     | {cat_mrr:>6.4f}     | {avg_rank:>6.2f}")
    
    print("\n💡 策略对比指南:")
    print("   请保存此报告,然后修改 .env 文件:")
    print("   1. 将 CHUNK_STRATEGY 从 'recursive' 改为 'parent_child' (或反之)")
    print("   2. 再次运行本测试脚本")
    print("   3. 对比两份报告的 [命中率] 和 [上下文窗口分析]")
    print("   👉 预期:Parent-Child 策略在保持命中率的同时,应显著增加 [平均长度],从而提升 LLM 回答质量。")
    
    print("="*80)

if __name__ == "__main__":
    asyncio.run(main())

执行测试

python -m src.test.recall_chunk_strategy

parent_child模式下的配置

# [其他内容不变]
# Database
MILVUS_COLLECTION=knowledge_parent_child
es_index_questions=knowledge_questions_parent_child
es_index_summaries=knowledge_summaries_parent_child

# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1

# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child

# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False

在这里插入图片描述

recursive模式下的配置

# [其他内容不变]
# Database
MILVUS_COLLECTION=knowledge_base
es_index_questions=knowledge_questions
es_index_summaries=es_index_summaries # 这个之前写错了,就一直保留到现在

# RAG Offline (Chunking)
CHUNK_STRATEGY=recursive

# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1

# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False

在这里插入图片描述

对比两份报告时,重点关注以下指标的变化:

指标 预期变化 (Parent-Child vs Recursive) 含义解读
Hit Rate (命中率) 持平 或 微升 子块更小,向量更纯净,理论上召回更准;但如果父块切分不当也可能略降。理想情况是持平。
MRR (排名) 提升 更精准的小块匹配应该能让正确答案排在更前面。
平均长度 (Avg Len) 显著增加 ⬆️ 这是核心指标。Recursive 可能返回 200 字,而 Parent-Child 应返回 1000 字。这意味着 LLM 拥有了更多上下文。
父子模式覆盖率 > 80% 如果开启了 USE_PARENT_CONTEXT,绝大多数命中的结果都应该触发“查子返父”。

💡 为什么这样做更科学?

  1. 隔离变量:通过环境变量控制,代码完全一致,排除了代码逻辑干扰。
  2. 单路纯净:关闭混合检索,单独评估向量检索在不同分块粒度下的表现,避免 ES 干扰结论。
  3. 上下文感知:不仅看“能不能找到”(Hit Rate),还看“找到的内容够不够用”(Text Length),这才是 RAG 最终效果的瓶颈所在。

第四部分:父子索引优化–Sentence as Child

在纯粹的“向量检索”阶段,按句子划分(Sentence Chunking)往往确实没有明显优势,甚至经常表现更差。

但这并不代表它“没用”,而是它的适用场景发挥作用的位置与其他策略不同。让我们深入剖析一下为什么会出现这种情况,以及句子级分块的真正价值在哪里。


1. 为什么句子级分块在“向量检索”中显得弱势?

❌ 核心痛点:语义碎片化 (Semantic Fragmentation)
向量模型(Embedding Model)需要一定的上下文才能理解一个句子的完整含义。

  • 例子
    • 原文:“虽然这家公司的财务报表看起来很好,但是其现金流存在严重问题。”
    • 按句子切分
      • Chunk A: “虽然这家公司的财务报表看起来很好。” → \rightarrow 向量含义:正面、财务好。
      • Chunk B: “但是其现金流存在严重问题。” → \rightarrow 向量含义:负面、现金流差。
    • 用户查询:“这家公司的财务状况如何?”
    • 结果
      • Chunk A 的向量可能匹配到“财务好”的查询,导致误召回(幻觉源头)。
      • Chunk B 丢失了“虽然…但是…”的转折逻辑,丢失了前提条件。
  • 对比 Recursive/Parent-Child:它们会保留前后句的关联,向量能捕捉到“表面好但实际差”的复杂语义。

❌ 噪声敏感
短句子往往包含大量代词(“他”、“它”、“这”),脱离上下文后,这些句子的向量表示非常模糊,容易匹配到不相关的文档。


2. 那句子级分块到底有什么用?(它的杀手锏)

句子级分块的优势不在“向量相似度计算”,而在以下三个特定场景:

✅ 场景一:高精度的关键词匹配 (Keyword Search / BM25)

如果你的检索主要依赖 Elasticsearch (BM25) 而不是向量:

  • 长段落会稀释关键词密度(Term Frequency)。
  • 短句子里关键词密度极高,BM25 打分会非常高。
  • 结论:在纯关键词检索或混合检索的 ES 路中,句子级分块往往比长段落更精准。
✅ 场景二:极短文本或结构化数据
  • 法律条款、诗歌、代码行、日志条目:这些内容本身就是以“句/行”为最小语义单位的。强行合并反而会破坏结构。
  • 例子:搜索“第 3 条第 2 款关于违约的规定”。如果按 500 字切分,可能把第 3 条和第 4 条混在一起;按句子切分则能精准定位。
✅ 场景三:作为 Parent-Child 中的 “Child” (最佳实践!)

这才是句子级分块的终极形态

  • 不要直接用句子级分块去检索并返回给 LLM(太碎了)。
  • 而是:用句子级分块作为 Child Chunks 生成向量,然后映射到 Parent Chunks(段落级)。
  • 效果
    • 检索时:利用句子的纯净向量进行高精度匹配(解决了长段落语义稀释问题)。
    • 返回时:通过 Parent ID 找回完整的段落(解决了句子语义缺失问题)。
  • 这就是所谓的 “Sentence Window Retrieval”(LlamaIndex 中的经典策略)。

3. 三种策略的横向对比总结

特性 Recursive (递归) Parent-Child (父子) Sentence (句子级)
粒度 中等 (200-500 字) 子块小 (100-200 字) + 父块大 极小 (10-50 字)
向量语义完整性 ⭐⭐⭐⭐ (较好) ⭐⭐⭐⭐ (子块较纯净) ⭐⭐ (较差,易丢失上下文)
关键词匹配精度 ⭐⭐⭐ ⭐⭐⭐⭐ (子块密度高) ⭐⭐⭐⭐⭐ (极高)
LLM 上下文质量 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ (返回的是大块) ⭐ (太碎,LLM 无法推理)
最佳适用场景 通用文档、博客、文章 企业知识库、长文档、复杂推理 法律条款、日志、作为父子索引的子块
单独使用推荐度 ✅ 推荐 (稳健) ✅✅ 强烈推荐 (SOTA) ❌ 不推荐 (除非特殊场景)

Sentence as Child的模式

Sentence-as-Child 模式。这是目前 RAG 领域公认的“版本答案”之一(LlamaIndex 称之为 Sentence Window Retrieval)。

🎯 核心逻辑

  1. 父块 (Parent):保持较大的段落(如 800-1000 字),提供完整上下文给 LLM。
  2. 子块 (Child):不再使用简单的字符切分,而是使用语义完整的句子作为最小单元。
  • 优势:一个句子通常表达一个完整的命题,生成的向量(Embedding)语义最纯净,检索匹配度最高。
  1. 映射:多个句子子块 → 映射到同一个父块 ID。

1. 🛠️ 升级分块器 (src/rag/chunkers.py)

在中文场景下,jieba 是处理分句和分词最成熟、轻量且准确的库。使用它不仅能准确识别中文标点(如 。!?),还能避免正则表达式在处理复杂文本(如包含英文缩写、数字混合)时的边界错误。(英文句切分可以使用 nltk)

pip install jieba
import jieba

class BaseChildSplitter(ABC):
    """
    子块分块器基类 (专门用于 Parent-Child 模式)
    负责将单个父块文本切分为多个子块 Document
    """
    @abstractmethod
    def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
        """
        Args:
            text: 父块的文本内容
            metadata: 父块的元数据 (将传递给子块)
        Returns:
            子块 Document 列表
        """
        pass

class ParentChildChunker(BaseChunker):
    """
    父子分块策略 (Parent-Child Indexing)
    
    逻辑:
    1. 将原文切分为大块 (Parent)。
    2. 利用注入的 child_splitter 将每个大块切分为小块 (Child)。
    3. 返回所有小块 (Child),但在 metadata 中注入 parent_id 和 parent_text。
    
    依赖注入:
    - child_splitter: 由外部工厂传入的具体 BaseChildSplitter 实例
    """
    
    def __init__(
        self, 
        parent_size: int = 1000, 
        parent_overlap: int = 100,
        child_splitter: Optional[BaseChildSplitter] = None,
        parent_separators: List[str] = None
    ):
            
        self.parent_size = parent_size
        self.parent_overlap = parent_overlap
        
        # 初始化父分块器
        self.parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=parent_overlap,
            length_function=len,
            separators=parent_separators
        )
        
        # 依赖注入:子分块器 (必须由外部传入,此处不依赖 settings)
        if child_splitter is None:
            # 如果外部没传,默认给一个递归子分块器 (兜底策略)
            logger.warning("⚠️ ParentChildChunker 未传入 child_splitter,使用默认 RecursiveChildSplitter")
            self.child_splitter = RecursiveChildSplitter(
                chunk_size=200, 
                chunk_overlap=0, 
                separators=["\n\n", "\n", "。", "!", "?", " ", ""]
            )
        else:
            self.child_splitter = child_splitter
            
        logger.info(f"✂️ 初始化父子分块器:ParentSize={parent_size}, ChildStrategy={self.child_splitter.__class__.__name__}")

    def split_documents(self, docs: List[Document]) -> List[Document]:
        all_child_docs = []
        total_parents = 0
        
        for doc in docs:
            # 1. 切分父块
            parent_docs = self.parent_splitter.split_documents([doc])
            total_parents += len(parent_docs)
            
            for p_doc in parent_docs:
                parent_id = str(uuid.uuid4())
                parent_text = p_doc.page_content
                
                # 2. 使用注入的子分块器切分父块
                # 传入父块的元数据,子分块器会保留并增强它
                child_docs = self.child_splitter.split_text(parent_text, p_doc.metadata)
                
                for c_doc in child_docs:
                    # 3. 注入父子关系元数据
                    new_metadata = {
                        **c_doc.metadata,
                        "parent_id": parent_id,
                        "parent_text": parent_text, # 关键:存入大块文本供检索后返回
                        "is_child": True,
                        "chunk_type": f"child_{self.child_splitter.__class__.__name__}"
                    }
                    
                    child_doc = Document(
                        page_content=c_doc.page_content, # 小子块内容 (用于向量化)
                        metadata=new_metadata
                    )
                    all_child_docs.append(child_doc)
        
        logger.info(f"✅ 父子分块完成:输入 {len(docs)} 文档 -> 生成 {len(all_child_docs)} 个子块 (关联 {total_parents} 个父块)")
        return all_child_docs

# ==========================================
# 2. 子块分块器实现 (Concrete Child Splitters)
# ==========================================

class RecursiveChildSplitter(BaseChildSplitter):
    """
    基于字符递归切分的子块策略
    适用于:通用文档,追求均匀切分
    """
    def __init__(self, chunk_size: int, chunk_overlap: int, separators: List[str]):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=separators
        )
        logger.debug(f"   [ChildSplitter] 初始化:Recursive (Size={chunk_size}, Overlap={chunk_overlap})")

    def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
        temp_doc = Document(page_content=text, metadata=metadata)
        return self.splitter.split_documents([temp_doc])

class SentenceChildSplitter(BaseChildSplitter):
    """
    基于 Jieba 的句子切分策略 (Sentence-as-Child)
    
    优势:
    1. 利用 jieba 强大的中文分句能力,准确识别 。!?等标点。
    2. 自动处理中英文混排、数字、缩写等复杂情况。
    3. 比正则更稳健,不易产生空切片或错误截断。
    """
    def __init__(self, min_sentence_len: int = 10):
        self.min_sentence_len = min_sentence_len
        # 预加载 jieba 字典 (可选,加速首次运行)
        # jieba.initialize() 
        logger.debug(f"   [ChildSplitter] 初始化:Jieba Sentence (MinLen={min_sentence_len})")

    def _split_into_sentences(self, text: str) -> List[str]:
        """
        使用 jieba 进行分句
        原理:jieba.lcut 会把标点符号也作为单独的词切分出来,我们据此重组句子。
        """
        # 使用 jieba 精确模式切分
        words = jieba.lcut(text)
        
        sentences = []
        current_sent = ""
        
        # 定义句末标点集合 (中文 + 英文)
        end_punctuations = {'。', '!', '?', '!', '?', '…', '…', '.'}
        # 换行符也视为句子结束
        newlines = {'\n', '\r'}
        
        for word in words:
            current_sent += word
            
            # 判断是否结束一个句子
            if word.strip() in end_punctuations or word in newlines:
                s = current_sent.strip()
                if len(s) >= self.min_sentence_len:
                    sentences.append(s)
                current_sent = ""
        
        # 处理最后一段没有标点的情况
        if current_sent.strip():
            s = current_sent.strip()
            if len(s) >= self.min_sentence_len:
                sentences.append(s)
                
        return sentences

    def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
        sentences = self._split_into_sentences(text)
        docs = []
        
        if not sentences:
            # 如果分句失败(极少见),返回整个文本作为一个块,防止数据丢失
            logger.warning(f"⚠️ Jieba 分句结果为空,回退为单块。文本前50字:{text[:50]}...")
            return [Document(page_content=text, metadata=metadata)]
            
        for i, sent in enumerate(sentences):
            doc = Document(
                page_content=sent,
                metadata={
                    **metadata,
                    "sentence_index": i,
                    "total_sentences_in_parent": len(sentences),
                    "chunk_method": "jieba_sentence"
                }
            )
            docs.append(doc)
            
        return docs

# ==========================================
# 3. 子分块器工厂 (Internal Factory for Decoupling)
# ==========================================
# 注意:这是一个纯逻辑工厂,不包含 settings 依赖,由外部 factories.py 调用

class ChildSplitterFactory:
    """
    子分块器创建工厂
    根据传入的策略字符串和参数创建具体的子分块器实例
    """
    
    @staticmethod
    def create(strategy: str, **kwargs) -> BaseChildSplitter:
        strategy = strategy.lower()
        
        if strategy == "sentence" or strategy == "sentence_window" or strategy == "jieba":
            return SentenceChildSplitter(
                min_sentence_len=kwargs.get('min_sentence_len', 10)
            )
        elif strategy == "recursive" or strategy == "fixed" or strategy == "char":
            return RecursiveChildSplitter(
                chunk_size=kwargs.get('chunk_size', 200),
                chunk_overlap=kwargs.get('chunk_overlap', 0),
                separators=kwargs.get('separators', ["\n\n", "\n", "。", "!", "?", " ", ""])
            )
        else:
            logger.warning(f"⚠️ 未知子分块策略 '{strategy}',降级使用 recursive")
            return RecursiveChildSplitter(
                chunk_size=kwargs.get('chunk_size', 200),
                chunk_overlap=kwargs.get('chunk_overlap', 0),
                separators=kwargs.get('separators', ["\n\n", "\n", "。", "!", "?", " ", ""])
            )

2. 🏭 更新工厂组装 (src/rag/factories.py)

class ChunkerFactory:
    @staticmethod
    def get_chunker() -> BaseChunker:
        ...[前面内容不变]
        elif strategy == "parent_child":
            # ==========================================
            # 1. 解析子分块器相关配置
            # ==========================================
            child_strategy = settings.rag_offline.child_splitter_strategy
            child_chunk_size = settings.rag_offline.child_chunk_size
            child_chunk_overlap = settings.rag_offline.child_chunk_overlap
            min_sentence_len = settings.rag_offline.min_sentence_length
            parent_size = settings.rag_offline.chunk_size
            parent_overlap = settings.rag_offline.chunk_overlap
            
            logger.info(f"   └─ 子分块策略:{child_strategy}")
            logger.info(f"   └─ 父块大小:{parent_size}, 重叠:{parent_overlap}")

            # ==========================================
            # 2. 使用工厂创建【子分块器实例】 (依赖注入的核心)
            # ==========================================
            child_splitter = ChildSplitterFactory.create(
                strategy=child_strategy,
                chunk_size=child_chunk_size,
                chunk_overlap=child_chunk_overlap,
                min_sentence_len=min_sentence_len,
                separators=separators
            )
            
            # ==========================================
            # 3. 创建【父分块器】并注入子分块器
            # ==========================================
            return ParentChildChunker(
                parent_size=parent_size,
                parent_overlap=parent_overlap,
                child_splitter=child_splitter,  # 👈 关键:注入已经初始化好的子分块器
                parent_separators=["\n\n", "\n"] # 父块通常只按段落切分
            )

✅ 架构优势总结

  1. 高内聚低耦合:ParentChildChunker 不需要知道具体的子分块逻辑,只负责调度。ChildSplitterFactory 封装了所有创建细节。
  2. 极易扩展:未来如果想加一个 “SemanticChildSplitter” (用小模型聚类切分),只需:
    • 新建类继承 BaseChildSplitter。
    • 在 ChildSplitterFactory 注册。
    • 在 .env 加个配置,无需修改任何业务代码。
  3. 配置驱动:完全通过环境变量切换策略,方便进行 A/B 测试。
  4. 统一接口:对外依然暴露标准的 split_documents 方法,上层 Ingestion 流程无感知。

3. ⚙️ 更新配置文件 (src/core/config.py & .env)

class RagOfflineSettings(BaseSettings):
    """RAG 检索前 (Offline) 配置:分块与入库"""
    # ...[前面内容不变]
    
    # 父子分块特有参数
    # 子分块器配置
    child_chunk_size: int = 80
    child_chunk_overlap: int = 0
    child_splitter_strategy: str = "recursive"  # 可选:recursive, sentence
    min_sentence_length: int = 10

在运行测试前,还要修改.env中的内容

# 分块策略修改、要重新生成索引和向量
MILVUS_COLLECTION=knowledge_child_sentences
es_index_questions=knowledge_questions_child_sentences
es_index_summaries=knowledge_summaries_child_sentences

# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child
CHILD_SPLITTER_STRATEGY=sentence
CHILD_CHUNK_SIZE=80
CHILD_CHUNK_OVERLAP=0
MIN_SENTENCE_LENGTH=10

# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1

# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False

4. 数据入库与测试

先执行,数据入库操作, 由于父子索引方式切出的切片数量很多,每个都要跑一遍元数据增强,可能会很慢

python -m src.test.test_ingestion

然后再执行命中测试:

python -m src.test.recall_chunk_strategy

在这里插入图片描述

可以看到parent_child的MRR比前面用recursive 要高一点

MRR评估:

在命中测试(或更准确地说,在排序质量评估)中,MRR(Mean Reciprocal Rank,平均倒数排名)是越高越好

  • ✅ 为什么 MRR 越高越好?

MRR 的核心思想是:正确答案(或相关结果)在排序列表中出现的位置越靠前,得分越高

  • 计算公式:
    MRR = 1 Q ∑ i = 1 Q 1 rank i \text{MRR} = \frac{1}{Q} \sum_{i=1}^{Q} \frac{1}{\text{rank}_i} MRR=Q1i=1Qranki1
  • $ Q $:查询总数
  • $ \text{rank}_i $:第 $ i $ 个查询中,第一个相关结果的排名(从 1 开始)
  • 如果某次查询没有相关结果,则 $ \frac{1}{\text{rank}_i} = 0 $
  • 举例说明:
    | 查询 | 正确答案排名 | 倒数排名(Reciprocal Rank) |
    |------|---------------|-------------------------------|
    | Q1 | 1 | $ 1/1 = 1.0 $ |
    | Q2 | 3 | $ 1/3 \approx 0.333 $ |
    | Q3 | 未命中 | $ 0 $ |

→ MRR = $ (1.0 + 0.333 + 0) / 3 \approx 0.444 $

  • 如果所有正确答案都排在第 1 位 → MRR = 1.0最佳
  • 如果正确答案都很靠后或经常未命中 → MRR 接近 0最差

因此,MRR 的取值范围是 [0, 1],值越大表示系统排序能力越强,用户体验越好


📌 应用场景中的意义
  • RAG 系统:MRR 高意味着用户问题的相关文档被排在召回列表前列,有利于后续生成高质量答案。
  • 推荐系统:MRR 高说明用户感兴趣的商品/内容出现在推荐列表顶部。
  • 搜索引擎 / 知识图谱补全:MRR 是衡量“首个正确结果是否靠前”的黄金指标。

⚠️ 注意:MRR 只关注第一个相关结果的位置,不关心后面是否还有更多相关项。如果需要评估整体排序质量,应结合 NDCGMAP 使用。


✅ 总结
指标 趋势 含义
MRR 越高越好 首个相关结果越靠前,系统排序越精准
Hit Rate / Recall 越高越好 找到的相关结果越多
NDCG 越高越好 整体排序质量越高(考虑位置和相关性)

所以,在命中测试或 RAG 评估中,追求更高的 MRR 是正确的优化方向

第五部分:HyDE (Hypothetical Document Embeddings)—— 用“虚构的答案”来寻找“真实的文档”

核心目标:解决用户提问(Query)与专业文档(Document)之间的语义鸿沟和词汇不匹配问题。
演进路线:Query Rewriting → HyDE (生成式检索)。
关键概念:Embed the Answer, Retrieve the Question。

1. 痛点分析:为什么需要 HyDE?

❌ 传统检索的困境

  • 用户问:“电脑蓝屏了怎么办?” (口语化、症状描述)
  • 文档写:“Windows 操作系统致命错误修复指南:重启并检查内存转储。” (专业术语、解决方案)
  • 结果:
    • 关键词匹配 (BM25):完全没命中(“蓝屏”vs“致命错误”)。
    • 向量匹配 (Dense):虽然语义相近,但由于表述角度完全不同(问题 vs 答案),向量空间距离可能较远,导致排名靠后。

✅ HyDE 的解决方案
核心思想:“以答找文”。

  1. 生成 (Generate):让 LLM 根据用户问题,虚构一篇“完美的答案文档”(不需要事实正确,只要语义相关)。
  2. 嵌入 (Embed):将这篇虚构的文档转化为向量。
  3. 检索 (Retrieve):用虚构文档的向量去检索真实的文档库。
    • 原理:虚构的答案在文体、结构、词汇上与真实答案高度相似,因此在向量空间中距离更近。

2. 架构设计:集成到现有流程

我们将 HyDE 集成到现有的 Rewriter 模块中,作为 VectorRewrittenRetriever 的一种增强模式。
📂 修改文件

  • src/rag/rewriter.py: 增加 HyDE 生成逻辑。
  • src/rag/strategies/retrievers/vector_rewritten.py: 支持使用 HyDE 生成的文本进行检索。
  • src/core/config.py: 增加 HyDE 配置开关。
  • .env: 新增环境变量。

3. 实战编码

🛠️ 第一步:实现 HyDE 生成器 (src/rag/rewriter.py)

我们复用现有的 rewriter 客户端,增加一个专门生成“虚构文档”的方法。

👍 核心差异只是 Prompt 模板、温度 和 名称

  • 使用 配置字典 + 单一执行类 的模式,不仅代码量减少 80%,而且后续添加新策略(比如 Multi-Query)只需要在字典里加一行配置,完全不需要改代码逻辑。
# src/rag/rewriter.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from src.core.config import settings
from src.utils.xml_parser import remove_think_and_n
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)

# ==========================================
# 1. 策略配置中心 (Data-Driven)
# ==========================================
STRATEGY_CONFIGS: Dict[str, Dict[str, Any]] = {
    "standard": {
        "name": "StandardRewrite",
        "temperature": settings.llm.rewrite_temperature,
        "system_prompt": """
        你是一个高级 RAG 系统的查询优化专家。
        任务:将用户的自然语言问题改写为更适合向量数据库检索的陈述句。
        策略:口语转书面语、疑问转陈述、术语对齐、去噪。
        约束:只输出改写后的句子,无解释。
        """
    },
    "hyde": {
        "name": "HyDE (Hypothetical Document)",
        "temperature": settings.llm.hyde_temperature,
        "system_prompt": """
        请阅读以下问题,并撰写一段**虚构的、详细的**文档片段来回答这个问题。
        要求:
        1. 语气专业、客观,模仿技术文档或百科全书风格。
        2. 包含可能出现在真实文档中的关键词、术语和具体步骤。
        3. 不需要保证事实绝对正确,但必须在语义和结构上像一个真实的答案。
        4. 长度控制在 100-200 字左右。
        约束:只输出虚构的文档内容,不要包含“以下是虚构文档”等前缀。
        """
    },
    # 未来扩展示例:
    # "multi_query": { ... },
    # "step_back": { ... },
}

# ==========================================
# 2. 统一执行器 (Single Executor)
# ==========================================
class QueryRewriter:
    """
    统一查询重写器
    通过 strategy_name 动态加载配置,无重复代码
    """
    def __init__(self, strategy_name: str = "standard"):
        self.strategy_name = strategy_name.lower()
        
        # 获取配置,降级处理
        config = STRATEGY_CONFIGS.get(self.strategy_name, STRATEGY_CONFIGS["standard"])
        
        if self.strategy_name not in STRATEGY_CONFIGS:
            logger.warning(f"⚠️ 未知策略 '{self.strategy_name}',降级使用 standard")
            self.strategy_name = "standard"
            config = STRATEGY_CONFIGS["standard"]

        self.name = config["name"]
        self.temperature = config["temperature"]
        
        # 初始化 LLM (每个策略独立实例以隔离温度配置)
        self.llm = ChatOpenAI(
            model=settings.llm.model_name,
            base_url=settings.llm.base_url,
            api_key=settings.llm.api_key,
            temperature=self.temperature
        )
        
        # 构建 Prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", config["system_prompt"]),
            ("human", "{original_query}")
        ])
        
        logger.info(f"📝 初始化重写器:{self.name} (Temp={self.temperature})")

    def rewrite(self, original_query: str) -> str:
        """执行重写 """
        try:
            chain = self.prompt | self.llm
            response = chain.invoke({"original_query": original_query})
            result = remove_think_and_n(response.content.strip())
            
            logger.debug(f"✨ [{self.name}] '{original_query}' -> '{result[:50]}...'")
            return result
        except Exception as e:
            logger.error(f"❌ [{self.name}] 失败,降级使用原查询:{e}")
            return original_query

# ==========================================
# 3. 工厂函数 (Factory Function)
# ==========================================
def get_rewriter(strategy_name: str = None) -> QueryRewriter:
    """工厂函数:根据配置返回重写器实例"""
    if not strategy_name:
        strategy_name = getattr(settings.search, 'rewritten_strategy', 'standard')
    return QueryRewriter(strategy_name=strategy_name)

🔄 第二步:更新检索插件(src/rag/strategies/retrievers/vector_rewritten.py)

修改插件加载,注意此时该插件能构造两种模式的检索器:

  1. standard:使用陈述句改写的
  2. hyde:使用虚拟文档改写的(记得在config.py中添加相关的配置变量)
# src/rag/strategies/retrievers/vector_rewritten.py
from src.rag.strategies.base import BaseRetrievalStrategy, SearchResult
from src.core.milvus_client import get_milvus_client
from src.rag.rewriter import get_rewriter # 复用之前的重写器
from typing import List, Optional
import logging
import os

logger = logging.getLogger(__name__)

class VectorRewrittenRetriever(BaseRetrievalStrategy):
    """
    插件 2: 变体向量检索
    策略:先让 LLM 重写查询 (Query Rewriting),再用重写后的句子进行向量搜索。
    """
    def __init__(self, strategy_name: str = "standard"):
        self.milvus = get_milvus_client()
        self.rewriter = get_rewriter(strategy_name=strategy_name)

        logger.info(f"🔌 [Plugin] 加载变体向量检索插件 ({strategy_name})")

🚀 第三步:更新融合器 (src/rag/strategies/composer.py)

class RetrieverComposer:
    """
    检索器组装器
    职责:根据配置动态加载多个检索插件,并行执行,并使用 RRF 融合结果。
    """
    def __init__(self):
        self.retrievers: List[BaseRetrievalStrategy] = []
        self.rrf_engine = RRFFusionEngine(k=settings.search.rrf_k)
        
        self._load_plugins()

    def _load_plugins(self):
        """根据配置动态加载插件"""
        # 1. 主路:永远加载
        self.retrievers.append(VectorTextRetriever())
        logger.info("✅ [Composer] 已加载主路:VectorText")
        
        # 2. 变体路:如果开启混合检索
        if settings.search.enable_hybrid_search:
            # 2. 改写路:如果开启改写
            if settings.search.plugin_rewritten_query:
                self.retrievers.append(VectorRewrittenRetriever('standard'))
                logger.info("✅ [Composer] 已加载变体路:VectorRewritten-standard")
            
            if settings.search.plugin_rewritten_hyde:
                self.retrievers.append(VectorRewrittenRetriever('hyde'))
                logger.info("✅ [Composer] 已加载变体路:VectorRewritten-hyde")

            # 3. ES 路:如果配置了 ES
            if settings.db.es_host:
                # 3. ES - Questions 路
                if settings.search.plugin_es_questions:
                    es_retriever = ESQuestionsRetriever()
                    if es_retriever.es.is_available(): # 只有连接成功才加入
                        self.retrievers.append(es_retriever)
                        logger.info("✅ [Composer] 已加载 ES - Questions 路:ESQuestions")
                # 4. ES - Summaries 路
                if settings.search.plugin_es_summaries:
                    es_retriever = ESSummariesRetriever()
                    if es_retriever.es.is_available(): # 只有连接成功才加入
                        self.retrievers.append(es_retriever)
                        logger.info("✅ [Composer] 已加载 ES - Summaries 路:ESSummaries")

4. 执行命中测试

修改.env文件

# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=True
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_REWRITTEN_HYDE=True
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False
SEARCH_RRF_K=60

运行测试,注意把exit(1)的代码注释掉

python -m src.test.recall_chunk_strategy

结果有点令人失望,即使多了一路,评估指标也没有上去,可能跟文章本身就太奇葩了,大模型也预料不到有这样的报销政策,而且比较的对象也不应该选择这个,parent_child是属于数据处理方面的策略,而HyDE是属于检索策略,应该是HyDE与standard单独比较才对。

在这里插入图片描述

🏗️ 项目架构全景图与演进规划

本阶段在第五阶段“混合检索框架”基础上,进一步完成了两条关键进化路线:

  1. 数据侧:引入 Parent-Child Indexing + 可插拔 Child Splitter(递归/句子级),实现“查子返父”。
  2. 检索侧:引入 HyDE 重写策略,并将重写器升级为策略驱动模式(standard / hyde)。

因此当前系统已经从“多路检索可用”,走向“上下文更完整、召回表达更鲁棒”的 Context-Aware RAG 架构。

📂 当前项目结构解析

src/
├── main.py                          # 🚀 应用入口
│
├── core/                            # 🧱 基础设施层
│   ├── config.py                    # ⚙️ 配置中心(含 Parent-Child / HyDE 开关)
│   ├── redis_client.py              # 💾 短期记忆(LangGraph Checkpointer)
│   ├── db_session.py                # 🗄️ MySQL 连接池
│   ├── models.py                    # 📑 MySQL 模型
│   ├── milvus_client.py             # 🔭 Milvus 客户端
│   └── es_client.py                 # 🔎 Elasticsearch 客户端
│
├── rag/                             # 🧠 RAG 引擎层(第六阶段核心演进区)
│   ├── chunkers.py                  # ✂️ 分块体系(ParentChildChunker + ChildSplitterFactory)
│   ├── factories.py                 # 🏭 工厂模块(配置驱动注入 Child Splitter)
│   ├── ingestion.py                 # 📥 数据摄入(保留 parent_id/parent_text 元数据)
│   ├── pipeline.py                  # 🔄 检索执行流
│   ├── rewriter.py                  # ✍️ 查询重写器(standard / hyde 策略化)
│   ├── reranker.py                  # ⚖️ 重排序模块
│   ├── fusion/
│   │   └── rrf.py                   # 🔗 RRF 融合
│   └── strategies/
│       ├── base.py                  # 基础接口(SearchResult / BaseRetrievalStrategy)
│       ├── composer.py              # 🎼 检索器组装(异步并发 + RRF)
│       ├── metadata_filter.py       # 🔒 过滤表达式构建
│       └── retrievers/
│           ├── vector_text.py       # 插件 1:主路向量检索
│           ├── vector_rewritten.py  # 插件 2:改写向量检索(支持 HyDE)
│           ├── es_questions.py      # 插件 3:ES Questions
│           └── es_summaries.py      # 插件 4:ES Summaries
│
├── Mini_Agent/                      # 🤖 Agent 编排层
│   ├── agent_framework.py           # Agent 框架封装
│   ├── agent_v1.py                  # Agent 版本实现
│   ├── graph.py                     # LangGraph 工作流
│   ├── state.py                     # 状态定义
│   └── tools/
│       ├── base_tools.py            # 基础工具
│       ├── memory_tools.py          # 长期记忆工具
│       └── rag_tools.py             # RAG 工具(调用 pipeline)
│
├── service/                         # 🧩 业务服务层(预留)
│
├── utils/
│   └── xml_parser.py                # 🧰 输出清洗 / XML 解析辅助
│
└── test/                            # 🧪 测试与评估
    ├── test_ingestion.py            # 入库测试
    ├── recall_test.py               # 多路召回评估
    ├── recall_chunk_strategy.py     # 分块策略对比评估(第六阶段重点)
    ├── test_dynamic_rerank.py       # 动态重排测试
    ├── sync_es.py                   # Milvus -> ES 同步脚本
    └── ...

🌊 系统数据流向架构图

在这里插入图片描述

在这里插入图片描述

🔑 第六阶段关键能力落点

能力方向 关键实现 价值
上下文窗口优化 ParentChildChunker + parent_text 回传 兼顾“检索精度”与“上下文完整性”
子分块可插拔 BaseChildSplitter + ChildSplitterFactory 递归切分与句子切分可配置切换
重写策略升级 rewriter.py 策略化(standard / hyde 从“改写句子”升级到“生成假设文档”
多路调度融合 composer.py + rrf.py 多插件并发召回,降低单路偏差
配置驱动实验 config.py + .env 参数化 支持 A/B 测试与低成本试错
Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐