【Agent开发】第六阶段:RAG 深度优化实战 —— 父子索引与上下文窗口优化
特性Recursive (递归)Parent-Child (父子)Sentence (句子级)粒度中等 (200-500 字)子块小 (100-200 字) + 父块大极小 (10-50 字)向量语义完整性⭐⭐⭐⭐ (较好)⭐⭐⭐⭐ (子块较纯净)⭐⭐ (较差,易丢失上下文)关键词匹配精度⭐⭐⭐⭐⭐⭐⭐ (子块密度高)⭐⭐⭐⭐⭐ (极高)LLM 上下文质量⭐⭐⭐⭐⭐⭐⭐⭐⭐ (返回的是大块)⭐ (
【Agent开发】第六阶段:RAG 深度优化实战 —— 父子索引与上下文窗口优化 – pd的AI Agent开发笔记
文章目录
- 第三部分:父子索引与上下文窗口—— 解决“检索准”与“上下文全”的矛盾 (Parent-Child Indexing)
- 第四部分:父子索引优化--Sentence as Child
- 第五部分:HyDE (Hypothetical Document Embeddings)—— 用“虚构的答案”来寻找“真实的文档”
- 🏗️ 项目架构全景图与演进规划
前置环境:当前环境是基于WSL2 + Ubuntu 24.04 + Docker Desktop构建的云原生开发平台,所有服务(MySQL、Redis、Qwen)均以独立容器形式运行并通过Docker Compose统一编排。如何配置请参考我的博客 WSL2 + Ubuntu 24.04 + Docker Desktop 配置双内核环境 并且补充了milvus相关的配置,如何配置请参考我的博客 【Agent开发】第三阶段:RAG 实战 —— 赋予 Agent “外脑”。 并且引入了ES检索,并且配置了ES服务,ES部分的配置请查看我的博客 【Agent开发】第五阶段:RAG 深度优化实战 —— 从“可用”到“卓越”。
核心目标:打破“小 Chunk 检索准但上下文缺失,大 Chunk 上下文全但检索噪声大”的僵局。
演进路线:Advanced RAG → Context-Aware RAG。
关键概念:Small-to-Big Retrieval (小查大返)。
第三部分:父子索引与上下文窗口—— 解决“检索准”与“上下文全”的矛盾 (Parent-Child Indexing)
1. 痛点分析:为什么需要父子索引?
❌ 当前困境
在之前的架构中,我们通常选择一个固定的 Chunk Size(例如 500 字):
- 如果 Chunk 太小 (200 字):
- ✅ 优点:向量嵌入非常精准,能匹配到具体的细节。
- ❌ 缺点:丢失上下文。LLM 拿到片段后不知道“他”是谁,“这个公司”指哪家,导致回答断章取义。
- 如果 Chunk 太大 (2000 字):
- ✅ 优点:上下文完整,LLM 能理解全局。
- ❌ 缺点:向量被大量无关信息稀释,检索精度下降,容易匹配到错误文档。
✅ 解决方案:父子索引 (Parent-Child Indexing)
核心思想:“用小块检索,用大块生成”。
- 子块 (Child Chunks):小粒度分块(如 200 字),用于生成向量并建立索引。负责“被找到”。
- 父块 (Parent Chunks):大粒度分块(如 1000 字)或原始文档。负责“被阅读”。
- 映射关系:每个子块记录其所属的父块 ID。
- 检索流程:
- 用户查询 → 匹配 子块。
- 命中子块 → 通过 ID 找回对应的 父块。
- 将 父块 发送给 LLM 生成答案。
架构设计:数据模型与流程
我们需要在现有的 Milvus 结构中增加 parent_id 字段,并调整入库和检索逻辑。
📂 新增/修改文件结构
src/rag/
├── chunkers.py # 👈 更新:增加父子分块策略
├── ingestion.py # 👈 更新:建立父子映射并入库
├── strategies/
│ └── retrievers/
│ └── vector_text.py # 👈 更新:支持“查子返父”逻辑
└── pipeline.py # (无需大改,接口保持一致)
🗄️ Milvus Schema 变更
需要在 Collection 中增加一个标量字段 parent_id (VarChar),用于反向查找。
3. 实战编码
🛠️ 第一步:实现父子分块策略 (src/rag/chunkers.py)
我们将创建一个特殊的 Chunker,它先生成大块(父),再把大块切分成小块(子),并建立关联。
class BaseChunker(ABC):
"""分块器基类"""
def split_documents(self, docs: List[Document]) -> List[Document]:
raise NotImplementedError
@abstractmethod
def split_documents(self, docs: List[Document]) -> List[Document]:
"""
输入:原始 Document 列表
输出:分块后的 Document 列表 (每个 Document 的 metadata 可能包含增强信息)
"""
pass
class RecursiveChunker(BaseChunker):
# 已有逻辑
class FixedChunker(BaseChunker):
# 已有逻辑
class ParentChildChunker(BaseChunker):
"""
父子分块策略
生成两组数据:
1. Child Chunks: 小尺寸,用于向量化检索
2. Parent Chunks: 大尺寸 (或原文),用于提供给 LLM
3. 返回所有小块 (Child),但在 metadata 中注入 parent_id 和 parent_text
"""
def __init__(self, parent_size: int = 500, child_size: int = 50, overlap: int = 50,child_overlap: int = 10, separators: List[str] = None):
if separators is None:
separators = ["\n\n", "\n", "。", "!", "?", " ", ""]
logger.info(f"✂️ 初始化父子分块器:Parent={parent_size}, Child={child_size}, Overlap={overlap}")
self.parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=parent_size,
chunk_overlap=overlap,
length_function=len,
separators=separators
)
self.child_splitter = RecursiveCharacterTextSplitter(
chunk_size=child_size,
chunk_overlap=overlap,
length_function=len,
separators=child_overlap
)
def split_documents(self, docs: List[Document]) -> List[Document]:
"""
兼容标准接口:输入 Document 列表,返回 Document 列表 (子块)
"""
all_child_docs = []
for doc in docs:
# 1. 切分父块
parent_docs = self.parent_splitter.split_documents([doc])
for p_doc in parent_docs:
# 为每个父块生成唯一 ID
parent_id = str(uuid.uuid4())
parent_text = p_doc.page_content
# 2. 将父块切分为子块
child_docs = self.child_splitter.split_documents([p_doc])
for c_doc in child_docs:
# 3. 注入父子关系元数据
# 复制原有 metadata,避免修改原始对象
new_metadata = {
**c_doc.metadata,
"parent_id": parent_id,
"parent_text": parent_text, # 关键:存入大块文本
"is_child": True,
"chunk_type": "child"
}
# 创建新的 Document 对象
child_doc = Document(
page_content=c_doc.page_content, # 小子块内容 (用于向量化)
metadata=new_metadata
)
all_child_docs.append(child_doc)
logger.info(f"✅ 父子分块完成:输入 {len(docs)} 文档 -> 生成 {len(all_child_docs)} 个子块 (关联 {len(parent_docs)} 个父块)")
return all_child_docs
🔄 第二步:更新入库逻辑 (src/rag/ingestion.py)
修改 DataIngestion 类,使其支持动态加载不同的 Chunker,并保持 process_file 逻辑不变(因为接口统一了)。
def process_file(self, file_path: str, category: str = "general"):
"""处理单个文件:加载 -> 分块 -> 增强 -> 入库"""
# 1. 加载文档
docs = self.load_document(file_path)
if not docs:
return
all_chunks = []
# 2. 分块 (统一调用 split_documents,无需关心内部是简单还是父子)
# 输入:List[Document], 输出:List[Document]
for doc in docs:
splits = self.text_splitter.split_documents([doc])
all_chunks.extend(splits)
logger.info(f"✂️ 分块完成,共生成 {len(all_chunks)} 个 chunks")
# 3. 入库
success_count = 0
for i, chunk in enumerate(all_chunks):
text = chunk.page_content
# 跳过过短的块
if len(text.strip()) < 5:
continue
# 元数据增强(可选:为了速度,生产环境可异步或批量处理)
enhanced_meta = self.enhance_metadata(text, chunk.metadata.get("source", ""))
summary_str = enhanced_meta.get("summary", "")
questions_str = enhanced_meta.get("questions", "")
# 合并元数据:保留分块器产生的 metadata (如 parent_id, parent_text)
final_metadata = {
**chunk.metadata, # 👈 关键:保留 parent_id 和 parent_text
"source": os.path.basename(file_path),
"page": chunk.metadata.get("page", 0),
"category": category,
"summary": summary_str,
"questions": questions_str
}
# 生成唯一 ID
doc_id = f"{os.path.basename(file_path)}_{i}_{uuid.uuid4().hex[:6]}"
# A. 存入 Milvus (向量化的是 chunk.page_content 即小子块)
try:
self.milvus.insert_data(
id=doc_id,
text=text,
metadata=final_metadata # metadata 里现在包含了 parent_text
)
success_count += 1
except Exception as e:
logger.error(f"❌ Milvus 插入失败:{e}")
# B. 存入 Elasticsearch (关键词库) - 双管齐下
if self.es.is_available():
if questions_str:
self.es.indexing_question(
doc_id=doc_id,
questions=questions_str,
summary=summary_str,
text=text,
metadata=final_metadata
)
if summary_str:
self.es.indexing_summary(
doc_id=doc_id,
summary=summary_str,
text=text,
metadata=final_metadata
)
logger.info(f"✅ 文件 {file_path} 处理完毕,成功入库 {success_count}/{len(all_chunks)} 条记录")
🔍 第三步:更新检索策略 (src/rag/strategies/retrievers)
- vector_text.py
- vector_rewritten.py
确保检索时能利用 parent_text 实现“查子返父”。这两个文件的改法类似
# src/rag/strategies/retrievers/vector_rewritten.py
from src.rag.strategies.base import BaseRetrievalStrategy, SearchResult
from src.core.milvus_client import get_milvus_client
from src.rag.rewriter import rewriter_instance # 复用之前的重写器
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
class VectorRewrittenRetriever(BaseRetrievalStrategy):
"""
插件 2: 变体向量检索
策略:先让 LLM 重写查询 (Query Rewriting),再用重写后的句子进行向量搜索。
"""
def __init__(self):
self.milvus = get_milvus_client()
self.rewriter = rewriter_instance
logger.info(f"🔌 [Plugin] 加载变体向量检索插件 (Rewritten)")
def search(self, query: str, top_k: int, filter_expr: Optional[str] = None, **kwargs) -> List[SearchResult]:
# 1. 生成变体查询
try:
rewritten_query = self.rewriter.rewrite(query)
if rewritten_query == query:
logger.debug("⚠️ [Vector-Rewritten] 重写后无变化,跳过此路以避免重复")
return []
logger.info(f"🔄 [Vector-Rewritten] 变体查询:{rewritten_query}")
except Exception as e:
logger.error(f"❌ [Vector-Rewritten] 重写失败:{e}")
return []
hits = self.milvus.search(
query=query,
top_k=top_k,
filter_expr=filter_expr,
output_fields=["text", "metadata"]
)
results = []
for hit in hits:
meta = hit['metadata'] or {}
original_text = hit['text']
final_text = original_text
source_tag = os.path.splitext(os.path.basename(__file__))[0]
# 👇 核心逻辑:查子返父
if meta.get("parent_text"):
final_text = meta["parent_text"]
source_tag = source_tag + "_parent"
# 可选:在 metadata 中记录原始子块文本,方便调试
meta['_matched_child_text'] = original_text
results.append(SearchResult(
text=final_text, # 返回大块文本给 LLM
score=hit['score'], # 分数基于小子块匹配 (精准)
metadata=meta,
source_field=source_tag
))
return results
- es_summaries.py
- es_questions.py
这两个文件的改法也很类似
class ESQuestionsRetriever(BaseRetrievalStrategy):
"""
插件 3: ES 关键词检索 (Questions 字段)
"""
def __init__(self):
self.es = es_client_instance
self.index_name = settings.db.es_index_questions
if self.es.is_available():
logger.info(f"🔌 [Plugin] ES 检索插件{self.index_name}已就绪")
else:
logger.warning("🔌 [Plugin] ES 不可用,此插件将自动跳过")
def search(self, query: str, top_k: int, filter_expr: Optional[str] = None, **kwargs) -> List[SearchResult]:
if not self.es.is_available():
return []
# 调用封装好的搜索方法
# 注意:ES 原生不支持复杂的 JSON 过滤表达式 (如 Milvus 语法),这里暂不实现 filter_expr
# 如果需要,可以在 ES 查询中添加 term 过滤
hits = self.es.search_questions(query, top_k=top_k)
results = []
for hit in hits:
meta = hit['metadata'] or {}
original_text = hit['text']
final_text = original_text
source_tag = os.path.splitext(os.path.basename(__file__))[0]
# 👇 核心逻辑:查子返父
if meta.get("parent_text"):
final_text = meta["parent_text"]
source_tag = source_tag + "_parent"
# 可选:在 metadata 中记录原始子块文本,方便调试
meta['_matched_child_text'] = original_text
results.append(SearchResult(
text=final_text,
score=hit['score'],
metadata=meta,
source_field=source_tag
))
return results
4. 数据入库与测试
另起一个milvus的索引,另起两个es索引(为了不影响之前的数据切片方式), 修改环境变量。
# Database
MILVUS_COLLECTION=knowledge_parent_child
es_index_questions=knowledge_questions_parent_child
es_index_summaries=knowledge_summaries_parent_child
# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child
先执行,数据入库操作, 由于父子索引方式切出的切片数量很多,每个都要跑一遍元数据增强,可能会很慢
python -m src.test.test_ingestion
测试代码
通过控制变量法(保持代码不变,仅切换 .env 中的 CHUNK_STRATEGY),分别运行两次测试,最后对比两份报告,可以清晰地量化 Parent-Child 策略相对于 Recursive 策略在单路召回(只用原始vector_text主路)下的提升效果。
# src/test/recall_chunk_strategy.py
# 用法: python -m src.test.recall_chunk_strategy
import asyncio
import logging
import statistics
from typing import List, Dict, Any
from src.rag.pipeline import pipeline_instance
from src.core.config import settings
from src.rag.strategies.base import SearchResult
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("ChunkStrategyEval")
# ==========================================
# 1. 测试数据集 (与主测试集保持一致,确保可比性)
# ==========================================
TEST_CASES = [
{
"category": "精确关键词匹配 (Easy)",
"questions": [
"根据《奇葩星球有限公司员工差旅与报销标准手册》,乘坐高铁的票价超过多少元将被视为“奢侈出行”?",
"在《小模型训练奇谭》中,推荐的“摸鱼Transformer”模型有多少参数?",
"奇葩星球公司禁止报销哪种咖啡?",
],
"keywords": [
["高铁", "奢侈出行", "10元"],
["摸鱼Transformer", "参数", "42000"],
["禁止报销", "咖啡", "星巴克"]
]
},
{
"category": "语义理解与推理 (Medium)",
"questions": [
"如果一名员工想通过步行出差获得最高荣誉,他每走一公里能拿到多少补贴?",
"在预算极其有限的情况下,《小模型训练奇谭》认为真正的智能取决于什么?",
"《小模型训练奇谭》推荐哪些非传统的数据来源用于训练小模型?",
],
"keywords": [
["步行", "补贴", "50元"],
["智能", "脑洞密度", "参数量"],
["数据来源", "微信群", "分手短信", "流浪猫"]
]
},
{
"category": "多跳推理 (Hard)",
"questions": [
"一名员工因工作压力在出差途中哭泣,根据公司规定,他如何能将情绪转化为实际的经济补偿?",
"《小模型训练奇谭》中提到的“穷人版分布式训练”是如何利用社交工具实现的?",
],
"keywords": [
["哭泣", "眼泪", "情感天平", "补贴"],
["穷人版分布式训练", "微信群", "梯度", "红包"]
]
},
{
"category": "区分相似概念 (Confusion)",
"questions": [
"“泡面”在《奇葩星球有限公司员工差旅与报销标准手册》和《小模型训练奇谭》中分别扮演什么角色?",
"两份文件都提到了“老板”,它们各自的态度或处理方式有何核心区别?",
],
"keywords": [
["泡面", "觉醒补贴", "泡面RNN", "电热水壶"],
["老板", "黑暗料理", "画饼", "老板探测器"]
]
}
]
# ==========================================
# 2. 评估逻辑
# ==========================================
def check_hit(results: List[SearchResult], keywords: List[str]) -> Dict[str, Any]:
"""检查检索结果是否命中"""
for rank, res in enumerate(results):
text = res.text.lower()
is_hit = False
for kw in keywords:
if len(kw) > 2 and kw.lower() in text:
is_hit = True
break
if is_hit:
return {
"hit": True,
"rank": rank + 1,
"source_plugin": res.source_field,
"snippet": text[:100] + "...",
"score": res.score,
"text_length": len(res.text), # 👈 新增:记录返回文本长度
"is_parent_mode": res.source_field.endswith("_parent") or res.metadata.get('_retrieval_mode') == 'child_to_parent' # 👈 新增:检测是否触发父子模式
}
return {
"hit": False,
"rank": -1,
"source_plugin": "None",
"snippet": "No relevant chunk found.",
"score": 0,
"text_length": 0,
"is_parent_mode": False
}
async def run_single_test(question: str, keywords: List[str], top_k: int = 5) -> Dict[str, Any]:
"""运行单个问题的测试"""
logger.info(f"\n❓ 测试问题:{question}")
try:
results = await pipeline_instance.run(query=question, top_k=top_k)
if not results:
logger.warning("⚠️ 无检索结果")
return {"question": question, "hit": False, "rank": -1, "total_results": 0, "text_lengths": [], "parent_mode_count": 0}
eval_result = check_hit(results, keywords)
# 统计本次查询的文本长度分布和父子模式触发情况
text_lengths = [len(r.text) for r in results]
parent_mode_count = sum(1 for r in results if r.source_field.endswith("_parent") or r.metadata.get('_retrieval_mode') == 'child_to_parent')
logger.info(f"✅ 评估结果:Hit={eval_result['hit']}, Rank={eval_result['rank']}, Source={eval_result['source_plugin']}")
logger.debug(f" 片段:{eval_result['snippet']} | 平均长度:{statistics.mean(text_lengths):.0f}")
return {
"question": question,
"hit": eval_result["hit"],
"rank": eval_result["rank"],
"source_plugin": eval_result["source_plugin"],
"total_results": len(results),
"top_score": results[0].score if results else 0,
"text_lengths": text_lengths, # 👈 新增
"parent_mode_count": parent_mode_count # 👈 新增
}
except Exception as e:
logger.error(f"❌ 执行出错:{e}")
import traceback
traceback.print_exc()
return {"question": question, "hit": False, "rank": -1, "error": str(e), "text_lengths": [], "parent_mode_count": 0}
async def main():
print("="*80)
print("🧪 RAG 分块策略对比测试 (单路召回)")
print("="*80)
# 打印当前关键配置
strategy = settings.rag_offline.chunk_strategy
use_parent = getattr(settings.rag_online, 'use_parent_context', False)
hybrid = settings.search.enable_hybrid_search
print(f"⚙️ 当前配置:")
print(f" - 分块策略 (CHUNK_STRATEGY): {strategy}")
print(f" - 启用父子上下文 (USE_PARENT_CONTEXT): {use_parent}")
print(f" - 混合检索 (ENABLE_HYBRID_SEARCH): {hybrid} (建议为 False 以测试单路)")
print("="*80)
if hybrid:
logger.warning("⚠️ 检测到混合检索已开启。为了纯粹对比分块策略,建议在 .env 中设置 SEARCH_ENABLE_HYBRID_SEARCH=False")
exit(1)
all_results = []
category_stats = {}
# 用于统计全局的文本长度和父子模式使用情况
all_text_lengths = []
total_parent_mode_hits = 0
for case in TEST_CASES:
cat_name = case["category"]
logger.info(f"\n--- 类别:{cat_name} ---")
category_stats[cat_name] = {"total": 0, "hit": 0, "ranks": []}
for q, kws in zip(case["questions"], case["keywords"]):
res = await run_single_test(q, kws, top_k=5)
res["category"] = cat_name
all_results.append(res)
category_stats[cat_name]["total"] += 1
if res.get("hit"):
category_stats[cat_name]["hit"] += 1
category_stats[cat_name]["ranks"].append(res["rank"])
# 累积统计
if res.get("text_lengths"):
all_text_lengths.extend(res["text_lengths"])
if res.get("parent_mode_count"):
total_parent_mode_hits += res["parent_mode_count"]
# ==========================================
# 3. 生成报告
# ==========================================
print("\n" + "="*80)
print("📊 评估报告总结")
print("="*80)
total_q = len(all_results)
total_hit = sum(1 for r in all_results if r.get("hit"))
overall_hit_rate = total_hit / total_q if total_q > 0 else 0
# 计算 MRR
reciprocal_ranks = [1.0/r["rank"] for r in all_results if r.get("hit") and r["rank"] > 0]
mrr = sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0.0
# 计算文本长度统计
avg_text_len = statistics.mean(all_text_lengths) if all_text_lengths else 0
median_text_len = statistics.median(all_text_lengths) if all_text_lengths else 0
print(f"\n🏆 总体表现:")
print(f" - 总问题数:{total_q}")
print(f" - 命中数 (Hit@5): {total_hit}")
print(f" - 命中率 (Hit Rate): {overall_hit_rate:.2%}")
print(f" - 平均倒数排名 (MRR): {mrr:.4f}")
print(f"\n📏 上下文窗口分析:")
print(f" - 返回片段平均长度:{avg_text_len:.0f} 字")
print(f" - 返回片段中位长度:{median_text_len:.0f} 字")
if strategy == "parent_child" and use_parent:
print(f" - 触发'查子返父'次数:{total_parent_mode_hits} 次 (共 {total_q * 5} 个结果)")
print(f" - 父子模式覆盖率:{(total_parent_mode_hits / (total_q * 5)) * 100:.1f}%")
else:
print(f" - 父子模式:未启用或非 Parent-Child 策略")
print(f"\n📈 分类别表现:")
print(f"{'类别':<30} | {'命中率':<10} | {'MRR':<10} | {'平均排名':<10}")
print("-" * 70)
for cat, stats in category_stats.items():
hit_rate = stats["hit"] / stats["total"] if stats["total"] > 0 else 0
ranks = stats["ranks"]
cat_mrr = sum([1.0/r for r in ranks]) / len(ranks) if ranks else 0
avg_rank = sum(ranks) / len(ranks) if ranks else 0
print(f"{cat:<30} | {hit_rate:>6.2%} | {cat_mrr:>6.4f} | {avg_rank:>6.2f}")
print("\n💡 策略对比指南:")
print(" 请保存此报告,然后修改 .env 文件:")
print(" 1. 将 CHUNK_STRATEGY 从 'recursive' 改为 'parent_child' (或反之)")
print(" 2. 再次运行本测试脚本")
print(" 3. 对比两份报告的 [命中率] 和 [上下文窗口分析]")
print(" 👉 预期:Parent-Child 策略在保持命中率的同时,应显著增加 [平均长度],从而提升 LLM 回答质量。")
print("="*80)
if __name__ == "__main__":
asyncio.run(main())
执行测试
python -m src.test.recall_chunk_strategy
parent_child模式下的配置
# [其他内容不变]
# Database
MILVUS_COLLECTION=knowledge_parent_child
es_index_questions=knowledge_questions_parent_child
es_index_summaries=knowledge_summaries_parent_child
# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1
# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child
# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False

