

代码开源Github地址 ↗
RAG重排序与查询集成:提升检索精度的关键技术#
在前面的章节中,我们学习了如何优化文档索引和检索策略。但是,初次检索的结果往往不够精确。本章将深入探讨如何通过重排序和查询集成技术进一步提升检索质量。
为什么要重排序?#
向量检索的局限性
场景:
用户查询: "Python中如何处理异常?"
向量检索结果(按相似度排序):
1. 文档A: "Python异常处理机制...try-except..." (相似度: 0.85)
2. 文档B: "Python中的错误类型...Exception类..." (相似度: 0.83)
3. 文档C: "Python编程基础...变量、函数..." (相似度: 0.82)python问题分析:
- → 文档A最相关,排序正确 ✅
- → 但相似度差异很小(0.85 vs 0.83 vs 0.82)
- → 向量相似度不能完全反映真实相关性
- → 文档C不太相关但相似度也不低
解决方案:重排序 🎯
- → 使用更强大的模型重新评估文档相关性
- → 考虑查询和文档的精确匹配度
- → 调整排序,确保最相关的文档排在前面
本章技术概览#
| 技术 | 核心功能 | 优势 | 复杂度 | 推荐指数 |
|---|---|---|---|---|
| 交叉编码器重排序 | 精确相关性评估 | 准确度最高 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 倒数排序融合(RRF) | 多结果融合 | 鲁棒性强 | ⭐⭐ | ⭐⭐⭐⭐ |
| 多查询检索 | 多角度查询 | 召回率高 | ⭐⭐ | ⭐⭐⭐⭐ |
| 查询扩展 | 语义扩充 | 覆盖面广 | ⭐⭐ | ⭐⭐⭐ |
| 混合检索 | 向量+关键词 | 全面性好 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
Part 1: 交叉编码器重排序 - Cross-Encoder Reranking#
1.1 核心概念#
双编码器(Bi-Encoder) vs 交叉编码器(Cross-Encoder):
双编码器(用于初始检索)
查询 → 编码器 → 查询向量 ──┐
├→ 余弦相似度
文档 → 编码器 → 文档向量 ──┘
优点:快速,可预先计算文档向量
缺点:查询和文档独立编码,无法捕捉细粒度交互plaintext交叉编码器(用于重排序)
查询 + 文档 → 编码器 → 相关性分数
优点:查询和文档联合编码,更精确
缺点:慢,无法预先计算plaintext1.2 工作流程#
RAG + 重排序流程
1. 初始检索(双编码器)
→ 从大量文档中快速检索top-k个候选
→ 例如:从10000个文档中检索top-50
2. 重排序(交叉编码器)
→ 对top-k个候选重新评分
→ 更精确地排序
3. 返回最终结果
→ 返回重排序后的top-n个文档
→ 例如:返回最相关的top-5plaintext1.3 实现本地交叉编码器模型#
from sentence_transformers import CrossEncoder
from typing import List
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
import chromadb
from chromadb.config import Settings
class LocalCrossEncoderReranker:
"""使用本地交叉编码器模型重排序"""
def __init__(self, embeddings, model_name: str = "./Models/ms-marco-MiniLM-L-6-v2/cross-encoder/ms-marco-MiniLM-L6-v2"):
self.embeddings = embeddings
## 加载交叉编码器模型
print(f"📥 加载交叉编码器模型: {model_name}")
self.cross_encoder = CrossEncoder(model_name)
self.persist_directory: str = "./chroma_db"
client = chromadb.PersistentClient(path=self.persist_directory)
try:
client.delete_collection("local_rerank")
print("🗑️ 已删除旧的 Chroma 集合 'local_rerank'")
except ValueError:
print("🆕 未发现旧集合,将创建新的 'local_rerank' 集合")
## 创建新的向量存储
self.vectorstore = Chroma(
collection_name="local_rerank",
embedding_function=embeddings,
persist_directory=self.persist_directory
)
def add_documents(self, documents: List[Document]):
"""添加文档"""
self.vectorstore.add_documents(documents)
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 20,
final_k: int = 5
):
"""检索并重排序"""
## 1. 初始检索
initial_docs = self.vectorstore.similarity_search(query, k=initial_k)
## 2. 准备查询-文档对
query_doc_pairs = [
[query, doc.page_content] for doc in initial_docs
]
## 3. 使用交叉编码器计算相关性分数
print(f"🎯 使用交叉编码器重新评分...")
scores = self.cross_encoder.predict(query_doc_pairs)
## 4. 根据分数排序
scored_docs = [
{'document': doc, 'score': score}
for doc, score in zip(initial_docs, scores)
]
## 按分数降序排序
scored_docs.sort(key=lambda x: x['score'], reverse=True)
## 5. 返回top-k
reranked_docs = scored_docs[:final_k]
print("\n重排序结果:")
for i, item in enumerate(reranked_docs):
print(f"{i+1}. [得分: {item['score']:.4f}] {item['document'].page_content[:100]}...")
return reranked_docspython1.4 实际应用演示#
## 准备测试文档
documents = [
Document(
page_content="""
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print(f"发生错误: {e}")
可以捕获特定异常类型,也可以使用finally子句。
""",
metadata={"source": "python_exceptions.md"}
),
Document(
page_content="""
Python错误和异常类型
Python有多种内置异常类型:
- ValueError: 值错误
- TypeError: 类型错误
- KeyError: 键错误
- IndexError: 索引错误
所有异常都继承自Exception类。
""",
metadata={"source": "python_error_types.md"}
),
Document(
page_content="""
Python编程基础教程
本教程涵盖:
- 变量和数据类型
- 控制流语句
- 函数定义
- 模块导入
""",
metadata={"source": "python_basics.md"}
),
Document(
page_content="""
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Exception
3. 提供有用的错误信息
4. 适当时使用finally清理资源
""",
metadata={"source": "python_error_best_practices.md"}
),
]
## 使用示例
local_reranker = LocalCrossEncoderReranker(embeddings)
local_reranker.add_documents(documents)
query = "Python中如何处理异常?"
results = local_reranker.retrieve_and_rerank(query, initial_k=10, final_k=3)python运行结果:
📥 加载交叉编码器模型: ./Models/ms-marco-MiniLM-L-6-v2/cross-encoder/ms-marco-MiniLM-L6-v2
🗑️ 已删除旧的 Chroma 集合 'local_rerank'
🎯 使用交叉编码器重新评分...
重排序结果:
1. [得分: 8.3246]
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Excep...
2. [得分: 7.6097]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation(...
3. [得分: 7.5047]
Python错误和异常类型
Python有多种内置异常类型:
- ValueError: 值错误
- TypeErr...plaintext1.5 常用交叉编码器模型#
RERANKER_MODELS = {
"small_fast": {
"name": "cross-encoder/ms-marco-TinyBERT-L-2-v2",
"params": "~4M",
"speed": "very fast",
"quality": "good"
},
"balanced": {
"name": "cross-encoder/ms-marco-MiniLM-L-6-v2",
"params": "~22M",
"speed": "fast",
"quality": "very good"
},
"high_quality": {
"name": "cross-encoder/ms-marco-MiniLM-L-12-v2",
"params": "~33M",
"speed": "medium",
"quality": "excellent"
},
"multilingual": {
"name": "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1",
"params": "~118M",
"speed": "medium",
"quality": "excellent (多语言)"
}
}
def choose_reranker(priority: str = "balanced"):
"""选择合适的重排序模型"""
model_info = RERANKER_MODELS.get(priority, RERANKER_MODELS["balanced"])
print(f"📊 选择模型: {model_info['name']}")
print(f" 参数量: {model_info['params']}")
print(f" 速度: {model_info['speed']}")
print(f" 质量: {model_info['quality']}")
return CrossEncoder(model_info['name'])python1.6 技术优势与适用场景#
优势: ✅
- 精确度显著提升:交叉编码器能捕捉查询和文档的细粒度交互
- 解决语义鸿沟:弥补向量检索的局限性
- 灵活配置:可根据需求调整初始检索和重排序的比例
- 质量可控:通过分数阈值控制返回文档的质量
适用场景:
- 高精度要求的问答系统
- 法律、医疗等专业领域检索
- 需要精确匹配的搜索应用
- 对检索质量要求极高的生产环境
性能考虑: ⚠️
- 计算开销:重排序会增加响应时间
- 资源需求:需要加载额外的模型
- 批量优化:建议批量处理以提高效率
交叉编码器重排序技术通过更精细的相关性评估,显著提升了RAG系统的检索精度,是构建高质量智能问答系统的关键技术之一。
Part 2: 倒数排序融合 - Reciprocal Rank Fusion (RRF)#
2.1 核心概念#
倒数排序融合(RRF)是一种融合多个排序列表的算法,它不需要知道具体的分数,只需要排名。这种方法特别适合融合来自不同检索系统的结果。
2.2 RRF算法原理#
RRF公式
对于文档d,其RRF分数为:
RRF(d) = Σ (1 / (k + rank_i(d)))plaintext其中:
rank_i(d): 文档d在第i个排序列表中的排名k: 常数,通常取60Σ: 对所有排序列表求和
示例演示
假设有2个排序列表,k=60:
列表1: [DocA, DocB, DocC] (DocA排名1, DocB排名2, DocC排名3)
列表2: [DocC, DocA, DocD] (DocC排名1, DocA排名2, DocD排名3)plaintextRRF分数计算:
DocA: 1/(60+1) + 1/(60+2) = 0.0164 + 0.0161 = 0.0325
DocB: 1/(60+2) + 0 = 0.0161
DocC: 1/(60+3) + 1/(60+1) = 0.0159 + 0.0164 = 0.0323
DocD: 0 + 1/(60+3) = 0.0159plaintext最终排序: DocA > DocC > DocB > DocD
2.3 实现RRF融合器#
from typing import List, Dict
from collections import defaultdict
class ReciprocalRankFusion:
"""倒数排序融合"""
def __init__(self, k: int = 60):
"""
Args:
k: RRF常数,通常取60
"""
self.k = k
def fuse(self, ranked_lists: List[List[Document]]) -> List[Dict]:
"""
融合多个排序列表
Args:
ranked_lists: 多个文档排序列表
Returns:
融合后的文档列表(包含RRF分数)
"""
## 存储每个文档的RRF分数
doc_scores = defaultdict(float)
doc_map = {} ## 文档ID到文档对象的映射
## 遍历每个排序列表
for ranked_list in ranked_lists:
for rank, doc in enumerate(ranked_list, start=1):
## 使用page_content作为文档唯一标识
doc_id = id(doc)
## 计算RRF分数
rrf_score = 1.0 / (self.k + rank)
doc_scores[doc_id] += rrf_score
## 保存文档对象
if doc_id not in doc_map:
doc_map[doc_id] = doc
## 按RRF分数排序
sorted_docs = sorted(
doc_scores.items(),
key=lambda x: x[1],
reverse=True
)
## 构建结果
fused_results = [
{
'document': doc_map[doc_id],
'rrf_score': score
}
for doc_id, score in sorted_docs
]
return fused_resultspython2.4 使用示例#
## 创建RRF融合器
rrf = ReciprocalRankFusion(k=60)
## 模拟多个检索器的结果
## 检索器1: 向量相似度检索
retriever1_results = [
Document(page_content="文档A: Python异常处理..."),
Document(page_content="文档B: 错误类型..."),
Document(page_content="文档C: 编程基础...")
]
## 检索器2: BM25关键词检索
retriever2_results = [
Document(page_content="文档C: 编程基础..."),
Document(page_content="文档A: Python异常处理..."),
Document(page_content="文档D: 最佳实践...")
]
## 融合结果
fused = rrf.fuse([retriever1_results, retriever2_results])
print("RRF融合结果:")
for i, item in enumerate(fused):
print(f"{i+1}. [RRF分数: {item['rrf_score']:.4f}] {item['document'].page_content[:50]}...")python运行结果:
RRF融合结果:
1. [RRF分数: 0.0164] 文档A: Python异常处理......
2. [RRF分数: 0.0164] 文档C: 编程基础......
3. [RRF分数: 0.0161] 文档B: 错误类型......
4. [RRF分数: 0.0161] 文档A: Python异常处理......
5. [RRF分数: 0.0159] 文档C: 编程基础......
6. [RRF分数: 0.0159] 文档D: 最佳实践......plaintext2.5 实现完整的RRF检索器#
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from typing import List, Dict
class RRFEnsembleRetriever:
"""使用RRF的集成检索器"""
def __init__(self, embeddings, documents: List[Document]):
self.embeddings = embeddings
self.persist_directory: str = "./chroma_db"
## 1. 向量检索器
self.vectorstore = Chroma(
collection_name="rrf_ensemble",
embedding_function=embeddings,
persist_directory=self.persist_directory
)
self.vectorstore.add_documents(documents)
self.vector_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
## 2. BM25关键词检索器
self.bm25_retriever = BM25Retriever.from_documents(documents)
self.bm25_retriever.k = 10
## 3. RRF融合器
self.rrf = ReciprocalRankFusion(k=60)
def retrieve(self, query: str, k: int = 5) -> List[Dict]:
"""使用RRF融合多个检索器的结果"""
print(f"🔍 查询: {query}\n")
## 1. 向量检索
print("📊 向量检索...")
vector_results = self.vector_retriever.get_relevant_documents(query)
print(f" → 检索到 {len(vector_results)} 个文档")
## 2. BM25检索
print("🔤 BM25关键词检索...")
bm25_results = self.bm25_retriever.get_relevant_documents(query)
print(f" → 检索到 {len(bm25_results)} 个文档")
## 3. RRF融合
print("\n🔀 RRF融合...")
fused_results = self.rrf.fuse([vector_results, bm25_results])
## 返回top-k
return fused_results[:k]python2.6 实际应用演示#
## 准备测试文档
documents = [
Document(
page_content="""
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print(f"发生错误: {e}")
可以捕获特定异常类型,也可以使用finally子句。
""",
metadata={"source": "python_exceptions.md"}
),
Document(
page_content="""
Python错误和异常类型
Python有多种内置异常类型:
- ValueError: 值错误
- TypeError: 类型错误
- KeyError: 键错误
- IndexError: 索引错误
所有异常都继承自Exception类。
""",
metadata={"source": "python_error_types.md"}
),
Document(
page_content="""
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Exception
3. 提供有用的错误信息
4. 适当时使用finally清理资源
""",
metadata={"source": "python_error_best_practices.md"}
),
]
## 使用示例
rrf_retriever = RRFEnsembleRetriever(embeddings, documents)
results = rrf_retriever.retrieve("Python异常处理", k=3)
print("\n最终结果:")
for i, item in enumerate(results):
print(f"\n{i+1}. [RRF分数: {item['rrf_score']:.4f}]")
print(f" {item['document'].page_content[:150]}...")python运行结果:
🔍 查询: Python异常处理
📊 向量检索...
→ 检索到 10 个文档
🔤 BM25关键词检索...
→ 检索到 4 个文档
🔀 RRF融合...
最终结果:
1. [RRF分数: 0.0164]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print...
2. [RRF分数: 0.0164]
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Exception
3. 提供有用的错误信息
4. 适当时使用finally清...
3. [RRF分数: 0.0161]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print...plaintext2.7 技术优势与适用场景#
优势: ✅
- 无需分数归一化:直接使用排名,避免不同系统的分数尺度问题
- 鲁棒性强:对单个检索器的异常结果不敏感
- 简单高效:算法简单,计算开销小
- 可扩展性好:容易集成新的检索系统
适用场景:
- 多检索系统融合:结合向量检索、关键词检索等不同方法
- 联邦搜索:融合来自不同数据源的结果
- 专家系统:结合不同专业领域的检索器
- 容错系统:需要鲁棒性的生产环境
性能考虑: ⚠️
- 排名质量依赖:结果质量取决于各个检索器的排名质量
- 常数选择:k值需要根据具体场景调整
- 重复文档处理:需要处理不同检索器返回的相同文档
RRF技术通过简单而有效的方法融合多个检索系统的结果,显著提升了检索的鲁棒性和准确性,是构建复杂检索系统的关键技术之一。
Part 3: 多查询检索 - Multi-Query Retrieval#
3.1 核心概念#
多查询检索是一种通过生成多个查询变体来提高检索召回率的先进技术。其核心思路是:
- 从单个用户查询生成多个相似但不同的查询
- 对每个查询分别进行检索
- 融合所有检索结果
3.2 为什么需要多查询?#
问题:单一查询可能不够全面
用户查询: "Python如何读取文件?"
## 可能的相关文档:
- "Python文件读取open()函数" ✅ 匹配
- "读写文件的最佳实践" ❌ 可能不匹配(没有"Python")
- "使用pathlib处理文件路径" ❌ 可能不匹配(没有"读取")python解决方案:生成多个查询变体 ✅
原始查询: "Python如何读取文件?"
生成的查询变体:
1. "在Python中打开和读取文件"
2. "Python file I/O操作"
3. "使用open()函数读取文件内容"
4. "Python文件处理方法"
→ 不同变体可能匹配不同的相关文档
→ 融合结果,提高召回率plaintext3.3 实现使用LLM生成查询变体#
from langchain.vectorstores import Chroma
from langchain.schema import Document
from typing import List, Dict
from vllm import SamplingParams
class MultiQueryRetriever:
"""多查询检索器(适配本地模型)"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def generate_queries(self, question: str) -> List[str]:
"""生成查询变体"""
## 使用ChatML格式
prompt = f"""<|im_start|>system
你是一个搜索查询生成助手。请为用户的查询生成3个不同的变体,这些变体表达相同的意图但用词不同。
请每行输出一个查询,不要编号。<|im_end|>
<|im_start|>user
用户查询:{question}<|im_end|>
<|im_start|>assistant
"""
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=200,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
response = outputs[0].outputs[0].text.strip() if outputs and outputs[0].outputs else ""
## 解析生成的查询
queries = [q.strip() for q in response.split('\n') if q.strip()]
## 加入原始查询
all_queries = [question] + queries
print(f"📝 生成了 {len(all_queries)} 个查询变体")
return all_queries
def simple_rrf_fusion(self, all_results: List[List[Document]], k: int = 5) -> List[Document]:
"""简单的RRF融合"""
## 简单的文档去重和排序
seen_content = set()
unique_docs = []
for results in all_results:
for doc in results:
if doc.page_content not in seen_content:
seen_content.add(doc.page_content)
unique_docs.append(doc)
return unique_docs[:k]
def retrieve(self, question: str, k: int = 5) -> List[Document]:
"""多查询检索"""
## 1. 生成查询变体
queries = self.generate_queries(question)
## 2. 对每个查询进行检索
all_results = []
for query in queries:
results = self.vectorstore.similarity_search(query, k=k)
all_results.append(results)
## 3. 简单的结果融合
fused_results = self.simple_rrf_fusion(all_results, k)
print(f"✅ 最终检索到 {len(fused_results)} 个文档")
return fused_resultspython3.4 实际应用演示#
## 创建向量数据库
vectorstore = Chroma(
collection_name="multi_query",
persist_directory="./chroma_db",
embedding_function=embeddings
)
## 准备测试文档
documents = [
Document(
page_content="""
Python文件操作完整指南
使用open()函数读取文件:
with open('file.txt', 'r') as f:
content = f.read()
写入文件:
with open('file.txt', 'w') as f:
f.write('Hello, World!')
""",
metadata={"source": "python_file_operations.md"}
),
Document(
page_content="""
Python I/O操作最佳实践
文件处理建议:
1. 总是使用with语句确保文件正确关闭
2. 处理文件编码问题
3. 使用pathlib进行路径操作
4. 处理大文件时使用分块读取
""",
metadata={"source": "python_io_best_practices.md"}
),
Document(
page_content="""
Python pathlib模块使用
pathlib提供面向对象的文件路径操作:
from pathlib import Path
## 读取文件
content = Path('file.txt').read_text()
## 写入文件
Path('file.txt').write_text('Hello, World!')
""",
metadata={"source": "python_pathlib.md"}
),
Document(
page_content="""
Python异常处理指南
文件操作中的异常处理:
try:
with open('file.txt', 'r') as f:
content = f.read()
except FileNotFoundError:
print("文件不存在")
except PermissionError:
print("没有权限")
""",
metadata={"source": "python_exception_handling.md"}
)
]
vectorstore.add_documents(documents)
## 创建多查询检索器
multi_query_retriever = MultiQueryRetriever(vectorstore, llm)
## 检索测试
question = "Python如何读取文件?"
results = multi_query_retriever.retrieve(question, k=3)
print("\n最终检索结果:")
for i, doc in enumerate(results):
print(f"\n{i+1}. {doc.page_content[:100]}...")python运行结果:
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.03it/s]
📝 生成了 4 个查询变体
✅ 最终检索到 3 个文档
最终检索结果:
1.
Python文件操作完整指南
使用open()函数读取文件:
with open('file.txt', 'r') as f:
content = f.read(...
2.
Python I/O操作最佳实践
文件处理建议:
1. 总是使用with语句确保文件正确关闭
2. 处理文件编码问题...
3.
Python pathlib模块使用
pathlib提供面向对象的文件路径操作:
from pathlib import Path
## 读取文件...plaintext3.5 技术优势与适用场景#
优势: ✅
- 提高召回率:多个查询变体覆盖更多相关文档
- 增强鲁棒性:对查询表述的变化不敏感
- 语义多样性:涵盖不同角度和表达方式
- 易于集成:可与现有检索系统无缝集成
适用场景:
- 复杂查询:需要多角度理解的复杂问题
- 专业领域:技术、学术等专业内容检索
- 多语言检索:支持不同语言表达方式
- 容错系统:需要高召回率的应用场景
性能考虑: ⚠️
- 计算开销:需要执行多次检索操作
- LLM依赖:查询生成依赖语言模型质量
- 结果去重:需要有效的融合和去重策略
多查询检索技术通过生成多样化的查询变体,显著提高了检索系统的召回率和鲁棒性,是构建高质量智能检索系统的重要技术之一。
Part 4: 查询扩展 - Query Expansion#
4.1 核心概念#
查询扩展是信息检索中的一种关键技术,旨在通过向原始查询中添加相关术语、同义词或上下文信息来增强查询的语义表示,从而提高检索系统的召回率。其核心思想是弥补用户查询与文档集合中相关文档之间可能存在的词汇不匹配问题。
主要方法:
- 基于同义词词典:使用预定义的词典(如 WordNet)添加同义词。
- 基于词嵌入:利用词向量模型(如 Word2Vec, GloVe)找到语义相近的词汇。
- 基于查询日志:分析历史查询数据,找到经常一起出现的查询词。
- 伪相关反馈:假设初次检索返回的顶部文档是相关的,并从中提取扩展词。
- 基于LLM的扩展:利用大语言模型强大的语义理解和生成能力,根据查询的意图和上下文生成相关的扩展术语。
本部分将重点介绍基于LLM的查询扩展和伪相关反馈两种高级策略。
4.2 实现基于LLM的查询扩展#
LLM能够深入理解查询的意图和上下文,生成高质量、语义相关的扩展词,而不仅仅是机械地添加同义词。
以下是优化后的 QueryExpander 类,它更加健壮,并提供了更好的提示词和输出处理。
from vllm import SamplingParams
class QueryExpander:
"""查询扩展器(适配本地模型)"""
def __init__(self, llm):
self.llm = llm
def expand(self, query: str) -> str:
"""扩展查询"""
## 使用ChatML格式
prompt = f"""<|im_start|>system
你是一个查询扩展专家。请为用户查询添加同义词和相关术语。
请使用以下格式:
原始术语 OR 同义词1 OR 同义词2<|im_end|>
<|im_start|>user
查询:{query}<|im_end|>
<|im_start|>assistant
扩展查询:"""
sampling_params = SamplingParams(
temperature=0.3,
top_p=0.9,
max_tokens=512,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
expanded = outputs[0].outputs[0].text.strip() if outputs and outputs[0].outputs else query
print(f"原始查询: {query}")
print(f"扩展查询: {expanded}")
return expanded
## 创建查询扩展器
expander = QueryExpander(llm)
## 测试扩展
original_query = "Python机器学习"
expanded_query = expander.expand(original_query)python输出内容如下:
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 5.79it/s]
原始查询: Python机器学习
扩展查询: Python数据分析 OR Python统计分析 OR Python数据挖掘plaintext4.3 实现伪相关反馈#
伪相关反馈是一种更高级的查询扩展技术,它利用初次检索的结果来指导查询扩展,特别适合文档库特定的术语和上下文。
from langchain.vectorstores import Chroma
from langchain.schema import Document
from vllm import SamplingParams
class PseudoRelevanceFeedback:
"""伪相关反馈查询扩展"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def expand_query(self, query: str, top_k: int = 3) -> str:
"""使用伪相关反馈扩展查询"""
## 1. 初始检索
initial_docs = self.vectorstore.similarity_search(query, k=top_k)
## 2. 从top文档提取关键词
context = "\n\n".join([doc.page_content for doc in initial_docs])
## 3. 使用LLM生成扩展查询
prompt = f"""<|im_start|>system
你是一个查询扩展助手。基于相关文档内容,为原始查询添加重要的相关术语。
请保持简洁,只添加最重要的术语。<|im_end|>
<|im_start|>user
原始查询:{query}
相关文档:
{context}
请生成扩展查询:<|im_end|>
<|im_start|>assistant
扩展查询:"""
sampling_params = SamplingParams(
temperature=0.2,
top_p=0.9,
max_tokens=512,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
expanded = outputs[0].outputs[0].text.strip() if outputs and outputs[0].outputs else query
print(f"📝 原始查询: {query}")
print(f"✨ 扩展查询: {expanded}")
return expanded
def retrieve_with_expansion(self, query: str, k: int = 5):
"""使用扩展查询检索"""
## 1. 扩展查询
expanded_query = self.expand_query(query)
## 2. 使用扩展查询检索
results = self.vectorstore.similarity_search(expanded_query, k=k)
print(f"✅ 检索到 {len(results)} 个文档")
return results
## 4. 创建伪相关反馈检索器
prf = PseudoRelevanceFeedback(vectorstore, llm)
## 5. 检索测试
results = prf.retrieve_with_expansion("Python异常", k=3)
print("\n最终检索结果:")
for i, doc in enumerate(results):
print(f"{i+1}. {doc.page_content[:100]}...")
python输出内容如下:
Processed prompts: 100%|██████████| 1/1 [00:01<00:00, 1.99s/it]
📝 原始查询: Python异常
✨ 扩展查询: Python异常处理
1. Python异常处理完整指南:https://realpython.com/python-exceptions/
2. 使用try-except捕获异常:https://realpython.com/python-try-except/
3. 可以捕获特定异常类型,也可以使用finally子句:https://realpython.com/python-finally/
4. Python错误和异常类型:https://docs.python.org/3/library/exceptions.html
5. 如何在Python中优雅地处理错误:https://realpython.com/python-error-handling/
注意:在处理异常时,应只捕获你能处理的异常,使用具体的异常类型而不是Exception,提供有用的错误信息,适当时使用finally清理资源。
✅ 检索到 3 个文档
最终检索结果:
1.
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Excep...
2.
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation(...
3.
Python错误和异常类型
Python有多种内置异常类型:
- ValueError: 值错误
- TypeErr...plaintext这种查询扩展方法能够显著提高检索系统的召回率,特别是在处理专业领域查询时,能够更好地理解用户的真实信息需求。
Part 5: 混合检索 - Hybrid Search#
5.1 核心概念#
混合检索结合向量检索(语义搜索)和关键词检索(如BM25),利用两者的优势。
向量检索 vs 关键词检索
## 向量检索(语义搜索)
优点:
✅ 理解语义,能匹配同义词
✅ 能处理模糊查询
✅ 跨语言能力(多语言模型)
缺点: ❌ 对精确术语匹配不敏感 ❌ 对罕见词或专有名词效果差 ❌ 计算成本高
## 关键词检索(BM25) 优点: ✅ 精确匹配关键词 ✅ 对专有名词、代码等效果好 ✅ 计算快速
缺点: ❌ 不理解语义 ❌ 无法匹配同义词 ❌ 对查询措辞敏感
## 混合检索 = 向量检索 + 关键词检索 🎯 → 结合两者优势 → 适用于大多数场景
5.2 实现向量+BM25混合检索#
from rank_bm25 import BM25Okapi
import jieba ## 中文分词
import numpy as np
from collections import defaultdict
class HybridRetriever:
"""混合检索器(向量 + BM25)"""
def __init__(self, embeddings, documents: List[Document], weights: tuple = (0.5, 0.5)):
"""
Args:
weights: (向量权重, BM25权重),两者之和应为1.0
"""
self.embeddings = embeddings
self.documents = documents
self.vector_weight, self.bm25_weight = weights
## 1. 向量存储
self.vectorstore = Chroma(
collection_name="hybrid_search",
embedding_function=embeddings,
persist_directory="./chroma_db",
)
## 2. BM25索引
self._build_bm25_index()
def _build_bm25_index(self):
"""构建BM25索引"""
## 分词(中文使用jieba,英文可以使用split)
tokenized_docs = [
list(jieba.cut(doc.page_content)) for doc in self.documents
]
self.bm25 = BM25Okapi(tokenized_docs)
print(f"✅ BM25索引构建完成,共 {len(self.documents)} 个文档")
def _vector_search(self, query: str, k: int) -> List[tuple]:
"""向量检索,返回 (doc, score)"""
try:
results = self.vectorstore.similarity_search_with_score(query, k=k)
## 归一化分数到[0, 1]
if results:
scores = [score for _, score in results]
max_score = max(scores)
min_score = min(scores)
score_range = max_score - min_score if max_score != min_score else 1
normalized = [
(doc, 1 - (score - min_score) / score_range) ## 距离转相似度
for doc, score in results
]
return normalized
else:
return []
except Exception as e:
print(f"向量检索错误: {e}")
return []
def _bm25_search(self, query: str, k: int) -> List[tuple]:
"""BM25检索,返回 (doc, score)"""
try:
## 查询分词
tokenized_query = list(jieba.cut(query))
## BM25评分
scores = self.bm25.get_scores(tokenized_query)
## 获取top-k
if len(scores) > 0:
top_indices = np.argsort(scores)[::-1][:k]
## 归一化分数
max_score = max(scores) if max(scores) > 0 else 1
results = [
(self.documents[i], scores[i] / max_score)
for i in top_indices if scores[i] > 0
]
return results
else:
return []
except Exception as e:
print(f"BM25检索错误: {e}")
return []
def hybrid_search(self, query: str, k: int = 5) -> List[Dict]:
"""混合检索"""
print(f"🔍 混合检索: {query}")
print(f" 权重: 向量={self.vector_weight}, BM25={self.bm25_weight}")
## 1. 向量检索
vector_results = self._vector_search(query, k=k*2)
print(f"📊 向量检索结果: {len(vector_results)} 个文档")
## 2. BM25检索
bm25_results = self._bm25_search(query, k=k*2)
print(f"🔤 BM25检索结果: {len(bm25_results)} 个文档")
## 3. 合并分数
combined_scores = defaultdict(float)
doc_map = {}
## 处理向量检索结果
for doc, score in vector_results:
doc_id = id(doc)
combined_scores[doc_id] += self.vector_weight * score
doc_map[doc_id] = doc
## 处理BM25检索结果
for doc, score in bm25_results:
doc_id = id(doc)
combined_scores[doc_id] += self.bm25_weight * score
doc_map[doc_id] = doc
## 4. 排序并取top-k
sorted_results = sorted(
combined_scores.items(),
key=lambda x: x[1],
reverse=True
)[:k]
print(f"🎯 最终合并结果: {len(sorted_results)} 个文档")
## 5. 构建结果
final_results = []
for doc_id, score in sorted_results:
final_results.append({
'document': doc_map[doc_id],
'hybrid_score': score,
'content': doc_map[doc_id].page_content[:200] + "..."
if len(doc_map[doc_id].page_content) > 200 else doc_map[doc_id].page_content
})
return final_results
def search_comparison(self, query: str, k: int = 3):
"""对比三种检索方式的效果"""
print(f"\n🔬 检索方式对比: {query}")
print("=" * 60)
## 向量检索
vector_results = self._vector_search(query, k=k)
print("\n📊 向量检索结果:")
for i, (doc, score) in enumerate(vector_results):
print(f" {i+1}. [分数: {score:.4f}] {doc.page_content[:80]}...")
## BM25检索
bm25_results = self._bm25_search(query, k=k)
print("\n🔤 BM25检索结果:")
for i, (doc, score) in enumerate(bm25_results):
print(f" {i+1}. [分数: {score:.4f}] {doc.page_content[:80]}...")
## 混合检索
hybrid_results = self.hybrid_search(query, k=k)
print("\n🎯 混合检索结果:")
for i, result in enumerate(hybrid_results):
print(f" {i+1}. [混合分数: {result['hybrid_score']:.4f}] {result['document'].page_content[:80]}...")
return {
'vector': vector_results,
'bm25': bm25_results,
'hybrid': hybrid_results
}
## 创建混合检索器
hybrid_retriever = HybridRetriever(
embeddings=embeddings,
documents=documents,
weights=(0.6, 0.4) ## 60%向量,40% BM25
)
## 测试混合检索
print("=== 混合检索测试 ===")
results = hybrid_retriever.hybrid_search("Python异常处理", k=3)
print("\n混合检索结果:")
for i, item in enumerate(results):
print(f"\n{i+1}. [混合分数: {item['hybrid_score']:.4f}]")
print(f" {item['content']}")
python输出内容如下:
🔍 混合检索: Python异常处理
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
混合检索结果:
1. [混合分数: 0.6000]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print...
2. [混合分数: 0.6000]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print...
3. [混合分数: 0.6000]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation()
except Exception as e:
print...plaintext混合检索通过结合语义理解和关键词匹配,能够在各种查询场景下提供更稳定、更准确的检索结果。自适应权重调整和LLM重排序进一步提升了检索系统的智能性和实用性。
综合实战:完整的高级检索系统#
将所有技术整合成一个系统
class AdvancedRetrievalSystem:
"""高级检索系统(集成所有技术)"""
def __init__(self, embeddings, llm, documents: List[Document]):
self.embeddings = embeddings
self.llm = llm
self.documents = documents
## 向量存储
self.vectorstore = Chroma(
collection_name="multi_query",
embedding_function=embeddings,
persist_directory="./chroma_db",
)
## self.vectorstore.add_documents(documents)
## 组件
self.multi_query = MultiQueryRetriever(self.vectorstore, llm)
self.reranker = LocalCrossEncoderReranker(embeddings)
self.reranker.add_documents(documents)
self.hybrid = HybridRetriever(embeddings, documents, weights=(0.6, 0.4))
self.rrf = ReciprocalRankFusion(k=60)
def retrieve(
self,
query: str,
mode: str = "hybrid_multiquery_rerank",
k: int = 5
):
"""
高级检索
Args:
mode: 检索模式
- "simple": 简单向量检索
- "hybrid": 混合检索
- "multiquery": 多查询检索
- "hybrid_multiquery": 混合+多查询
- "hybrid_multiquery_rerank": 混合+多查询+重排序(最强)
"""
print(f"🎯 检索模式: {mode}")
print(f"❓ 查询: {query}\n")
if mode == "simple":
## 简单向量检索
results = self.vectorstore.similarity_search(query, k=k)
results = [{'document': doc, 'score': 0} for doc in results]
elif mode == "hybrid":
## 混合检索
results = self.hybrid.hybrid_search(query, k=k)
elif mode == "multiquery":
## 多查询检索
results = self.multi_query.retrieve(query, k=k)
elif mode == "hybrid_multiquery":
## 混合 + 多查询
## 1. 生成查询变体
queries = self.multi_query.generate_queries(query)
## 2. 对每个查询进行混合检索
all_results = []
for q in queries:
hybrid_results = self.hybrid.hybrid_search(q, k=10)
all_results.append([item['document'] for item in hybrid_results])
## 3. RRF融合
results = self.rrf.fuse(all_results)[:k]
elif mode == "hybrid_multiquery_rerank":
## 混合 + 多查询 + 重排序(最强模式)
## 1. 生成查询变体
queries = self.multi_query.generate_queries(query)
## 2. 混合检索
all_results = []
for q in queries:
hybrid_results = self.hybrid.hybrid_search(q, k=10)
all_results.append([item['document'] for item in hybrid_results])
## 3. RRF融合
fused = self.rrf.fuse(all_results)
candidate_docs = [item['document'] for item in fused[:20]]
## 4. 交叉编码器重排序
print("\n🎯 重排序...")
results = self.reranker.retrieve_and_rerank(
query=query,
initial_k=len(candidate_docs),
final_k=k
)
return results
def query(self, question: str, mode: str = "hybrid_multiquery_rerank"):
"""执行完整的RAG查询(修复重复代码问题)"""
## 1. 检索
results = self.retrieve(question, mode=mode, k=3)
## 2. 提取文档:兼容两种格式
if results and isinstance(results[0], dict) and 'document' in results[0]:
## 格式: [{"document": Document, "score": ...}, ...]
docs = [item['document'] for item in results]
else:
## 格式: [Document, Document, ...] 或其他格式
docs = results ## 直接使用结果
## 3. 生成答案(使用ChatML格式)
context = "\n\n".join([doc.page_content for doc in docs])
## 构建ChatML格式的提示
prompt = f"""<|im_start|>system
你是一个智能问答助手。请基于提供的文档内容回答问题。如果文档中没有相关信息,请说明。
文档内容:
{context}<|im_end|>
<|im_start|>user
问题:{question}<|im_end|>
<|im_start|>assistant
"""
## 使用本地模型生成答案
from vllm import SamplingParams
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=512,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
answer = outputs[0].outputs[0].text.strip() if outputs and outputs[0].outputs else "未能生成答案"
return {
"question": question,
"documents": docs,
"answer": answer
}
## 使用高级检索系统
advanced_system = AdvancedRetrievalSystem(embeddings, llm, documents)python输出内容:
📥 加载交叉编码器模型: ./Models/ms-marco-MiniLM-L-6-v2/cross-encoder/ms-marco-MiniLM-L6-v2
🗑️ 已删除旧的 Chroma 集合 'local_rerank'
✅ BM25索引构建完成plaintext## 测试不同模式
modes = ["simple", "hybrid", "hybrid_multiquery_rerank", "multiquery"]
question = "Python中如何处理异常?"
for mode in modes:
print(f"\n{'='*60}")
print(f"测试模式: {mode}")
print(f"{'='*60}\n")
result = advanced_system.query(question, mode=mode)
print(f"\n💡 答案:\n{result['answer']}\n")python输出内容如下:
============================================================
测试模式: simple
============================================================
🎯 检索模式: simple
❓ 查询: Python中如何处理异常?
Processed prompts: 100%|██████████| 1/1 [00:01<00:00, 1.40s/it]
💡 答案:
在Python中,可以使用try-except语句来捕获和处理异常。try语句块中包含可能会抛出异常的代码,如果try语句块中的代码抛出了异常,那么程序会立即跳转到与该异常匹配的except语句块中。except语句块中的代码会在异常发生时被执行,可以用来处理异常,例如打印错误信息,或者执行其他恢复操作。此外,还可以使用finally语句块来确保即使在发生异常时,某些代码也会被执行。
============================================================
测试模式: hybrid
============================================================
🎯 检索模式: hybrid
❓ 查询: Python中如何处理异常?
🔍 混合检索: Python中如何处理异常?
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
Processed prompts: 100%|██████████| 1/1 [00:01<00:00, 1.52s/it]
💡 答案:
在Python中,可以使用try-except语句来捕获和处理异常。try块中包含可能会引发异常的代码,如果try块中的代码引发异常,程序会立即跳转到与该异常匹配的except块中。except块中的代码会在发生异常时被执行,可以用来处理异常,例如打印错误信息、记录日志、恢复程序状态等。如果except块中没有匹配的异常,程序会继续执行。此外,还可以使用finally块来确保在try-except语句块中的代码无论如何都会被执行,无论是否发生异常。
============================================================
测试模式: hybrid_multiquery_rerank
============================================================
🎯 检索模式: hybrid_multiquery_rerank
❓ 查询: Python中如何处理异常?
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.24it/s]
📝 生成了 4 个查询变体
🔍 混合检索: Python中如何处理异常?
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
🔍 混合检索: 1. 如何在Python中处理异常?
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
🔍 混合检索: 2. 如何应对Python中的异常情况?
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
🔍 混合检索: 3. 如何处理Python程序中的错误?
权重: 向量=0.6, BM25=0.4
📊 向量检索...
🔤 BM25检索...
🔀 合并结果...
🎯 重排序...
🎯 使用交叉编码器重新评分...
重排序结果:
1. [得分: 8.3246]
如何在Python中优雅地处理错误
最佳实践:
1. 只捕获你能处理的异常
2. 使用具体的异常类型而不是Excep...
2. [得分: 7.6097]
Python异常处理完整指南
使用try-except捕获异常:
try:
risky_operation(...
3. [得分: 7.5047]
Python错误和异常类型
Python有多种内置异常类型:
- ValueError: 值错误
- TypeErr...
Processed prompts: 100%|██████████| 1/1 [00:01<00:00, 1.14s/it]
💡 答案:
在Python中,可以使用try-except语句来优雅地处理异常。try块中包含可能会抛出异常的代码,如果try块中的代码抛出异常,程序会立即跳转到与之匹配的except块中。except块中的代码会在异常发生时被执行,可以用来处理异常或捕获特定类型的异常。此外,还可以使用finally子句来清理资源,无论是否发生异常。
============================================================
测试模式: multiquery
============================================================
🎯 检索模式: multiquery
❓ 查询: Python中如何处理异常?
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.24it/s]
📝 生成了 4 个查询变体
✅ 最终检索到 2 个文档
Processed prompts: 100%|██████████| 1/1 [00:01<00:00, 1.22s/it]
💡 答案:
在Python中,可以使用try-except语句来捕获和处理异常。try块中包含可能会引发异常的代码,如果try块中的代码引发了异常,程序会立即跳转到与之匹配的except块中。except块中的代码会在异常发生时被执行,可以用来处理异常或提供有用的错误信息。此外,还可以使用finally子句来确保在try-except语句块中的代码无论如何都会被执行,即使没有引发异常。plaintext实验总结#
重排序: 使用交叉编码器提升精度 RRF融合: 简单有效的结果融合方法 多查询: 提高召回率 混合检索: 结合向量和关键词检索
## 技术选择指南
RETRIEVAL_STRATEGIES = {
"快速原型": {
"策略": "simple",
"说明": "简单向量检索",
"适用": "快速验证想法,数据量小"
},
"生产环境基础": {
"策略": "hybrid",
"说明": "混合检索(向量+BM25)",
"适用": "大多数生产场景,平衡速度和质量"
},
"高召回率": {
"策略": "multiquery",
"说明": "多查询检索",
"适用": "需要全面覆盖,不要遗漏相关文档"
},
"高精度": {
"策略": "hybrid_multiquery_rerank",
"说明": "混合+多查询+重排序",
"适用": "对质量要求极高,可以牺牲速度"
},
"实时应用": {
"策略": "hybrid + 缓存",
"说明": "混合检索+结果缓存",
"适用": "需要快速响应的应用"
}
}
def choose_strategy(priority: str):
"""根据优先级选择策略"""
strategy = RETRIEVAL_STRATEGIES.get(priority)
if strategy:
print(f"推荐策略: {strategy['策略']}")
print(f"说明: {strategy['说明']}")
print(f"适用场景: {strategy['适用']}")
return strategypython技术组合建议
检索质量层级:
Level 1: 向量检索 (基础)
↓
Level 2: 混合检索 (向量 + BM25)
↓
Level 3: 混合检索 + RRF融合
↓
Level 4: 多查询 + 混合检索 + RRF
↓
Level 5: 多查询 + 混合检索 + RRF + 重排序 (最强)
根据应用场景选择合适的级别:
- 快速原型: Level 1
- 生产基础: Level 2-3
- 高质量应用: Level 4-5plaintext