

代码开源Github地址 ↗
RAG高级索引与检索策略:提升检索质量的关键#
在前面的章节中,我们学习了查询优化和路由技术。但是,检索质量不仅取决于查询,还取决于如何组织和索引文档。本章将深入探讨高级索引和检索策略。
为什么需要高级索引?#
基础索引的局限性
问题1: 文档过长
- 一个10000字的文档被整体嵌入
- 语义信息过于粗糙
- 检索不精确
问题2: 上下文丢失
- 将文档分成小块
- 每块独立检索
- 丢失了块与块之间的关系
- 无法理解完整上下文
问题3: 多语义内容
- 一个文档包含多个主题
- 单一向量无法表示所有语义
- 相关内容可能被遗漏
问题4: 检索冗余
- 检索到大量文档
- 很多内容重复或不相关
- 影响最终答案质量
本章技术概览#
| 技术 | 核心目标 | 适用场景 | 复杂度 |
|---|---|---|---|
| 智能分块 | 优化块大小和边界 | 所有RAG系统 | ⭐ |
| 多向量索引 | 一个文档多个向量 | 多主题文档 | ⭐⭐⭐ |
| 父文档检索 | 检索小块返回大块 | 保持上下文 | ⭐⭐ |
| 上下文压缩 | 压缩检索结果 | 减少冗余 | ⭐⭐⭐ |
| 时间衰减检索 | 考虑文档新鲜度 | 时效性内容 | ⭐⭐ |
Part 1: 智能文档分块策略#
1.1 核心概念#
文档分块(Chunking)是将长文档切分成更小片段的过程。好的分块策略能显著提升检索质量。
1.2 分块方法对比#
方法1: 固定长度分块
def fixed_length_split(text: str, chunk_size: int = 500) -> List[str]:
"""简单但可能切断语义"""
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]python方法2: 句子级分块
def sentence_split(text: str, max_sentences: int = 5) -> List[str]:
"""保持语义完整但长度不均"""
sentences = text.split('。')
chunks = []
current_chunk = []
for sent in sentences:
current_chunk.append(sent)
if len(current_chunk) >= max_sentences:
chunks.append('。'.join(current_chunk))
current_chunk = []
if current_chunk:
chunks.append('。'.join(current_chunk))
return chunkspython方法3: 语义分块 ✅ 推荐
def semantic_split(text: str, embeddings) -> List[str]:
"""基于语义相似度分块"""
# 将在下面实现
passpython1.3 普通文本分割器#
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 创建文本分割器
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 每块的目标大小
chunk_overlap=200, # 块之间的重叠
length_function=len, # 长度计算函数
separators=[ # 分隔符优先级
"\n\n", # 段落
"\n", # 行
" ", # 单词
"" # 字符
]
)
# 示例文档
document = """
# Python编程基础
## 变量和数据类型
Python是一种动态类型语言,这意味着你不需要声明变量的类型。
Python支持多种数据类型,包括整数、浮点数、字符串等。
## 控制流
Python使用缩进来定义代码块。
if语句用于条件判断,for循环用于迭代。
## 函数
函数是可重用的代码块。
使用def关键字定义函数。
"""
# 分块
chunks = text_splitter.split_text(document)
print(f"文档被分成 {len(chunks)} 块")
for i, chunk in enumerate(chunks):
print(f"\n块 {i+1}:")
print(chunk[:100] + "...")python运行结果:
文档被分成 1 块
块 1:
# Python编程基础
## 变量和数据类型
Python是一种动态类型语言,这意味着你不需要声明变量的类型。
Python支持多种数据类型,包括整数、浮点数、字符串等。
## 控制流
Py...plaintext1.4 语义分块#
处理流程:
输入文本
↓
分割句子(中文优化)
↓
计算每个句子的嵌入
↓
计算相邻句子的相似度
↓
在相似度低的地方切分
↓
输出语义分块结果plaintextfrom langchain_community.embeddings import HuggingFaceEmbeddings
import numpy as np
from typing import List
import re
class SemanticChunker:
"""基于语义相似度的分块器(适配本地模型)"""
def __init__(self, embeddings, similarity_threshold: float = 0.6):
self.embeddings = embeddings
self.similarity_threshold = similarity_threshold
def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""计算余弦相似度"""
if np.linalg.norm(vec1) == 0 or np.linalg.norm(vec2) == 0:
return 0.0
return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
def _split_sentences(self, text: str) -> List[str]:
"""分割句子(中文优化)"""
sentence_endings = r'[。!?;\n]'
sentences = re.split(sentence_endings, text)
sentences = [s.strip() for s in sentences if s.strip()]
sentences = [s + '。' for s in sentences if not s.endswith(('。', '!', '?', ';'))]
return sentences
def _get_sentence_embeddings(self, sentences: List[str]) -> List[np.ndarray]:
"""获取句子嵌入"""
embeddings = []
for sent in sentences:
emb = self.embeddings.embed_query(sent)
embeddings.append(np.array(emb))
return embeddings
def split_text(self, text: str) -> List[str]:
"""基于语义相似度分块"""
if not text:
return [text]
# 1. 按句子分割
sentences = self._split_sentences(text)
if len(sentences) <= 1:
return [text]
# 2. 计算每个句子的嵌入
embeddings = self._get_sentence_embeddings(sentences)
# 3. 计算相邻句子的相似度
similarities = []
for i in range(len(embeddings) - 1):
sim = self.cosine_similarity(embeddings[i], embeddings[i+1])
similarities.append(sim)
# 4. 在相似度低的地方切分
chunks = []
current_chunk = [sentences[0]]
for i, sim in enumerate(similarities):
if sim < self.similarity_threshold:
chunks.append(''.join(current_chunk))
current_chunk = [sentences[i+1]]
else:
current_chunk.append(sentences[i+1])
# 添加最后一块
if current_chunk:
chunks.append(''.join(current_chunk))
return chunks
# 创建语义分块器
semantic_chunker = SemanticChunker(embeddings, similarity_threshold=0.5)
# 测试文本
text = """
机器学习是人工智能的一个分支。它使计算机能够从数据中学习。
深度学习是机器学习的一个子领域。它使用神经网络来建模复杂模式。
Python是一种流行的编程语言。它广泛用于数据科学和机器学习。
Python有丰富的库生态系统。NumPy和Pandas是常用的数据处理库。
"""
chunks = semantic_chunker.split_text(text)
print(f"语义分块结果: {len(chunks)} 块")
for i, chunk in enumerate(chunks):
print(f"\n块 {i+1}:\n{chunk}")python运行结果:
语义分块结果: 7 块
块 1:
机器学习是人工智能的一个分支。
块 2:
它使计算机能够从数据中学习。
块 3:
深度学习是机器学习的一个子领域。
块 4:
它使用神经网络来建模复杂模式。
块 5:
Python是一种流行的编程语言。
块 6:
它广泛用于数据科学和机器学习。
块 7:
Python有丰富的库生态系统。NumPy和Pandas是常用的数据处理库。plaintext1.5 分块最佳实践#
流程:
输入文档
↓
按文档结构分割(标题、段落)
↓
检查每个章节的大小
↓
对过长的章节进行二次分割
↓
为每个块添加元数据
↓
输出结构化分块结果plaintextfrom typing import List, Dict, Any
class SmartChunker:
"""智能分块器 - 适配本地模型"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
min_chunk_size: int = 100,
max_chunk_size: int = 2000
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
def split_by_structure(self, text: str) -> List[str]:
"""按文档结构分块"""
if not text or len(text) < self.min_chunk_size:
return [text]
# 按标题分割
sections = text.split('\n# ')
chunks = []
for section in sections:
if not section.strip():
continue
# 如果章节太长,进一步分割
if len(section) > self.max_chunk_size:
sub_chunks = self._split_long_section(section)
chunks.extend(sub_chunks)
elif len(section) >= self.min_chunk_size:
chunks.append(section)
return chunks
def _split_long_section(self, text: str) -> List[str]:
"""分割过长的章节"""
paragraphs = text.split('\n\n')
chunks = []
current_chunk = []
current_length = 0
for para in paragraphs:
para_length = len(para)
if current_length + para_length > self.chunk_size and current_chunk:
# 当前块已满,保存
chunks.append('\n\n'.join(current_chunk))
# 处理重叠
if len(current_chunk) > 1:
current_chunk = [current_chunk[-1], para]
current_length = len(current_chunk[-1]) + para_length
else:
current_chunk = [para]
current_length = para_length
else:
current_chunk.append(para)
current_length += para_length
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
return chunks
def add_metadata_to_chunks(
self,
chunks: List[str],
doc_metadata: dict
) -> List[Dict[str, Any]]:
"""为每个块添加元数据"""
chunk_docs = []
for i, chunk in enumerate(chunks):
chunk_docs.append({
'content': chunk,
'metadata': {
**doc_metadata,
'chunk_id': i,
'chunk_total': len(chunks),
'chunk_size': len(chunk)
}
})
return chunk_docs
smart_chunker = SmartChunker(
chunk_size=800,
chunk_overlap=150,
min_chunk_size=200
)
# 示例文档
document = """
# 机器学习基础
机器学习是人工智能的一个重要分支。它使计算机能够从数据中学习。
# 深度学习
深度学习是机器学习的一个子领域。它使用神经网络来建模复杂模式。
# Python编程
Python是一种流行的编程语言。它广泛用于数据科学和机器学习。
Python有丰富的库生态系统。NumPy和Pandas是常用的数据处理库。
"""
# 按结构分块
chunks = smart_chunker.split_by_structure(document)
# 添加元数据
chunk_docs = smart_chunker.add_metadata_to_chunks(
chunks,
doc_metadata={'source': 'python_tutorial.md', 'author': '张三'}
)
# 输出结果
for doc in chunk_docs:
print(f"\n块 {doc['metadata']['chunk_id'] + 1}/{doc['metadata']['chunk_total']}")
print(f"大小: {doc['metadata']['chunk_size']} 字符")
print(f"内容: {doc['content'][:100]}...")
print(f"metadata: {doc['metadata']}")python运行结果:
块 1/1
大小: 166 字符
内容:
# 机器学习基础
机器学习是人工智能的一个重要分支。它使计算机能够从数据中学习。
# 深度学习
深度学习是机器学习的一个子领域。它使用神经网络来建模复杂模式。
# Python编程
Python...
metadata: {'source': 'python_tutorial.md', 'author': '张三', 'chunk_id': 0, 'chunk_total': 1, 'chunk_size': 166}plaintext1.6 分块策略总结#
选择指南:
- 固定长度分块:适合格式规整的文档,处理速度快
- 句子级分块:适合自然语言文本,保持语义完整性
- 语义分块:适合复杂文档,保证语义连贯性
- 智能分块:综合最优方案,推荐生产环境使用
最佳实践建议:
- 根据文档类型选择合适的分块策略
- 设置合理的块大小和重叠区域
- 为每个块添加丰富的元数据
- 考虑文档的语义边界和结构特点
- 测试不同分块策略对检索效果的影响
通过智能分块策略,我们可以显著提升RAG系统的检索质量和准确性,为后续的高级检索技术奠定坚实基础。
Part 2: 多向量索引 - Multi-Vector Indexing#
2.1 核心概念#
多向量索引为单个文档生成多个向量,每个向量代表文档的不同方面或部分。这样可以更全面地表示文档的语义。
2.2 为什么需要多向量索引?#
场景:一篇包含多个主题的文档
文档内容:
"""
本文介绍Python编程基础。
第一部分:变量和数据类型
Python支持多种数据类型...
第二部分:函数和模块
函数是可重用的代码块...
第三部分:面向对象编程
类是对象的蓝图...
"""
用户查询: "Python中的类是什么?"python单向量索引的问题:
- 整个文档被表示为一个向量
- 向量混合了所有主题的语义
- 可能无法精确匹配”类”的相关内容
多向量索引的优势: ✅
- 为每个部分生成独立向量
- “面向对象编程”部分的向量更匹配查询
- 检索更精确
2.3 实现多向量检索器#
处理流程:
输入文档
↓
为每个文档生成唯一ID
↓
存储完整文档到文档存储
↓
将文档分割成小块
↓
为每个小块添加文档ID元数据
↓
将小块向量化并存储到向量数据库
↓
构建多向量检索器
↓
完成索引构建plaintextclass MultiVectorIndexer:
"""多向量索引器"""
def __init__(self, embeddings, persist_directory: str = "./chroma_db"):
self.embeddings = embeddings
self.persist_directory = persist_directory
# 先删除现有集合
try:
existing_chroma = Chroma(
collection_name="multi_vector",
persist_directory=persist_directory,
embedding_function=embeddings
)
existing_chroma.delete_collection()
print("🗑️ 已删除现有集合")
except:
print("ℹ️ 无需删除集合,继续初始化")
# 重新创建向量存储
self.vectorstore = Chroma(
collection_name="multi_vector",
persist_directory=persist_directory,
embedding_function=embeddings
)
# 文档存储:存储完整文档
self.docstore = InMemoryStore()
# 多向量检索器(返回完整文档)
self.retriever = MultiVectorRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
id_key="doc_id"
)
def index_documents(self, documents: List[str], doc_ids: List[str] = None):
"""索引文档"""
if doc_ids is None:
doc_ids = [str(uuid.uuid4()) for _ in documents]
print(f"📝 索引 {len(documents)} 个文档,生成 {len(doc_ids)} 个文档ID")
# 1. 存储完整文档
for doc_id, doc in zip(doc_ids, documents):
self.docstore.mset([(doc_id, doc)])
print(f"✅ 存储文档: {doc_id} (长度: {len(doc)} 字符)")
# 2. 将每个文档分成小块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=100,
chunk_overlap=10
)
sub_docs = []
for doc_id, doc in zip(doc_ids, documents):
chunks = text_splitter.split_text(doc)
print(f"📊 文档 {doc_id} 分割为 {len(chunks)} 个小块")
for chunk in chunks:
sub_docs.append(
Document(
page_content=chunk,
metadata={"doc_id": doc_id}
)
)
# 3. 将小块向量化并存储
if sub_docs:
self.vectorstore.add_documents(sub_docs)
self.vectorstore.persist()
print(f"✅ 存储 {len(sub_docs)} 个小块到向量数据库")
else:
print("⚠️ 没有小块可存储")
def retrieve_full_documents(self, query: str, k: int = 4):
"""检索完整文档(返回字符串)"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
def retrieve_chunks(self, query: str, k: int = 4):
"""检索小块(返回Document对象)"""
docs = self.vectorstore.similarity_search(query, k=k)
return docs
def debug_info(self):
"""调试信息"""
print("🔍 调试信息:")
try:
# 检查向量数据库
all_docs = self.vectorstore.get()
print(f"📊 向量数据库: {len(all_docs['documents'])} 个小块")
# 检查文档存储
all_doc_ids = list(set([metadata['doc_id'] for metadata in all_docs['metadatas']]))
print(f"📚 文档存储: {len(all_doc_ids)} 个文档ID")
for doc_id in all_doc_ids:
doc = self.docstore.mget([doc_id])[0]
if doc:
print(f"✅ 文档 {doc_id}: 存在 ({len(doc)} 字符)")
else:
print(f"❌ 文档 {doc_id}: 不存在")
except Exception as e:
print(f"⚠️ 调试信息获取失败: {e}")python2.4 实际应用演示#
# 创建多向量索引器,指定Chroma数据库路径
multi_indexer = MultiVectorIndexer(
embeddings=embeddings,
persist_directory="./chroma_db" # 本地Chroma数据库路径
)
# 准备文档
documents = [
"""
Python编程基础教程
第一章:变量和数据类型
Python是动态类型语言,支持整数、浮点数、字符串等多种数据类型。
第二章:控制流
Python使用if、for、while等关键字进行流程控制。
第三章:函数
函数是可重用的代码块,使用def关键字定义。
""",
"""
机器学习入门
监督学习:使用标记数据训练模型。
无监督学习:从无标记数据中发现模式。
强化学习:通过奖励机制学习最优策略。
"""
]
# 索引文档
multi_indexer.index_documents(documents)
# 调试信息
multi_indexer.debug_info()python运行结果:
📝 索引 2 个文档,生成 2 个文档ID
✅ 存储文档: 36cf53c2-968a-4540-9f75-e2e0e02a9d04 (长度: 178 字符)
✅ 存储文档: 8e2af8dd-e458-4f04-bc14-01d02552e8c9 (长度: 88 字符)
📊 文档 36cf53c2-968a-4540-9f75-e2e0e02a9d04 分割为 2 个小块
📊 文档 8e2af8dd-e458-4f04-bc14-01d02552e8c9 分割为 1 个小块
✅ 存储 3 个小块到向量数据库
🔍 调试信息:
📊 向量数据库: 3 个小块
📚 文档存储: 2 个文档ID
✅ 文档 36cf53c2-968a-4540-9f75-e2e0e02a9d04: 存在 (178 字符)
✅ 文档 8e2af8dd-e458-4f04-bc14-01d02552e8c9: 存在 (88 字符)plaintext2.5 检索测试#
测试1:机器学习相关查询
print("\n🔍 检索测试:")
results = multi_indexer.retrieve_full_documents("什么是机器学习?", k=2)
for i, doc in enumerate(results):
print(f"结果 {i+1}: {doc[:100]}...")python运行结果:
🔍 检索测试:
结果 1:
机器学习入门
监督学习:使用标记数据训练模型。
无监督学习:从无标记数据中发现模式。
强化学习:通过奖励机制学习最优策略。
...
结果 2:
Python编程基础教程
第一章:变量和数据类型
Python是动态类型语言,支持整数、浮点数、字符串等多种数据类型。
第二章:控制流
P...plaintext测试2:Python编程相关查询
print("\n🔍 检索测试:")
results = multi_indexer.retrieve_full_documents("什么是python编程?", k=2)
for i, doc in enumerate(results):
print(f"结果 {i+1}: {doc[:100]}...")python运行结果:
🔍 检索测试:
结果 1:
Python编程基础教程
第一章:变量和数据类型
Python是动态类型语言,支持整数、浮点数、字符串等多种数据类型。
第二章:控制流
P...
结果 2:
机器学习入门
监督学习:使用标记数据训练模型。
无监督学习:从无标记数据中发现模式。
强化学习:通过奖励机制学习最优策略。
...plaintext2.6 多向量检索机制详解#
检索流程:
用户查询 "什么是机器学习?"
↓
在向量数据库中搜索相似的小块
↓
找到相关小块:[小块2, 小块5, 小块8...]
↓
获取小块的 doc_id:[doc_id_1, doc_id_2, doc_id_1...]
↓
根据 doc_id 从文档存储中检索完整文档
↓
返回完整文档:[完整文档1, 完整文档2...]plaintext2.7 多向量检索优缺点分析#
优点: ✅
- 更精确的语义表示:每个文档部分都有独立的向量表示
- 提高相关性得分:可以精确匹配文档的特定部分
- 灵活的检索策略:支持多种检索模式(完整文档 vs 小块)
- 更好的上下文理解:返回完整文档保持上下文完整性
缺点: ⚠️
- 存储成本增加:需要存储更多向量和元数据
- 索引时间更长:需要为每个文档生成多个向量
- 实现复杂度高:需要管理文档存储和向量存储的同步
- 检索延迟增加:需要额外的文档查找步骤
2.8 适用场景建议#
推荐使用多向量索引当:
- 文档包含多个独立主题或章节
- 需要精确匹配文档的特定部分
- 检索质量比存储成本更重要
- 文档结构清晰,可以自然分割
选择单向量索引当:
- 文档内容单一,语义集中
- 存储资源有限
- 需要快速索引和检索
- 文档结构不清晰,难以分割
多向量索引技术通过为文档的不同部分创建独立的向量表示,显著提升了检索的精确性和相关性,是处理复杂文档结构的有效解决方案。
Part 3: 父文档检索器 - Parent Document Retriever#
3.1 核心概念#
父文档检索器的策略是:
- 将文档分成小块进行索引和检索(精确匹配)
- 返回大块或完整文档给LLM(保持上下文)
3.2 工作原理#
索引阶段:
大文档
↓ 分块
小块1、小块2、小块3
↓ 向量化
存储在向量数据库plaintext检索阶段:
用户查询
↓ 检索
匹配小块2
↓ 查找父文档
返回:包含小块2的大块/完整文档plaintext3.3 实现父文档检索器#
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.storage import InMemoryStore
from langchain.vectorstores import Chroma
from langchain.schema import Document
from typing import List
import shutil
class ParentDocRetriever:
"""父文档检索器(修复元数据键问题)"""
def __init__(self, embeddings, persist_directory: str = "./chroma_db"):
self.embeddings = embeddings
self.persist_directory = persist_directory
self.collection_name = "parent_doc"
# 清理指定集合
self._clean_collection()
# 向量存储(存储子文档)
self.vectorstore = Chroma(
collection_name=self.collection_name,
persist_directory=persist_directory,
embedding_function=embeddings
)
# 文档存储(存储父文档)
self.docstore = InMemoryStore()
# 子文档分割器(用于检索的小块)
self.child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=50
)
# 父文档分割器(用于返回的大块)
self.parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000,
chunk_overlap=200
)
def _clean_collection(self):
"""清理指定集合"""
try:
# 先尝试连接到现有集合
existing_vectorstore = Chroma(
collection_name=self.collection_name,
persist_directory=self.persist_directory,
embedding_function=self.embeddings
)
# 删除集合
existing_vectorstore.delete_collection()
print(f"🗑️ 清理集合: {self.collection_name}")
except Exception as e:
# 如果集合不存在,创建目录
import os
os.makedirs(self.persist_directory, exist_ok=True)
print(f"ℹ️ 集合不存在,将创建新集合: {self.collection_name}")
def add_documents(self, documents: List[Document]):
"""添加文档(修复元数据键问题)"""
print("📝 开始索引文档...")
for i, doc in enumerate(documents):
print(f"\n📄 处理文档 {i+1}/{len(documents)}")
# 1. 用父文档分割器分割成大块
parent_chunks = self.parent_splitter.split_documents([doc])
print(f" 🔧 父文档分割: {len(parent_chunks)} 个大块")
for j, parent_chunk in enumerate(parent_chunks):
# 为每个父文档生成唯一ID
parent_id = f"parent_{i}_{j}"
# 2. 用子文档分割器将大块分割成小块
child_chunks = self.child_splitter.split_documents([parent_chunk])
print(f" 🔍 子文档分割: 大块 {j+1} → {len(child_chunks)} 个小块")
# 3. 存储父文档到文档存储
self.docstore.mset([(parent_id, parent_chunk.page_content)])
# 4. 为每个子文档添加父文档ID元数据,并存储到向量数据库
child_docs_with_metadata = []
for k, child_chunk in enumerate(child_chunks):
child_doc = Document(
page_content=child_chunk.page_content,
metadata={"doc_id": parent_id, "source": doc.metadata.get("source", "unknown")}
)
child_docs_with_metadata.append(child_doc)
self.vectorstore.add_documents(child_docs_with_metadata)
# 持久化保存
self.vectorstore.persist()
print(f"✅ 索引完成: 所有文档已存储")
def retrieve(self, query: str, k: int = 2):
"""检索父文档(修复元数据键问题)"""
print(f"\n🔍 开始检索: '{query}'")
# 1. 在向量数据库中搜索相似子文档
print("1. 🔎 在向量数据库中搜索相似子文档...")
child_docs = self.vectorstore.similarity_search(query, k=k*3)
print(f" 找到 {len(child_docs)} 个相关子文档")
for i, child_doc in enumerate(child_docs):
# 使用正确的元数据键 'doc_id' 而不是 'parent_id'
parent_id = child_doc.metadata.get("doc_id")
print(f" 子文档 {i+1}: {child_doc.page_content[:50]}... (父文档ID: {parent_id})")
# 2. 提取父文档ID
print("\n2. 🆔 提取父文档ID...")
parent_ids = [doc.metadata.get("doc_id") for doc in child_docs]
print(f" 父文档ID列表: {parent_ids}")
# 3. 从文档存储中检索完整父文档
print("\n3. 📚 从文档存储中检索完整父文档...")
parent_docs = self.docstore.mget(parent_ids)
print(f" 检索到 {len([doc for doc in parent_docs if doc])} 个父文档")
# 4. 去重和排序
print("\n4. 🔄 去重和排序...")
seen = set()
unique_docs = []
for i, parent_doc in enumerate(parent_docs):
if parent_doc and parent_doc not in seen:
seen.add(parent_doc)
unique_docs.append(Document(
page_content=parent_doc,
metadata={"doc_id": parent_ids[i]}
))
print(f" 保留父文档: {parent_ids[i]} (长度: {len(parent_doc)} 字符)")
# 5. 返回前k个结果
final_results = unique_docs[:k]
print(f"\n✅ 检索完成: 返回 {len(final_results)} 个父文档")
return final_results
def debug_storage(self):
"""调试存储状态"""
print("\n📊 存储状态调试:")
# 检查向量数据库中的子文档
all_child_docs = self.vectorstore.get()
print(f"🔍 向量数据库: {len(all_child_docs['documents'])} 个子文档")
# 检查元数据
if all_child_docs['metadatas']:
first_metadata = all_child_docs['metadatas'][0]
print(f" 第一个子文档的元数据: {first_metadata}")
# 获取所有父文档ID(使用正确的键 'doc_id')
parent_ids = []
for metadata in all_child_docs['metadatas']:
if metadata and 'doc_id' in metadata:
parent_ids.append(metadata['doc_id'])
print(f"📚 文档存储: {len(set(parent_ids))} 个父文档ID")
print(f" 父文档ID列表: {list(set(parent_ids))}")
else:
print("⚠️ 没有找到元数据")python3.4 实际应用演示#
# 创建父文档检索器
parent_retriever = ParentDocRetriever(
embeddings=embeddings,
persist_directory="./chroma_db"
)
# 准备文档
docs = [
Document(
page_content="""
Python编程语言完整指南
第一部分:基础语法
Python使用缩进来定义代码块。变量不需要声明类型。
支持多种数据类型,包括整数、浮点数、字符串、列表、字典等。
第二部分:控制流
if语句用于条件判断:
if condition:
do_something()
for循环用于迭代:
for item in items:
process(item)
while循环用于重复执行:
while condition:
do_something()
第三部分:函数
使用def关键字定义函数:
def function_name(parameters):
# function body
return result
""",
metadata={"source": "python_guide.md"}
)
]
# 添加文档
parent_retriever.add_documents(docs)
# 调试存储状态
parent_retriever.debug_storage()python运行结果:
📝 开始索引文档...
📄 处理文档 1/1
🔧 父文档分割: 1 个大块
🔍 子文档分割: 大块 1 → 2 个小块
✅ 索引完成: 所有文档已存储
📊 存储状态调试:
🔍 向量数据库: 2 个子文档
第一个子文档的元数据: {'doc_id': 'parent_0_0', 'source': 'python_guide.md'}
📚 文档存储: 1 个父文档ID
父文档ID列表: ['parent_0_0']plaintext3.5 检索测试#
print("\n" + "="*60)
results = parent_retriever.retrieve("Python中for循环怎么用?", k=1)
# 显示结果
print("\n🎯 最终结果:")
for i, doc in enumerate(results):
print(f"文档 {i+1}:")
print(f"内容预览: {doc.page_content[:100]}...")
print(f"父文档ID: {doc.metadata['doc_id']}")python运行结果:
============================================================
🔍 开始检索: 'Python中for循环怎么用?'
1. 🔎 在向量数据库中搜索相似子文档...
找到 2 个相关子文档
子文档 1: Python编程语言完整指南
第一部分:基础语法
... (父文档ID: parent_0_0)
子文档 2: 第三部分:函数
使用def关键字定义函数:
def function... (父文档ID: parent_0_0)
2. 🆔 提取父文档ID...
父文档ID列表: ['parent_0_0', 'parent_0_0']
3. 📚 从文档存储中检索完整父文档...
检索到 2 个父文档
4. 🔄 去重和排序...
保留父文档: parent_0_0 (长度: 515 字符)
✅ 检索完成: 返回 1 个父文档
🎯 最终结果:
文档 1:
内容预览: Python编程语言完整指南
第一部分:基础语法
Python使用缩进来定义代码块。变量不需要声明类型。
支持多种数据类型,包括整数、...
父文档ID: parent_0_0plaintext3.6 父文档检索的优势#
最佳场景:
- ✅ 需要精确匹配 + 完整上下文
- ✅ 文档有明确的层次结构
- ✅ 答案需要周围的解释
- ✅ 避免上下文丢失
技术优势:
- 检索精度高:使用小块进行相似性搜索,匹配更精确
- 上下文完整:返回父文档级别的完整内容
- 避免信息碎片化:保持相关内容的连贯性
- 支持复杂查询:能够处理需要多段上下文的复杂问题
适用场景:
- 技术文档检索:如API文档、编程教程
- 学术论文搜索:需要完整段落理解概念
- 法律文档分析:需要完整条款上下文
- 医疗记录查询:需要完整病历信息
3.7 与其他检索策略对比#
| 策略 | 检索粒度 | 返回粒度 | 优势 | 劣势 |
|---|---|---|---|---|
| 标准检索 | 文档块 | 文档块 | 简单快速 | 上下文可能不完整 |
| 多向量索引 | 文档小块 | 完整文档 | 精确匹配 | 存储成本高 |
| 父文档检索 | 文档小块 | 父文档块 | 平衡精度和上下文 | 实现复杂度中等 |
父文档检索器通过”小块检索,大块返回”的策略,在保持检索精度的同时提供了完整的上下文信息,是处理需要深度理解的复杂查询的理想选择。
Part 4: 上下文压缩检索 - Contextual Compression#
4.1 核心概念#
上下文压缩检索在检索后对文档进行过滤和压缩,只保留与查询最相关的内容。
4.2 为什么需要压缩?#
问题:检索冗余
用户查询: "Python中的列表推导式是什么?"
检索到的文档:
"""
Python高级特性完整指南
1. 列表推导式
列表推导式是创建列表的简洁方式...
2. 生成器表达式
生成器用于惰性计算...
3. 装饰器
装饰器用于修改函数行为...
4. 上下文管理器
with语句用于资源管理...
"""python问题:
- → 只有”列表推导式”部分相关
- → 其他部分是噪声
- → 浪费LLM的上下文窗口
- → 可能影响答案质量
解决方案:上下文压缩 ✅
- → 只提取相关部分:“列表推导式是创建列表的简洁方式…”
- → 节省tokens
- → 提高答案精度
4.3 实现LLM过滤器#
from langchain.vectorstores import Chroma
from langchain.schema import Document
from vllm import SamplingParams
class VLLMCompressedRetriever:
"""vLLM压缩检索器(ChatML格式)"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
self.base_retriever = vectorstore.as_retriever()
def retrieve(self, query: str, k: int = 3):
"""检索并压缩"""
# 基础检索
base_docs = self.base_retriever.get_relevant_documents(query, k=k)
# 压缩文档
compressed_docs = []
for doc in base_docs:
# 使用ChatML格式
prompt = f"""<|im_start|>system
你是一个文档压缩助手。请从给定的文档中提取与用户查询相关的关键信息,去除无关内容。
请只返回提取的关键信息,不要添加任何解释或评论。<|im_end|>
<|im_start|>user
查询:{query}
文档内容:
{doc.page_content}
请提取与查询相关的关键信息:<|im_end|>
<|im_start|>assistant
"""
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=300,
stop=["<|im_end|>", "<|endoftext|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
if outputs and outputs[0].outputs:
compressed_content = outputs[0].outputs[0].text.strip()
compressed_doc = Document(
page_content=compressed_content,
metadata=doc.metadata
)
compressed_docs.append(compressed_doc)
return compressed_docspython4.4 实际应用演示#
# 创建向量数据库
vectorstore = Chroma(
collection_name="docs",
persist_directory="./chroma_db",
embedding_function=embeddings
)
# 添加文档
docs = [
Document(
page_content="""
Python高级特性
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:
生成器用于惰性计算,节省内存。
语法:(expression for item in iterable)
装饰器:
装饰器用于修改函数行为,不改变原函数代码。
使用@符号应用装饰器。
""",
metadata={"source": "python_advanced.md"}
)
]
vectorstore.add_documents(docs)
# 创建压缩检索器
compressed_retriever = VLLMCompressedRetriever(vectorstore, llm)
# 测试查询
query = "Python列表推导式"
print("🔍 普通检索:")
normal_docs = base_retriever.get_relevant_documents(query)
print(f"长度: {len(normal_docs[0].page_content)} 字符")
print(normal_docs[0].page_content)python运行结果:
🔍 普通检索:
长度: 355 字符
Python高级特性
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:
生成器用于惰性计算,节省内存。
语法:(expression for item in iterable)
装饰器:
装饰器用于修改函数行为,不改变原函数代码。
使用@符号应用装饰器。plaintextprint("\n✂️ 压缩检索:")
compressed_docs = compressed_retriever.retrieve(query)
print(f"长度: {len(compressed_docs[0].page_content)} 字符")
print(compressed_docs[0].page_content)python运行结果:
✂️ 压缩检索:
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.64it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.67it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.67it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 2.66it/s]
长度: 72 字符
列表推导式是Python中创建列表的简洁方式。语法:[expression for item in iterable if condition]plaintext4.5 实现嵌入过滤器#
操作流程:
用户查询: "Python列表推导式"
↓
基础检索器检索相关文档
↓
获取多个候选文档
↓
计算查询与每个文档的嵌入相似度
↓
过滤相似度低于阈值的文档
↓
返回高相似度文档plaintextfrom langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain.vectorstores import Chroma
class EmbeddingFilterRetriever:
"""基于嵌入相似度的过滤器"""
def __init__(self, base_retriever, embeddings, similarity_threshold: float = 0.75):
# 创建嵌入过滤器
compressor = EmbeddingsFilter(
embeddings=embeddings,
similarity_threshold=similarity_threshold
)
# 创建压缩检索器
self.retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
def retrieve(self, query: str, k: int = 5):
"""检索并过滤"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 创建基础检索器
base_retriever = vectorstore.as_retriever()
# 创建嵌入过滤器
embedding_filter = EmbeddingFilterRetriever(
base_retriever=base_retriever,
embeddings=embeddings,
similarity_threshold=0.6
)
# 检索测试
query = "Python列表推导式"
filtered_docs = embedding_filter.retrieve(query, k=5)
for i in range(len(filtered_docs)):
print(f"Doc_{i}: {filtered_docs[i].page_content}")
print(f"过滤后保留 {len(filtered_docs)} 个文档")python运行结果:
Doc_0:
Python高级特性
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:
生成器用于惰性计算,节省内存。
语法:(expression for item in iterable)
装饰器:
装饰器用于修改函数行为,不改变原函数代码。
使用@符号应用装饰器。
Doc_1:
Python高级特性
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:
生成器用于惰性计算,节省内存。
语法:(expression for item in iterable)
装饰器:
装饰器用于修改函数行为,不改变原函数代码。
使用@符号应用装饰器。
过滤后保留 4 个文档plaintext4.6 实现文档分割过滤器#
from langchain.retrievers import ContextualCompressionRetriever
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
class SimplePipelineCompressor:
"""简化版管道压缩器"""
def __init__(self, base_retriever, embeddings, llm):
self.base_retriever = base_retriever
self.embeddings = embeddings
self.llm = llm
# 文本分割器
self.splitter = CharacterTextSplitter(
chunk_size=100,
chunk_overlap=10,
separator=". "
)
def retrieve(self, query: str):
"""手动实现管道压缩"""
# 1. 基础检索
base_docs = self.base_retriever.get_relevant_documents(query)
# 2. 文本分割
split_docs = []
for doc in base_docs:
chunks = self.splitter.split_text(doc.page_content)
for chunk in chunks:
split_docs.append({
'content': chunk,
'metadata': doc.metadata
})
print(f"length of split_docs: {len(split_docs)}")
# 3. 嵌入过滤(简化版)
filtered_docs = self._embedding_filter(split_docs, query)
# 4. LLM提取(简化版)
final_docs = self._llm_extract(filtered_docs, query)
return final_docs
def _embedding_filter(self, docs, query):
"""嵌入相似度过滤"""
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# 计算查询嵌入
query_embedding = self.embeddings.embed_query(query)
filtered_docs = []
for doc in docs:
# 计算文档嵌入
doc_embedding = self.embeddings.embed_query(doc['content'])
# 计算相似度
similarity = cosine_similarity([query_embedding], [doc_embedding])[0][0]
# 应用阈值
if similarity >= 0.5:
filtered_docs.append(doc)
return filtered_docs
def _llm_extract(self, docs, query):
"""LLM内容提取"""
from vllm import SamplingParams
final_docs = []
for doc in docs:
# 构建ChatML格式提示
prompt = f"""<|im_start|>system
你是一个文档压缩助手。请从给定的文本中提取与用户查询相关的关键信息。
请只返回提取的关键信息,不要添加任何解释或评论。<|im_end|>
<|im_start|>user
查询:{query}
文本内容:
{doc['content']}
请提取与查询相关的关键信息:<|im_end|>
<|im_start|>assistant
"""
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=200,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
if outputs and outputs[0].outputs:
extracted_content = outputs[0].outputs[0].text.strip()
from langchain.schema import Document
final_doc = Document(
page_content=extracted_content,
metadata=doc['metadata']
)
final_docs.append(final_doc)
return final_docs
# 创建压缩器
compressor = SimplePipelineCompressor(base_retriever, embeddings, llm)
# 检索
results = compressor.retrieve("Python列表推导式")
print(results)
for i, doc in enumerate(results):
print(f"结果 {i+1}: {doc.page_content}")python运行结果:
length of split_docs: 4
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 1.22it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 1.20it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 1.29it/s]
Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 1.77it/s]
[Document(page_content='Python列表推导式语法:[expression for item in iterable if condition]\n 示例:squares = [x**2 for x in range(10)]\n 生成器表达式:(expression for item in iterable)\n 装饰器:使用@符号应用装饰器。', metadata={'source': 'python_advanced.md'}), Document(page_content='Python列表推导式语法:[expression for item in iterable if condition]\n 示例:squares = [x**2 for x in range(10)]\n 生成器表达式语法:(expression for item in iterable)\n 装饰器语法:@符号应用装饰器。', metadata={'source': 'python_advanced.md'}), Document(page_content='Python列表推导式语法:[expression for item in iterable if condition]\n 示例:squares = [x**2 for x in range(10)]\n 生成器表达式语法:(expression for item in iterable)\n 装饰器使用@符号应用', metadata={'source': 'python_advanced.md'}), Document(page_content='列表推导式:[expression for item in iterable if condition]\n 生成器表达式:(expression for item in iterable)\n 装饰器:@符号应用装饰器', metadata={'source': 'python_advanced.md'})]
结果 1: Python列表推导式语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:(expression for item in iterable)
装饰器:使用@符号应用装饰器。
结果 2: Python列表推导式语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式语法:(expression for item in iterable)
装饰器语法:@符号应用装饰器。
结果 3: Python列表推导式语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式语法:(expression for item in iterable)
装饰器使用@符号应用
结果 4: 列表推导式:[expression for item in iterable if condition]
生成器表达式:(expression for item in iterable)
装饰器:@符号应用装饰器plaintext4.7 压缩策略对比#
| 压缩器 | 方法 | 速度 | 质量 | 成本 |
|---|---|---|---|---|
| LLM提取器 | LLM提取相关内容 | 慢 | 高 | 高 |
| 嵌入过滤器 | 相似度过滤 | 快 | 中 | 低 |
| 管道压缩 | 组合多种方法 | 中 | 高 | 中 |
4.8 技术优势与适用场景#
优势: ✅
- 显著节省tokens:减少LLM处理的无用信息
- 提高答案质量:专注于相关内容
- 降低计算成本:减少API调用费用
- 提升响应速度:处理更少的内容
适用场景:
- 长文档检索:技术文档、学术论文等
- 多主题查询:需要精确匹配特定部分
- 成本敏感应用:需要控制API调用成本
- 实时系统:需要快速响应的应用
选择指南:
- 追求质量:选择LLM提取器或管道压缩
- 追求速度:选择嵌入过滤器
- 平衡方案:管道压缩提供最佳平衡
上下文压缩检索技术通过智能过滤和内容提取,有效解决了检索冗余问题,是构建高效RAG系统的关键技术之一。
Part 5: 时间衰减检索 - Time-Weighted Retrieval#
5.1 核心概念#
时间衰减检索考虑文档的新鲜度,给予新文档更高的权重。这种策略特别适用于新闻、技术文档、市场报告等时效性强的场景。
5.2 为什么需要时间衰减?#
场景对比:
# 旧文档(2023年)
old_doc = Document(
page_content="2023年1月:Python 3.11发布",
metadata={"date": "2023-01-01"}
)
# 新文档(2024年)
recent_doc = Document(
page_content="2024年10月:Python 3.13发布,性能提升显著",
metadata={"date": "2024-10-01"}
)
用户查询: "Python最新版本"python问题:
- 标准检索可能返回旧文档(Python 3.11)
- 用户期望获得最新信息(Python 3.13)
- 时间因素影响答案的准确性和价值
解决方案:时间衰减检索 ✅
- 给予新文档更高的相关性权重
- 自动平衡语义相关性和时效性
- 确保返回最新、最准确的信息
5.3 实现时间加权检索器#
from langchain.retrievers import TimeWeightedVectorStoreRetriever
import datetime
class TimeSensitiveRetriever:
"""时间敏感检索器"""
def __init__(self, vectorstore, decay_rate: float = 0.01):
"""
Args:
decay_rate: 衰减率,越大则时间影响越大
"""
self.retriever = TimeWeightedVectorStoreRetriever(
vectorstore=vectorstore,
decay_rate=decay_rate,
k=2
)
def add_documents(self, documents: List[Document]):
"""添加文档(会自动记录时间)"""
self.retriever.add_documents(documents)
def retrieve(self, query: str):
"""检索(考虑时间因素)"""
docs = self.retriever.get_relevant_documents(query)
print(f"docs: {docs}")
return docspython5.4 实际应用演示#
# 创建时间敏感检索器
vectorstore = Chroma(
collection_name="news",
persist_directory="./chroma_db",
embedding_function=embeddings
)
time_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.01)
# 准备不同时间的文档
old_doc = Document(
page_content="2023年1月:Python 3.11发布,引入了新的语法特性和性能改进",
metadata={"date": "2023-01-01", "source": "python_release_notes"}
)
recent_doc = Document(
page_content="2024年10月:Python 3.13发布,性能提升显著,新增了JIT编译器",
metadata={"date": "2024-10-01", "source": "python_release_notes"}
)
# 添加文档到检索器
time_retriever.add_documents([old_doc, recent_doc])
# 检索:新文档会获得更高权重
results = time_retriever.retrieve("Python最新版本")
print("\n检索结果(按时间加权):")
for i, doc in enumerate(results):
print(f"\n结果 {i+1}:")
print(f"日期: {doc.metadata['date']}")
print(f"内容: {doc.page_content}")python运行结果:
docs: [Document(page_content='2024年10月:Python 3.13发布,性能提升显著,新增了JIT编译器', metadata={'date': '2024-10-01', 'source': 'python_release_notes'}), Document(page_content='2023年1月:Python 3.11发布,引入了新的语法特性和性能改进', metadata={'date': '2023-01-01', 'source': 'python_release_notes'})]
检索结果(按时间加权):
结果 1:
日期: 2024-10-01
内容: 2024年10月:Python 3.13发布,性能提升显著,新增了JIT编译器
结果 2:
日期: 2023-01-01
内容: 2023年1月:Python 3.11发布,引入了新的语法特性和性能改进plaintext5.5 衰减率参数调优#
class AdvancedTimeRetriever:
"""高级时间检索器(支持不同衰减策略)"""
def __init__(self, vectorstore):
self.vectorstore = vectorstore
def retrieve_with_decay(self, query: str, decay_rate: float = 0.01, k: int = 3):
"""使用指定衰减率检索"""
retriever = TimeWeightedVectorStoreRetriever(
vectorstore=self.vectorstore,
decay_rate=decay_rate,
k=k
)
return retriever.get_relevant_documents(query)
def compare_decay_rates(self, query: str):
"""比较不同衰减率的效果"""
decay_rates = [0.001, 0.01, 0.1] # 低、中、高衰减率
print(f"查询: '{query}'")
print("=" * 60)
for decay_rate in decay_rates:
print(f"\n衰减率: {decay_rate}")
results = self.retrieve_with_decay(query, decay_rate)
for i, doc in enumerate(results):
date = doc.metadata.get('date', '未知日期')
print(f" {i+1}. {date}: {doc.page_content[:50]}...")
# 测试不同衰减率
advanced_retriever = AdvancedTimeRetriever(vectorstore)
# 比较不同衰减率的效果
advanced_retriever.compare_decay_rates("Python最新特性")python运行结果:
查询: 'Python最新特性'
============================================================
衰减率: 0.001
1. 2024-10-01: 2024年10月:Python 3.13发布,性能提升显著...
2. 2023-01-01: 2023年1月:Python 3.11发布,引入了新的语法...
衰减率: 0.01
1. 2024-10-01: 2024年10月:Python 3.13发布,性能提升显著...
2. 2023-01-01: 2023年1月:Python 3.11发布,引入了新的语法...
衰减率: 0.1
1. 2024-10-01: 2024年10月:Python 3.13发布,性能提升显著...
2. 2023-01-01: 2023年1月:Python 3.11发布,引入了新的语法...plaintext5.6 自定义时间衰减函数#
import math
from datetime import datetime
from typing import List, Callable
class CustomTimeWeightedRetriever:
"""自定义时间加权检索器"""
def __init__(self, vectorstore, time_weight_function: Callable = None):
self.vectorstore = vectorstore
# 默认时间衰减函数(指数衰减)
if time_weight_function is None:
self.time_weight_function = self.exponential_decay
else:
self.time_weight_function = time_weight_function
def exponential_decay(self, doc_date: str, current_date: str = None) -> float:
"""指数衰减函数"""
if current_date is None:
current_date = datetime.now().strftime("%Y-%m-%d")
# 计算天数差
doc_dt = datetime.strptime(doc_date, "%Y-%m-%d")
current_dt = datetime.strptime(current_date, "%Y-%m-%d")
days_diff = (current_dt - doc_dt).days
# 指数衰减:每30天衰减一半
decay_factor = 0.5 ** (days_diff / 30)
return max(decay_factor, 0.1) # 最小权重0.1
def linear_decay(self, doc_date: str, current_date: str = None) -> float:
"""线性衰减函数"""
if current_date is None:
current_date = datetime.now().strftime("%Y-%m-%d")
doc_dt = datetime.strptime(doc_date, "%Y-%m-%d")
current_dt = datetime.strptime(current_date, "%Y-%m-%d")
days_diff = (current_dt - doc_dt).days
# 线性衰减:365天内从1.0衰减到0.1
if days_diff <= 365:
weight = 1.0 - (0.9 * days_diff / 365)
return max(weight, 0.1)
else:
return 0.1
def retrieve_with_custom_weights(self, query: str, k: int = 3) -> List[Document]:
"""使用自定义时间权重检索"""
# 基础检索(不考虑时间)
base_docs = self.vectorstore.similarity_search(query, k=k*2)
# 计算时间权重并排序
weighted_docs = []
for doc in base_docs:
doc_date = doc.metadata.get('date')
if doc_date:
time_weight = self.time_weight_function(doc_date)
else:
time_weight = 0.5 # 默认权重
# 可以结合语义相似度得分
weighted_docs.append((doc, time_weight))
# 按时间权重排序
weighted_docs.sort(key=lambda x: x[1], reverse=True)
# 返回前k个结果
return [doc for doc, weight in weighted_docs[:k]]
# 测试自定义检索器
custom_retriever = CustomTimeWeightedRetriever(vectorstore)
# 使用指数衰减
results_exp = custom_retriever.retrieve_with_custom_weights("Python发布")
print("指数衰减结果:")
for doc in results_exp:
print(f" {doc.metadata['date']}: {doc.page_content}")
# 使用线性衰减
custom_retriever.time_weight_function = custom_retriever.linear_decay
results_linear = custom_retriever.retrieve_with_custom_weights("Python发布")
print("\n线性衰减结果:")
for doc in results_linear:
print(f" {doc.metadata['date']}: {doc.page_content}")python5.7 时间衰减策略对比#
| 衰减策略 | 公式 | 特点 | 适用场景 |
|---|---|---|---|
| 指数衰减 | weight = base^(days/interval) | 前期衰减快,后期平缓 | 新闻、社交媒体 |
| 线性衰减 | weight = 1 - (衰减率 × days) | 均匀衰减,易于控制 | 技术文档、研究报告 |
| 阶梯衰减 | 按时间段分段设置权重 | 离散化处理,简单明了 | 法律法规、政策文件 |
5.8 实际应用场景#
1. 新闻检索系统
# 新闻文档示例
news_docs = [
Document(
page_content="今日股市大涨,科技股领涨",
metadata={"date": "2024-11-20", "category": "财经"}
),
Document(
page_content="上周市场回顾:整体平稳",
metadata={"date": "2024-11-13", "category": "财经"}
)
]
# 高衰减率确保最新新闻优先
news_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.1)python2. 技术文档检索
# 技术文档示例
tech_docs = [
Document(
page_content="React 18新特性:并发渲染",
metadata={"date": "2024-06-01", "framework": "React"}
),
Document(
page_content="React 17版本特性介绍",
metadata={"date": "2023-03-01", "framework": "React"}
)
]
# 中等衰减率平衡新旧信息
tech_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.01)python3. 学术论文检索
# 学术论文示例
paper_docs = [
Document(
page_content="2024年最新AI研究成果",
metadata={"date": "2024-10-01", "field": "人工智能"}
),
Document(
page_content="经典机器学习算法综述",
metadata={"date": "2020-05-01", "field": "机器学习"}
)
]
# 低衰减率重视经典文献
paper_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.001)python时间衰减检索技术通过智能平衡信息的新鲜度和相关性,为时效性敏感的应用场景提供了重要价值。合理配置衰减参数和策略,可以显著提升检索系统的实用性和用户体验。
综合实战:构建高级RAG系统#
在前面的章节中,我们深入探讨了各种高级索引和检索技术。现在,让我们将这些技术整合到一个完整的、生产就绪的高级RAG系统中。
系统架构设计#
我们的高级RAG系统采用模块化设计,包含以下核心组件:
系统架构图:
用户查询
↓
查询分析器 → 语义路由
↓
多策略检索引擎
├─ 基础向量检索
├─ 压缩检索
├─ 嵌入过滤
├─ 时间加权检索
└─ 混合检索
↓
结果融合器
↓
上下文压缩器
↓
答案生成器
↓
最终响应plaintext核心实现代码#
from langchain.vectorstores import Chroma
from langchain.schema import Document
from typing import Dict, List, Optional
from dataclasses import dataclass
from vllm import SamplingParams
import numpy as np
import asyncio
from datetime import datetime
@dataclass
class RAGConfig:
"""RAG系统配置"""
chunk_size: int = 1000
chunk_overlap: int = 200
similarity_threshold: float = 0.7
max_retrieved_docs: int = 5
enable_compression: bool = True
enable_time_weighting: bool = False
cache_enabled: bool = True
class AdvancedRAGSystem:
"""高级RAG系统(生产级实现)"""
def __init__(self, embeddings, llm, config: RAGConfig = None):
self.embeddings = embeddings
self.llm = llm
self.config = config or RAGConfig()
# 初始化向量存储
self.vectorstore = Chroma(
collection_name="advanced_rag",
persist_directory="./chroma_db",
embedding_function=embeddings
)
# 基础检索器
self.base_retriever = self.vectorstore.as_retriever(
search_kwargs={"k": self.config.max_retrieved_docs}
)
# 缓存系统
self.cache = {} if self.config.cache_enabled else None
print("🚀 高级RAG系统初始化完成")
def index_document(self, content: str, metadata: Dict = None):
"""智能索引文档"""
doc = Document(
page_content=content,
metadata=metadata or {}
)
self.vectorstore.add_documents([doc])
print("✅ 文档已索引")
def batch_index(self, documents: List[Dict]):
"""批量索引文档"""
docs = []
for doc_data in documents:
doc = Document(
page_content=doc_data['content'],
metadata=doc_data.get('metadata', {})
)
docs.append(doc)
# 分批处理避免内存溢出
batch_size = 50
for i in range(0, len(docs), batch_size):
batch = docs[i:i+batch_size]
self.vectorstore.add_documents(batch)
print(f"✅ 已索引批次 {i//batch_size + 1}/{(len(docs)-1)//batch_size + 1}")
def query_with_compression(self, question: str, k: int = 3):
"""使用压缩检索器"""
from vllm import SamplingParams
# 基础检索
base_docs = self.base_retriever.get_relevant_documents(question, k=k*2)
# 压缩文档
compressed_docs = []
for doc in base_docs:
prompt = f"""<|im_start|>system
你是一个文档压缩助手。请从以下文本中提取与用户查询相关的关键信息。
请只返回提取的关键信息,不要添加任何解释或评论。<|im_end|>
<|im_start|>user
查询:{question}
文本内容:
{doc.page_content}
请提取与查询相关的关键信息:<|im_end|>
<|im_start|>assistant
"""
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=300,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
if outputs and outputs[0].outputs:
compressed_content = outputs[0].outputs[0].text.strip()
compressed_doc = Document(
page_content=compressed_content,
metadata=doc.metadata
)
compressed_docs.append(compressed_doc)
return compressed_docs[:k]
def query_with_embedding_filter(self, question: str, k: int = 3, similarity_threshold: float = 0.7):
"""使用嵌入过滤器"""
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# 基础检索
base_docs = self.base_retriever.get_relevant_documents(question, k=k*3)
# 计算查询嵌入
query_embedding = self.embeddings.embed_query(question)
# 过滤相似度低的文档
filtered_docs = []
for doc in base_docs:
doc_embedding = self.embeddings.embed_query(doc.page_content)
similarity = cosine_similarity([query_embedding], [doc_embedding])[0][0]
if similarity >= similarity_threshold:
filtered_docs.append(doc)
return filtered_docs[:k]
def query_with_pipeline(self, question: str, k: int = 3):
"""使用管道压缩(压缩+过滤)"""
# 先过滤
filtered_docs = self.query_with_embedding_filter(question, k=k*2, similarity_threshold=0.6)
# 再压缩
compressed_docs = []
for doc in filtered_docs:
prompt = f"从以下文本中提取与'{question}'相关的关键信息:\n\n{doc.page_content}\n\n关键信息:"
from vllm import SamplingParams
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=200,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
if outputs and outputs[0].outputs:
compressed_content = outputs[0].outputs[0].text.strip()
compressed_doc = Document(
page_content=compressed_content,
metadata=doc.metadata
)
compressed_docs.append(compressed_doc)
return compressed_docs[:k]
def hybrid_query(self, question: str, k: int = 3):
"""混合检索策略"""
# 并行执行多种检索策略
strategies = {
'basic': self.base_retriever.get_relevant_documents(question, k=k),
'compressed': self.query_with_compression(question, k=k),
'filtered': self.query_with_embedding_filter(question, k=k)
}
# 结果融合(基于相似度得分)
all_docs = []
for strategy_name, docs in strategies.items():
for doc in docs:
all_docs.append((doc, strategy_name))
# 去重并排序
seen_content = set()
unique_docs = []
for doc, strategy in all_docs:
content_hash = hash(doc.page_content)
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
return unique_docs[:k]
def query(
self,
question: str,
method: str = "hybrid", # "basic", "compression", "filter", "pipeline", "hybrid"
k: int = 3
):
"""综合检索方法"""
print(f"❓ 查询: {question}")
print(f"🔧 方法: {method}")
# 检查缓存
cache_key = f"{question}_{method}_{k}"
if self.cache and cache_key in self.cache:
print("💾 使用缓存结果")
return self.cache[cache_key]
# 选择检索方法
start_time = datetime.now()
if method == "compression":
print("✂️ 使用压缩检索")
docs = self.query_with_compression(question, k)
elif method == "filter":
print("🎯 使用嵌入过滤")
docs = self.query_with_embedding_filter(question, k)
elif method == "pipeline":
print("⚡ 使用管道压缩")
docs = self.query_with_pipeline(question, k)
elif method == "hybrid":
print("🔄 使用混合检索")
docs = self.hybrid_query(question, k)
else:
print("🔍 使用基础检索")
docs = self.base_retriever.get_relevant_documents(question, k=k)
retrieval_time = (datetime.now() - start_time).total_seconds()
print(f"📄 检索到 {len(docs)} 个文档 (耗时: {retrieval_time:.2f}s)")
# 生成答案
context = "\n\n".join([doc.page_content for doc in docs])
prompt = f"""<|im_start|>system
你是一个智能问答助手。请基于提供的文档内容回答问题。
文档内容:
{context}<|im_end|>
<|im_start|>user
问题:{question}<|im_end|>
<|im_start|>assistant
"""
from vllm import SamplingParams
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.9,
max_tokens=500,
stop=["<|im_end|>"]
)
outputs = self.llm.generate([prompt], sampling_params)
answer = outputs[0].outputs[0].text.strip() if outputs and outputs[0].outputs else "未能生成答案"
result = {
"question": question,
"method": method,
"documents": docs,
"answer": answer,
"retrieval_time": retrieval_time,
"context_length": len(context)
}
# 缓存结果
if self.cache:
self.cache[cache_key] = result
return resultpython系统部署与测试#
# 初始化高级RAG系统
config = RAGConfig(
chunk_size=800,
chunk_overlap=150,
similarity_threshold=0.6,
max_retrieved_docs=4,
enable_compression=True,
cache_enabled=True
)
advanced_rag = AdvancedRAGSystem(embeddings, llm, config)
# 索引示例文档
python_content = """
Python编程语言基础
函数定义:
使用def关键字定义函数:
def function_name(parameters):
# 函数体
return result
函数可以接受参数,也可以返回值。
参数可以有默认值,使用parameter=default_value语法。
示例:
def greet(name="World"):
return f"Hello, {name}!"
调用函数:
result = greet("Alice")
print(result) # 输出: Hello, Alice!
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
面向对象编程:
类定义使用class关键字:
class MyClass:
def __init__(self, name):
self.name = name
def say_hello(self):
return f"Hello, {self.name}"
"""
advanced_rag.index_document(
content=python_content,
metadata={"source": "python_tutorial.md", "type": "tutorial", "date": "2024-01-01"}
)python性能对比测试#
# 测试不同检索方法
test_questions = [
"Python中的函数如何定义?",
"什么是列表推导式?",
"如何创建Python类?"
]
methods = ["basic", "compression", "filter", "hybrid"]
results = []
for question in test_questions:
print(f"\n{'='*60}")
print(f"测试问题: {question}")
print('='*60)
for method in methods:
result = advanced_rag.query(question, method=method, k=2)
results.append(result)
print(f"\n{method.upper()} 方法:")
print(f"答案: {result['answer'][:80]}...")
print(f"检索时间: {result['retrieval_time']:.2f}s")
print(f"上下文长度: {result['context_length']} 字符")python测试结果输出:
============================================================
测试问题: Python中的函数如何定义?
============================================================
BASIC 方法:
答案: 使用def关键字定义函数,函数可以接受参数,也可以返回值。参数可以有默认值...
检索时间: 0.45s
上下文长度: 355 字符
COMPRESSION 方法:
答案: Python函数定义使用def关键字,可以接受参数,也可以返回值。参数可以有默认值...
检索时间: 1.23s
上下文长度: 120 字符
HYBRID 方法:
答案: 使用def关键字定义函数,语法为:def function_name(parameters):。函数可以...
检索时间: 0.89s
上下文长度: 280 字符plaintext性能优化策略#
1. 智能缓存系统
class SmartCache:
"""智能缓存系统"""
def __init__(self, max_size=1000, ttl=3600):
self.cache = {}
self.max_size = max_size
self.ttl = ttl # 生存时间(秒)
def get(self, key):
if key in self.cache:
data, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return data
else:
del self.cache[key]
return None
def set(self, key, value):
if len(self.cache) >= self.max_size:
# LRU淘汰策略
oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
del self.cache[oldest_key]
self.cache[key] = (value, time.time())python2. 异步处理优化
async def async_batch_query(self, questions: List[str], method: str = "hybrid"):
"""异步批量查询"""
semaphore = asyncio.Semaphore(5) # 控制并发数
async def process_question(question):
async with semaphore:
return await asyncio.to_thread(self.query, question, method)
tasks = [process_question(q) for q in questions]
return await asyncio.gather(*tasks)python3. 动态参数调优
def auto_tune_parameters(self, query_type: str):
"""根据查询类型自动调整参数"""
tuning_rules = {
"factual": {
"similarity_threshold": 0.8,
"k": 3,
"enable_compression": False
},
"analytical": {
"similarity_threshold": 0.6,
"k": 5,
"enable_compression": True
},
"creative": {
"similarity_threshold": 0.5,
"k": 7,
"enable_compression": True
}
}
return tuning_rules.get(query_type, tuning_rules["factual"])python通过这个完整的高级RAG系统实现,我们成功整合了前面讨论的所有先进技术,为构建生产级的智能问答应用提供了坚实的基础。系统具有良好的可扩展性、可配置性和监控能力,可以满足不同场景下的需求。