recursive模式下的配置
# [其他内容不变]
# Database
MILVUS_COLLECTION=knowledge_base
es_index_questions=knowledge_questions
es_index_summaries=es_index_summaries # 这个之前写错了,就一直保留到现在
# RAG Offline (Chunking)
CHUNK_STRATEGY=recursive
# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1
# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False

对比两份报告时,重点关注以下指标的变化:
| 指标 | 预期变化 (Parent-Child vs Recursive) | 含义解读 |
|---|---|---|
| Hit Rate (命中率) | 持平 或 微升 | 子块更小,向量更纯净,理论上召回更准;但如果父块切分不当也可能略降。理想情况是持平。 |
| MRR (排名) | 提升 | 更精准的小块匹配应该能让正确答案排在更前面。 |
| 平均长度 (Avg Len) | 显著增加 ⬆️ | 这是核心指标。Recursive 可能返回 200 字,而 Parent-Child 应返回 1000 字。这意味着 LLM 拥有了更多上下文。 |
| 父子模式覆盖率 | > 80% | 如果开启了 USE_PARENT_CONTEXT,绝大多数命中的结果都应该触发“查子返父”。 |
💡 为什么这样做更科学?
- 隔离变量:通过环境变量控制,代码完全一致,排除了代码逻辑干扰。
- 单路纯净:关闭混合检索,单独评估向量检索在不同分块粒度下的表现,避免 ES 干扰结论。
- 上下文感知:不仅看“能不能找到”(Hit Rate),还看“找到的内容够不够用”(Text Length),这才是 RAG 最终效果的瓶颈所在。
第四部分:父子索引优化–Sentence as Child
在纯粹的“向量检索”阶段,按句子划分(Sentence Chunking)往往确实没有明显优势,甚至经常表现更差。
但这并不代表它“没用”,而是它的适用场景和发挥作用的位置与其他策略不同。让我们深入剖析一下为什么会出现这种情况,以及句子级分块的真正价值在哪里。
1. 为什么句子级分块在“向量检索”中显得弱势?
❌ 核心痛点:语义碎片化 (Semantic Fragmentation)
向量模型(Embedding Model)需要一定的上下文才能理解一个句子的完整含义。
- 例子:
- 原文:“虽然这家公司的财务报表看起来很好,但是其现金流存在严重问题。”
- 按句子切分:
- Chunk A: “虽然这家公司的财务报表看起来很好。” → \rightarrow → 向量含义:正面、财务好。
- Chunk B: “但是其现金流存在严重问题。” → \rightarrow → 向量含义:负面、现金流差。
- 用户查询:“这家公司的财务状况如何?”
- 结果:
- Chunk A 的向量可能匹配到“财务好”的查询,导致误召回(幻觉源头)。
- Chunk B 丢失了“虽然…但是…”的转折逻辑,丢失了前提条件。
- 对比 Recursive/Parent-Child:它们会保留前后句的关联,向量能捕捉到“表面好但实际差”的复杂语义。
❌ 噪声敏感
短句子往往包含大量代词(“他”、“它”、“这”),脱离上下文后,这些句子的向量表示非常模糊,容易匹配到不相关的文档。
2. 那句子级分块到底有什么用?(它的杀手锏)
句子级分块的优势不在“向量相似度计算”,而在以下三个特定场景:
✅ 场景一:高精度的关键词匹配 (Keyword Search / BM25)
如果你的检索主要依赖 Elasticsearch (BM25) 而不是向量:
- 长段落会稀释关键词密度(Term Frequency)。
- 短句子里关键词密度极高,BM25 打分会非常高。
- 结论:在纯关键词检索或混合检索的 ES 路中,句子级分块往往比长段落更精准。
✅ 场景二:极短文本或结构化数据
- 法律条款、诗歌、代码行、日志条目:这些内容本身就是以“句/行”为最小语义单位的。强行合并反而会破坏结构。
- 例子:搜索“第 3 条第 2 款关于违约的规定”。如果按 500 字切分,可能把第 3 条和第 4 条混在一起;按句子切分则能精准定位。
✅ 场景三:作为 Parent-Child 中的 “Child” (最佳实践!)
这才是句子级分块的终极形态!
- 不要直接用句子级分块去检索并返回给 LLM(太碎了)。
- 而是:用句子级分块作为 Child Chunks 生成向量,然后映射到 Parent Chunks(段落级)。
- 效果:
- 检索时:利用句子的纯净向量进行高精度匹配(解决了长段落语义稀释问题)。
- 返回时:通过 Parent ID 找回完整的段落(解决了句子语义缺失问题)。
- 这就是所谓的 “Sentence Window Retrieval”(LlamaIndex 中的经典策略)。
3. 三种策略的横向对比总结
| 特性 | Recursive (递归) | Parent-Child (父子) | Sentence (句子级) |
|---|---|---|---|
| 粒度 | 中等 (200-500 字) | 子块小 (100-200 字) + 父块大 | 极小 (10-50 字) |
| 向量语义完整性 | ⭐⭐⭐⭐ (较好) | ⭐⭐⭐⭐ (子块较纯净) | ⭐⭐ (较差,易丢失上下文) |
| 关键词匹配精度 | ⭐⭐⭐ | ⭐⭐⭐⭐ (子块密度高) | ⭐⭐⭐⭐⭐ (极高) |
| LLM 上下文质量 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ (返回的是大块) | ⭐ (太碎,LLM 无法推理) |
| 最佳适用场景 | 通用文档、博客、文章 | 企业知识库、长文档、复杂推理 | 法律条款、日志、作为父子索引的子块 |
| 单独使用推荐度 | ✅ 推荐 (稳健) | ✅✅ 强烈推荐 (SOTA) | ❌ 不推荐 (除非特殊场景) |
Sentence as Child的模式
Sentence-as-Child 模式。这是目前 RAG 领域公认的“版本答案”之一(LlamaIndex 称之为 Sentence Window Retrieval)。
🎯 核心逻辑
- 父块 (Parent):保持较大的段落(如 800-1000 字),提供完整上下文给 LLM。
- 子块 (Child):不再使用简单的字符切分,而是使用语义完整的句子作为最小单元。
- 优势:一个句子通常表达一个完整的命题,生成的向量(Embedding)语义最纯净,检索匹配度最高。
- 映射:多个句子子块 → 映射到同一个父块 ID。
1. 🛠️ 升级分块器 (src/rag/chunkers.py)
在中文场景下,jieba 是处理分句和分词最成熟、轻量且准确的库。使用它不仅能准确识别中文标点(如 。!?),还能避免正则表达式在处理复杂文本(如包含英文缩写、数字混合)时的边界错误。(英文句切分可以使用 nltk)
pip install jieba
import jieba
class BaseChildSplitter(ABC):
"""
子块分块器基类 (专门用于 Parent-Child 模式)
负责将单个父块文本切分为多个子块 Document
"""
@abstractmethod
def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
"""
Args:
text: 父块的文本内容
metadata: 父块的元数据 (将传递给子块)
Returns:
子块 Document 列表
"""
pass
class ParentChildChunker(BaseChunker):
"""
父子分块策略 (Parent-Child Indexing)
逻辑:
1. 将原文切分为大块 (Parent)。
2. 利用注入的 child_splitter 将每个大块切分为小块 (Child)。
3. 返回所有小块 (Child),但在 metadata 中注入 parent_id 和 parent_text。
依赖注入:
- child_splitter: 由外部工厂传入的具体 BaseChildSplitter 实例
"""
def __init__(
self,
parent_size: int = 1000,
parent_overlap: int = 100,
child_splitter: Optional[BaseChildSplitter] = None,
parent_separators: List[str] = None
):
self.parent_size = parent_size
self.parent_overlap = parent_overlap
# 初始化父分块器
self.parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=parent_size,
chunk_overlap=parent_overlap,
length_function=len,
separators=parent_separators
)
# 依赖注入:子分块器 (必须由外部传入,此处不依赖 settings)
if child_splitter is None:
# 如果外部没传,默认给一个递归子分块器 (兜底策略)
logger.warning("⚠️ ParentChildChunker 未传入 child_splitter,使用默认 RecursiveChildSplitter")
self.child_splitter = RecursiveChildSplitter(
chunk_size=200,
chunk_overlap=0,
separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
else:
self.child_splitter = child_splitter
logger.info(f"✂️ 初始化父子分块器:ParentSize={parent_size}, ChildStrategy={self.child_splitter.__class__.__name__}")
def split_documents(self, docs: List[Document]) -> List[Document]:
all_child_docs = []
total_parents = 0
for doc in docs:
# 1. 切分父块
parent_docs = self.parent_splitter.split_documents([doc])
total_parents += len(parent_docs)
for p_doc in parent_docs:
parent_id = str(uuid.uuid4())
parent_text = p_doc.page_content
# 2. 使用注入的子分块器切分父块
# 传入父块的元数据,子分块器会保留并增强它
child_docs = self.child_splitter.split_text(parent_text, p_doc.metadata)
for c_doc in child_docs:
# 3. 注入父子关系元数据
new_metadata = {
**c_doc.metadata,
"parent_id": parent_id,
"parent_text": parent_text, # 关键:存入大块文本供检索后返回
"is_child": True,
"chunk_type": f"child_{self.child_splitter.__class__.__name__}"
}
child_doc = Document(
page_content=c_doc.page_content, # 小子块内容 (用于向量化)
metadata=new_metadata
)
all_child_docs.append(child_doc)
logger.info(f"✅ 父子分块完成:输入 {len(docs)} 文档 -> 生成 {len(all_child_docs)} 个子块 (关联 {total_parents} 个父块)")
return all_child_docs
# ==========================================
# 2. 子块分块器实现 (Concrete Child Splitters)
# ==========================================
class RecursiveChildSplitter(BaseChildSplitter):
"""
基于字符递归切分的子块策略
适用于:通用文档,追求均匀切分
"""
def __init__(self, chunk_size: int, chunk_overlap: int, separators: List[str]):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=separators
)
logger.debug(f" [ChildSplitter] 初始化:Recursive (Size={chunk_size}, Overlap={chunk_overlap})")
def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
temp_doc = Document(page_content=text, metadata=metadata)
return self.splitter.split_documents([temp_doc])
class SentenceChildSplitter(BaseChildSplitter):
"""
基于 Jieba 的句子切分策略 (Sentence-as-Child)
优势:
1. 利用 jieba 强大的中文分句能力,准确识别 。!?等标点。
2. 自动处理中英文混排、数字、缩写等复杂情况。
3. 比正则更稳健,不易产生空切片或错误截断。
"""
def __init__(self, min_sentence_len: int = 10):
self.min_sentence_len = min_sentence_len
# 预加载 jieba 字典 (可选,加速首次运行)
# jieba.initialize()
logger.debug(f" [ChildSplitter] 初始化:Jieba Sentence (MinLen={min_sentence_len})")
def _split_into_sentences(self, text: str) -> List[str]:
"""
使用 jieba 进行分句
原理:jieba.lcut 会把标点符号也作为单独的词切分出来,我们据此重组句子。
"""
# 使用 jieba 精确模式切分
words = jieba.lcut(text)
sentences = []
current_sent = ""
# 定义句末标点集合 (中文 + 英文)
end_punctuations = {'。', '!', '?', '!', '?', '…', '…', '.'}
# 换行符也视为句子结束
newlines = {'\n', '\r'}
for word in words:
current_sent += word
# 判断是否结束一个句子
if word.strip() in end_punctuations or word in newlines:
s = current_sent.strip()
if len(s) >= self.min_sentence_len:
sentences.append(s)
current_sent = ""
# 处理最后一段没有标点的情况
if current_sent.strip():
s = current_sent.strip()
if len(s) >= self.min_sentence_len:
sentences.append(s)
return sentences
def split_text(self, text: str, metadata: Dict[str, Any]) -> List[Document]:
sentences = self._split_into_sentences(text)
docs = []
if not sentences:
# 如果分句失败(极少见),返回整个文本作为一个块,防止数据丢失
logger.warning(f"⚠️ Jieba 分句结果为空,回退为单块。文本前50字:{text[:50]}...")
return [Document(page_content=text, metadata=metadata)]
for i, sent in enumerate(sentences):
doc = Document(
page_content=sent,
metadata={
**metadata,
"sentence_index": i,
"total_sentences_in_parent": len(sentences),
"chunk_method": "jieba_sentence"
}
)
docs.append(doc)
return docs
# ==========================================
# 3. 子分块器工厂 (Internal Factory for Decoupling)
# ==========================================
# 注意:这是一个纯逻辑工厂,不包含 settings 依赖,由外部 factories.py 调用
class ChildSplitterFactory:
"""
子分块器创建工厂
根据传入的策略字符串和参数创建具体的子分块器实例
"""
@staticmethod
def create(strategy: str, **kwargs) -> BaseChildSplitter:
strategy = strategy.lower()
if strategy == "sentence" or strategy == "sentence_window" or strategy == "jieba":
return SentenceChildSplitter(
min_sentence_len=kwargs.get('min_sentence_len', 10)
)
elif strategy == "recursive" or strategy == "fixed" or strategy == "char":
return RecursiveChildSplitter(
chunk_size=kwargs.get('chunk_size', 200),
chunk_overlap=kwargs.get('chunk_overlap', 0),
separators=kwargs.get('separators', ["\n\n", "\n", "。", "!", "?", " ", ""])
)
else:
logger.warning(f"⚠️ 未知子分块策略 '{strategy}',降级使用 recursive")
return RecursiveChildSplitter(
chunk_size=kwargs.get('chunk_size', 200),
chunk_overlap=kwargs.get('chunk_overlap', 0),
separators=kwargs.get('separators', ["\n\n", "\n", "。", "!", "?", " ", ""])
)
2. 🏭 更新工厂组装 (src/rag/factories.py)
class ChunkerFactory:
@staticmethod
def get_chunker() -> BaseChunker:
...[前面内容不变]
elif strategy == "parent_child":
# ==========================================
# 1. 解析子分块器相关配置
# ==========================================
child_strategy = settings.rag_offline.child_splitter_strategy
child_chunk_size = settings.rag_offline.child_chunk_size
child_chunk_overlap = settings.rag_offline.child_chunk_overlap
min_sentence_len = settings.rag_offline.min_sentence_length
parent_size = settings.rag_offline.chunk_size
parent_overlap = settings.rag_offline.chunk_overlap
logger.info(f" └─ 子分块策略:{child_strategy}")
logger.info(f" └─ 父块大小:{parent_size}, 重叠:{parent_overlap}")
# ==========================================
# 2. 使用工厂创建【子分块器实例】 (依赖注入的核心)
# ==========================================
child_splitter = ChildSplitterFactory.create(
strategy=child_strategy,
chunk_size=child_chunk_size,
chunk_overlap=child_chunk_overlap,
min_sentence_len=min_sentence_len,
separators=separators
)
# ==========================================
# 3. 创建【父分块器】并注入子分块器
# ==========================================
return ParentChildChunker(
parent_size=parent_size,
parent_overlap=parent_overlap,
child_splitter=child_splitter, # 👈 关键:注入已经初始化好的子分块器
parent_separators=["\n\n", "\n"] # 父块通常只按段落切分
)
✅ 架构优势总结
- 高内聚低耦合:ParentChildChunker 不需要知道具体的子分块逻辑,只负责调度。ChildSplitterFactory 封装了所有创建细节。
- 极易扩展:未来如果想加一个 “SemanticChildSplitter” (用小模型聚类切分),只需:
- 新建类继承 BaseChildSplitter。
- 在 ChildSplitterFactory 注册。
- 在 .env 加个配置,无需修改任何业务代码。
- 配置驱动:完全通过环境变量切换策略,方便进行 A/B 测试。
- 统一接口:对外依然暴露标准的 split_documents 方法,上层 Ingestion 流程无感知。
3. ⚙️ 更新配置文件 (src/core/config.py & .env)
class RagOfflineSettings(BaseSettings):
"""RAG 检索前 (Offline) 配置:分块与入库"""
# ...[前面内容不变]
# 父子分块特有参数
# 子分块器配置
child_chunk_size: int = 80
child_chunk_overlap: int = 0
child_splitter_strategy: str = "recursive" # 可选:recursive, sentence
min_sentence_length: int = 10
在运行测试前,还要修改.env中的内容
# 分块策略修改、要重新生成索引和向量
MILVUS_COLLECTION=knowledge_child_sentences
es_index_questions=knowledge_questions_child_sentences
es_index_summaries=knowledge_summaries_child_sentences
# RAG Offline (Chunking)
CHUNK_STRATEGY=parent_child
CHILD_SPLITTER_STRATEGY=sentence
CHILD_CHUNK_SIZE=80
CHILD_CHUNK_OVERLAP=0
MIN_SENTENCE_LENGTH=10
# RAG Online (Retrieval & Rerank)
ROUGH_TOP_K=2
FINAL_TOP_K=1
# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=False
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False
4. 数据入库与测试
先执行,数据入库操作, 由于父子索引方式切出的切片数量很多,每个都要跑一遍元数据增强,可能会很慢
python -m src.test.test_ingestion
然后再执行命中测试:
python -m src.test.recall_chunk_strategy

可以看到parent_child的MRR比前面用recursive 要高一点
MRR评估:
在命中测试(或更准确地说,在排序质量评估)中,MRR(Mean Reciprocal Rank,平均倒数排名)是越高越好。
- ✅ 为什么 MRR 越高越好?
MRR 的核心思想是:正确答案(或相关结果)在排序列表中出现的位置越靠前,得分越高。
- 计算公式:
MRR = 1 Q ∑ i = 1 Q 1 rank i \text{MRR} = \frac{1}{Q} \sum_{i=1}^{Q} \frac{1}{\text{rank}_i} MRR=Q1i=1∑Qranki1
- $ Q $:查询总数
- $ \text{rank}_i $:第 $ i $ 个查询中,第一个相关结果的排名(从 1 开始)
- 如果某次查询没有相关结果,则 $ \frac{1}{\text{rank}_i} = 0 $
- 举例说明:
| 查询 | 正确答案排名 | 倒数排名(Reciprocal Rank) |
|------|---------------|-------------------------------|
| Q1 | 1 | $ 1/1 = 1.0 $ |
| Q2 | 3 | $ 1/3 \approx 0.333 $ |
| Q3 | 未命中 | $ 0 $ |
→ MRR = $ (1.0 + 0.333 + 0) / 3 \approx 0.444 $
- 如果所有正确答案都排在第 1 位 → MRR = 1.0(最佳)
- 如果正确答案都很靠后或经常未命中 → MRR 接近 0(最差)
因此,MRR 的取值范围是 [0, 1],值越大表示系统排序能力越强,用户体验越好。
📌 应用场景中的意义
- RAG 系统:MRR 高意味着用户问题的相关文档被排在召回列表前列,有利于后续生成高质量答案。
- 推荐系统:MRR 高说明用户感兴趣的商品/内容出现在推荐列表顶部。
- 搜索引擎 / 知识图谱补全:MRR 是衡量“首个正确结果是否靠前”的黄金指标。
⚠️ 注意:MRR 只关注第一个相关结果的位置,不关心后面是否还有更多相关项。如果需要评估整体排序质量,应结合 NDCG 或 MAP 使用。
✅ 总结
| 指标 | 趋势 | 含义 |
|---|---|---|
| MRR | 越高越好 | 首个相关结果越靠前,系统排序越精准 |
| Hit Rate / Recall | 越高越好 | 找到的相关结果越多 |
| NDCG | 越高越好 | 整体排序质量越高(考虑位置和相关性) |
所以,在命中测试或 RAG 评估中,追求更高的 MRR 是正确的优化方向。
第五部分:HyDE (Hypothetical Document Embeddings)—— 用“虚构的答案”来寻找“真实的文档”
核心目标:解决用户提问(Query)与专业文档(Document)之间的语义鸿沟和词汇不匹配问题。
演进路线:Query Rewriting → HyDE (生成式检索)。
关键概念:Embed the Answer, Retrieve the Question。
1. 痛点分析:为什么需要 HyDE?
❌ 传统检索的困境
- 用户问:“电脑蓝屏了怎么办?” (口语化、症状描述)
- 文档写:“Windows 操作系统致命错误修复指南:重启并检查内存转储。” (专业术语、解决方案)
- 结果:
- 关键词匹配 (BM25):完全没命中(“蓝屏”vs“致命错误”)。
- 向量匹配 (Dense):虽然语义相近,但由于表述角度完全不同(问题 vs 答案),向量空间距离可能较远,导致排名靠后。
✅ HyDE 的解决方案
核心思想:“以答找文”。
- 生成 (Generate):让 LLM 根据用户问题,虚构一篇“完美的答案文档”(不需要事实正确,只要语义相关)。
- 嵌入 (Embed):将这篇虚构的文档转化为向量。
- 检索 (Retrieve):用虚构文档的向量去检索真实的文档库。
- 原理:虚构的答案在文体、结构、词汇上与真实答案高度相似,因此在向量空间中距离更近。
2. 架构设计:集成到现有流程
我们将 HyDE 集成到现有的 Rewriter 模块中,作为 VectorRewrittenRetriever 的一种增强模式。
📂 修改文件
src/rag/rewriter.py: 增加 HyDE 生成逻辑。src/rag/strategies/retrievers/vector_rewritten.py: 支持使用 HyDE 生成的文本进行检索。src/core/config.py: 增加 HyDE 配置开关。.env: 新增环境变量。
3. 实战编码
🛠️ 第一步:实现 HyDE 生成器 (src/rag/rewriter.py)
我们复用现有的 rewriter 客户端,增加一个专门生成“虚构文档”的方法。
👍 核心差异只是 Prompt 模板、温度 和 名称
- 使用 配置字典 + 单一执行类 的模式,不仅代码量减少 80%,而且后续添加新策略(比如 Multi-Query)只需要在字典里加一行配置,完全不需要改代码逻辑。
# src/rag/rewriter.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from src.core.config import settings
from src.utils.xml_parser import remove_think_and_n
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
# ==========================================
# 1. 策略配置中心 (Data-Driven)
# ==========================================
STRATEGY_CONFIGS: Dict[str, Dict[str, Any]] = {
"standard": {
"name": "StandardRewrite",
"temperature": settings.llm.rewrite_temperature,
"system_prompt": """
你是一个高级 RAG 系统的查询优化专家。
任务:将用户的自然语言问题改写为更适合向量数据库检索的陈述句。
策略:口语转书面语、疑问转陈述、术语对齐、去噪。
约束:只输出改写后的句子,无解释。
"""
},
"hyde": {
"name": "HyDE (Hypothetical Document)",
"temperature": settings.llm.hyde_temperature,
"system_prompt": """
请阅读以下问题,并撰写一段**虚构的、详细的**文档片段来回答这个问题。
要求:
1. 语气专业、客观,模仿技术文档或百科全书风格。
2. 包含可能出现在真实文档中的关键词、术语和具体步骤。
3. 不需要保证事实绝对正确,但必须在语义和结构上像一个真实的答案。
4. 长度控制在 100-200 字左右。
约束:只输出虚构的文档内容,不要包含“以下是虚构文档”等前缀。
"""
},
# 未来扩展示例:
# "multi_query": { ... },
# "step_back": { ... },
}
# ==========================================
# 2. 统一执行器 (Single Executor)
# ==========================================
class QueryRewriter:
"""
统一查询重写器
通过 strategy_name 动态加载配置,无重复代码
"""
def __init__(self, strategy_name: str = "standard"):
self.strategy_name = strategy_name.lower()
# 获取配置,降级处理
config = STRATEGY_CONFIGS.get(self.strategy_name, STRATEGY_CONFIGS["standard"])
if self.strategy_name not in STRATEGY_CONFIGS:
logger.warning(f"⚠️ 未知策略 '{self.strategy_name}',降级使用 standard")
self.strategy_name = "standard"
config = STRATEGY_CONFIGS["standard"]
self.name = config["name"]
self.temperature = config["temperature"]
# 初始化 LLM (每个策略独立实例以隔离温度配置)
self.llm = ChatOpenAI(
model=settings.llm.model_name,
base_url=settings.llm.base_url,
api_key=settings.llm.api_key,
temperature=self.temperature
)
# 构建 Prompt
self.prompt = ChatPromptTemplate.from_messages([
("system", config["system_prompt"]),
("human", "{original_query}")
])
logger.info(f"📝 初始化重写器:{self.name} (Temp={self.temperature})")
def rewrite(self, original_query: str) -> str:
"""执行重写 """
try:
chain = self.prompt | self.llm
response = chain.invoke({"original_query": original_query})
result = remove_think_and_n(response.content.strip())
logger.debug(f"✨ [{self.name}] '{original_query}' -> '{result[:50]}...'")
return result
except Exception as e:
logger.error(f"❌ [{self.name}] 失败,降级使用原查询:{e}")
return original_query
# ==========================================
# 3. 工厂函数 (Factory Function)
# ==========================================
def get_rewriter(strategy_name: str = None) -> QueryRewriter:
"""工厂函数:根据配置返回重写器实例"""
if not strategy_name:
strategy_name = getattr(settings.search, 'rewritten_strategy', 'standard')
return QueryRewriter(strategy_name=strategy_name)
🔄 第二步:更新检索插件(src/rag/strategies/retrievers/vector_rewritten.py)
修改插件加载,注意此时该插件能构造两种模式的检索器:
- standard:使用陈述句改写的
- hyde:使用虚拟文档改写的(记得在config.py中添加相关的配置变量)
# src/rag/strategies/retrievers/vector_rewritten.py
from src.rag.strategies.base import BaseRetrievalStrategy, SearchResult
from src.core.milvus_client import get_milvus_client
from src.rag.rewriter import get_rewriter # 复用之前的重写器
from typing import List, Optional
import logging
import os
logger = logging.getLogger(__name__)
class VectorRewrittenRetriever(BaseRetrievalStrategy):
"""
插件 2: 变体向量检索
策略:先让 LLM 重写查询 (Query Rewriting),再用重写后的句子进行向量搜索。
"""
def __init__(self, strategy_name: str = "standard"):
self.milvus = get_milvus_client()
self.rewriter = get_rewriter(strategy_name=strategy_name)
logger.info(f"🔌 [Plugin] 加载变体向量检索插件 ({strategy_name})")
🚀 第三步:更新融合器 (src/rag/strategies/composer.py)
class RetrieverComposer:
"""
检索器组装器
职责:根据配置动态加载多个检索插件,并行执行,并使用 RRF 融合结果。
"""
def __init__(self):
self.retrievers: List[BaseRetrievalStrategy] = []
self.rrf_engine = RRFFusionEngine(k=settings.search.rrf_k)
self._load_plugins()
def _load_plugins(self):
"""根据配置动态加载插件"""
# 1. 主路:永远加载
self.retrievers.append(VectorTextRetriever())
logger.info("✅ [Composer] 已加载主路:VectorText")
# 2. 变体路:如果开启混合检索
if settings.search.enable_hybrid_search:
# 2. 改写路:如果开启改写
if settings.search.plugin_rewritten_query:
self.retrievers.append(VectorRewrittenRetriever('standard'))
logger.info("✅ [Composer] 已加载变体路:VectorRewritten-standard")
if settings.search.plugin_rewritten_hyde:
self.retrievers.append(VectorRewrittenRetriever('hyde'))
logger.info("✅ [Composer] 已加载变体路:VectorRewritten-hyde")
# 3. ES 路:如果配置了 ES
if settings.db.es_host:
# 3. ES - Questions 路
if settings.search.plugin_es_questions:
es_retriever = ESQuestionsRetriever()
if es_retriever.es.is_available(): # 只有连接成功才加入
self.retrievers.append(es_retriever)
logger.info("✅ [Composer] 已加载 ES - Questions 路:ESQuestions")
# 4. ES - Summaries 路
if settings.search.plugin_es_summaries:
es_retriever = ESSummariesRetriever()
if es_retriever.es.is_available(): # 只有连接成功才加入
self.retrievers.append(es_retriever)
logger.info("✅ [Composer] 已加载 ES - Summaries 路:ESSummaries")
4. 执行命中测试
修改.env文件
# Search Strategies (Plugins)
SEARCH_ENABLE_HYBRID_SEARCH=True
SEARCH_PLUGIN_REWRITTEN_QUERY=False
SEARCH_PLUGIN_REWRITTEN_HYDE=True
SEARCH_PLUGIN_ES_QUESTIONS=False
SEARCH_PLUGIN_ES_SUMMARIES=False
SEARCH_RRF_K=60
运行测试,注意把exit(1)的代码注释掉
python -m src.test.recall_chunk_strategy
结果有点令人失望,即使多了一路,评估指标也没有上去,可能跟文章本身就太奇葩了,大模型也预料不到有这样的报销政策,而且比较的对象也不应该选择这个,parent_child是属于数据处理方面的策略,而HyDE是属于检索策略,应该是HyDE与standard单独比较才对。

🏗️ 项目架构全景图与演进规划
本阶段在第五阶段“混合检索框架”基础上,进一步完成了两条关键进化路线:
- 数据侧:引入 Parent-Child Indexing + 可插拔 Child Splitter(递归/句子级),实现“查子返父”。
- 检索侧:引入 HyDE 重写策略,并将重写器升级为策略驱动模式(
standard/hyde)。
因此当前系统已经从“多路检索可用”,走向“上下文更完整、召回表达更鲁棒”的 Context-Aware RAG 架构。
📂 当前项目结构解析
src/
├── main.py # 🚀 应用入口
│
├── core/ # 🧱 基础设施层
│ ├── config.py # ⚙️ 配置中心(含 Parent-Child / HyDE 开关)
│ ├── redis_client.py # 💾 短期记忆(LangGraph Checkpointer)
│ ├── db_session.py # 🗄️ MySQL 连接池
│ ├── models.py # 📑 MySQL 模型
│ ├── milvus_client.py # 🔭 Milvus 客户端
│ └── es_client.py # 🔎 Elasticsearch 客户端
│
├── rag/ # 🧠 RAG 引擎层(第六阶段核心演进区)
│ ├── chunkers.py # ✂️ 分块体系(ParentChildChunker + ChildSplitterFactory)
│ ├── factories.py # 🏭 工厂模块(配置驱动注入 Child Splitter)
│ ├── ingestion.py # 📥 数据摄入(保留 parent_id/parent_text 元数据)
│ ├── pipeline.py # 🔄 检索执行流
│ ├── rewriter.py # ✍️ 查询重写器(standard / hyde 策略化)
│ ├── reranker.py # ⚖️ 重排序模块
│ ├── fusion/
│ │ └── rrf.py # 🔗 RRF 融合
│ └── strategies/
│ ├── base.py # 基础接口(SearchResult / BaseRetrievalStrategy)
│ ├── composer.py # 🎼 检索器组装(异步并发 + RRF)
│ ├── metadata_filter.py # 🔒 过滤表达式构建
│ └── retrievers/
│ ├── vector_text.py # 插件 1:主路向量检索
│ ├── vector_rewritten.py # 插件 2:改写向量检索(支持 HyDE)
│ ├── es_questions.py # 插件 3:ES Questions
│ └── es_summaries.py # 插件 4:ES Summaries
│
├── Mini_Agent/ # 🤖 Agent 编排层
│ ├── agent_framework.py # Agent 框架封装
│ ├── agent_v1.py # Agent 版本实现
│ ├── graph.py # LangGraph 工作流
│ ├── state.py # 状态定义
│ └── tools/
│ ├── base_tools.py # 基础工具
│ ├── memory_tools.py # 长期记忆工具
│ └── rag_tools.py # RAG 工具(调用 pipeline)
│
├── service/ # 🧩 业务服务层(预留)
│
├── utils/
│ └── xml_parser.py # 🧰 输出清洗 / XML 解析辅助
│
└── test/ # 🧪 测试与评估
├── test_ingestion.py # 入库测试
├── recall_test.py # 多路召回评估
├── recall_chunk_strategy.py # 分块策略对比评估(第六阶段重点)
├── test_dynamic_rerank.py # 动态重排测试
├── sync_es.py # Milvus -> ES 同步脚本
└── ...
🌊 系统数据流向架构图


🔑 第六阶段关键能力落点
| 能力方向 | 关键实现 | 价值 |
|---|---|---|
| 上下文窗口优化 | ParentChildChunker + parent_text 回传 |
兼顾“检索精度”与“上下文完整性” |
| 子分块可插拔 | BaseChildSplitter + ChildSplitterFactory |
递归切分与句子切分可配置切换 |
| 重写策略升级 | rewriter.py 策略化(standard / hyde) |
从“改写句子”升级到“生成假设文档” |
| 多路调度融合 | composer.py + rrf.py |
多插件并发召回,降低单路偏差 |
| 配置驱动实验 | config.py + .env 参数化 |
支持 A/B 测试与低成本试错 |
更多推荐

所有评论(0)