一、策略全景图
1、主流的RAG文档切分策略体系
├── 基础切块策略
│ ├── 1. 固定大小切块 (Fixed-size Chunking)
│ ├── 2. 重叠切块 (Overlapping Chunking)
│ └── 3. 滑动窗口切块 (Sliding Window Chunking)
│
├── 内容感知策略
│ ├── 4. 句子窗口切块 (Sentence Window Chunking)
│ ├── 5. 文档结构切块 (Document Structure Chunking)
│ └── 8. LLM智能切块 (LLM-aided Chunking)
│
├── 智能递归策略
│ └── 6. 递归切块 (Recursive Chunking)
│
└── 高级语义策略
└── 7. 语义感知切块 (Semantic-aware Chunking)
二、详细策略说明
1. 固定大小切块 (Fixed-size Chunking)
核心思想: 将文档按照固定字符数或token数切割成等长的块
实现原理:
文档: [=============完整文档=============]
# 切分: [块1: 500字符][块2: 500字符][块3: 500字符]
def fixed_size_chunking(text: str, chunk_size: int = 512) -> List[str]:
    """Split *text* into consecutive chunks of at most *chunk_size* characters.

    Args:
        text: input text to split
        chunk_size: number of characters per chunk
    Returns:
        List of chunks; all are ``chunk_size`` long except possibly the last.
    """
    chunks: List[str] = []
    for offset in range(0, len(text), chunk_size):
        chunks.append(text[offset:offset + chunk_size])
    return chunks
# 改进版:支持token计数(更准确)
import tiktoken
class TokenBasedChunker:
    """Fixed-size chunking measured in tokens rather than characters."""

    def __init__(self, model_name: str = "gpt-4", chunk_size: int = 512):
        # tiktoken resolves the tokenizer that matches the given model name.
        self.encoder = tiktoken.encoding_for_model(model_name)
        self.chunk_size = chunk_size

    def chunk(self, text: str) -> List[str]:
        """Split *text* into pieces of at most ``chunk_size`` tokens each."""
        token_ids = self.encoder.encode(text)
        return [
            self.encoder.decode(token_ids[offset:offset + self.chunk_size])
            for offset in range(0, len(token_ids), self.chunk_size)
        ]
# 适用场景:
代码文件处理
日志文件分析
批处理ETL流水线
对语义完整性要求不高的场景
优点: ⚡ 极快速度 | 📏 大小均匀 | 🛠️ 实现简单
缺点: 💔 破坏语义 | 🔪 切分单词 | 🎯 检索质量低
2. 重叠切块 (Overlapping Chunking)
核心思想: 在固定切块基础上添加块间重叠,防止边界信息丢失
实现原理:
文档: [=============完整文档=============]
切分: [块1: 500字符]
[块2: 500字符] ← 重叠200字符
# [块3: 500字符] ← 重叠200字符
class OverlappingChunker:
    """Fixed-size chunking with an overlap between consecutive chunks.

    The overlap preserves context that would otherwise be lost at chunk
    boundaries.  When a boundary would fall inside an alphanumeric word,
    the chunk end is pulled back to the nearest word boundary.
    """

    def __init__(self, chunk_size: int = 500, overlap_size: int = 100):
        """
        Args:
            chunk_size: maximum characters per chunk.
            overlap_size: characters shared with the previous chunk;
                must be smaller than ``chunk_size``.
        Raises:
            ValueError: if overlap_size >= chunk_size (the loop step
                would be zero or negative and never advance).
        """
        if overlap_size >= chunk_size:
            raise ValueError("overlap_size must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.overlap_size = overlap_size

    def chunk(self, text: str) -> List[str]:
        """Split *text* into overlapping chunks of at most chunk_size chars."""
        chunks = []
        step_size = self.chunk_size - self.overlap_size
        text_length = len(text)
        for start in range(0, text_length, step_size):
            end = min(start + self.chunk_size, text_length)
            if end < text_length:
                # Avoid cutting a word in half at the chunk boundary.
                end = self._adjust_to_word_boundary(text, start, end)
            chunks.append(text[start:end])
            if end >= text_length:
                break
        return chunks

    def _adjust_to_word_boundary(self, full_text: str, start_pos: int, end_pos: int) -> int:
        """Return an end index <= end_pos that does not split a word.

        Bug fix: the previous version returned ``full_text[:boundary]`` —
        the whole document prefix — instead of trimming the current chunk.
        It now returns the adjusted *index*, and never moves it before the
        chunk's own start.  If the whole chunk is one word, end_pos is
        returned unchanged.
        """
        if end_pos < len(full_text) and full_text[end_pos].isalnum():
            adjust_pos = end_pos - 1
            # Walk left until we leave the word (never past the chunk start).
            while adjust_pos > start_pos and full_text[adjust_pos].isalnum():
                adjust_pos -= 1
            if adjust_pos > start_pos and not full_text[adjust_pos].isalnum():
                return adjust_pos + 1
        return end_pos
# 使用示例
chunker = OverlappingChunker(chunk_size=500, overlap_size=100)
chunks = chunker.chunk(long_document)

适用场景:
技术文档检索
段落级问答系统
需要保持上下文连续性的场景
优点: 🔗 保持上下文 | 🛡️ 防止信息丢失 | ⚡ 相对高效
缺点: 📦 存储冗余 | 🔄 重复计算 | 🎯 仍需人工调参
3. 滑动窗口切块 (Sliding Window Chunking)
核心思想: 以固定步长在文档上滑动窗口,生成密集覆盖的分块
实现原理:
文档: [=============完整文档=============]
窗口: [窗口1] → [窗口2] → [窗口3] → [窗口4] → ...
步长: ↓ ↓ ↓
# 每次移动固定距离 (the window advances by a fixed step each time)
class SlidingWindowChunker:
    """Sliding-window chunking: densely overlapping, position-annotated chunks."""

    def __init__(self, window_size: int = 300, step_size: int = 100):
        self.window_size = window_size
        self.step_size = step_size

    def chunk(self, text: str) -> List[Dict]:
        """Return window dicts with text, character span, id and simple stats.

        Fixes over the naive version:
        * text shorter than ``window_size`` now yields one whole-text window
          (previously such text produced zero chunks and was lost entirely);
        * when the stepped windows do not reach the end of the text, a final
          window anchored at the end is appended so the tail is covered.
        """
        chunks = []
        text_length = len(text)
        if text_length == 0:
            return chunks
        if text_length <= self.window_size:
            starts = [0]  # single window covering the whole (short) text
        else:
            starts = list(range(0, text_length - self.window_size + 1, self.step_size))
            if starts[-1] + self.window_size < text_length:
                # Tail not covered by the stepped windows: anchor one at the end.
                starts.append(text_length - self.window_size)
        for start in starts:
            end = min(start + self.window_size, text_length)
            window_text = text[start:end]
            chunks.append({
                "text": window_text,
                "start": start,
                "end": end,
                "window_id": len(chunks),
                "metadata": {
                    "char_count": len(window_text),
                    "word_count": len(window_text.split())
                }
            })
        return chunks

    def chunk_with_context(self, text: str, context_before: int = 50, context_after: int = 50) -> List[Dict]:
        """Attach surrounding context strings to each window from chunk()."""
        base_chunks = self.chunk(text)
        for chunk in base_chunks:
            start = max(0, chunk["start"] - context_before)
            end = min(len(text), chunk["end"] + context_after)
            chunk["full_context"] = text[start:end]
            chunk["context_before"] = text[start:chunk["start"]]
            chunk["context_after"] = text[chunk["end"]:end]
        return base_chunks
# 适用场景:
密集检索任务
命名实体识别
关键词匹配
模式发现
优点: 🔍 密集覆盖 | 📍 位置精确 | 🎯 适合局部匹配
缺点: 💾 存储爆炸 | ⏱️ 处理缓慢 | 🔄 高度冗余
4. 句子窗口切块 (Sentence Window Chunking)
核心思想: 以句子为单位构建窗口,保持句子完整性和上下文关系
实现原理:
句子: [S1][S2][S3][S4][S5][S6][S7][S8]
窗口: [S1 S2 S3 S4 S5] ← 窗口大小=5
[S3 S4 S5 S6 S7] ← 滑动,保持句子完整
import spacy
from typing import List, Tuple
class SentenceWindowChunker:
    """Chunking that slides a window measured in sentences, not characters."""

    def __init__(self, window_sentences: int = 5, overlap_sentences: int = 2):
        self.window_sentences = window_sentences
        self.overlap_sentences = overlap_sentences
        self.nlp = spacy.load("zh_core_web_sm")  # Chinese sentence-segmentation model

    def chunk(self, text: str) -> List[Dict]:
        """Group consecutive sentences into overlapping windows."""
        parsed = self.nlp(text)
        sentences = [s.text.strip() for s in parsed.sents]
        step = self.window_sentences - self.overlap_sentences
        windows = []
        for begin in range(0, len(sentences), step):
            finish = min(begin + self.window_sentences, len(sentences))
            selected = sentences[begin:finish]
            if not selected:
                continue
            windows.append({
                "text": " ".join(selected),
                "sentences": selected,
                "sentence_range": (begin, finish - 1),
                "sentence_count": len(selected),
                "metadata": {
                    "avg_sentence_length": sum(len(s) for s in selected) / len(selected)
                }
            })
        return windows

    def chunk_with_centered_sentence(self, text: str) -> List[Dict]:
        """Build one window centred on each sentence of the text."""
        parsed = self.nlp(text)
        sentences = [s.text.strip() for s in parsed.sents]
        half = self.window_sentences // 2
        windows = []
        for center, sentence in enumerate(sentences):
            lo = max(0, center - half)
            hi = min(len(sentences), center + half + 1)
            neighborhood = sentences[lo:hi]
            windows.append({
                "text": " ".join(neighborhood),
                "center_sentence": sentence,
                "center_index": center,
                "window_sentences": neighborhood
            })
        return windows
# 适用场景:
对话系统
问答机器人
情感分析
文本摘要
优点: 🗣️ 句子完整 | 🤝 上下文连贯 | 🎯 语义质量高
缺点: 📏 长度不均 | 🐌 依赖NLP模型 | ⚙️ 配置复杂
5. 文档结构切块 (Document Structure Chunking)
核心思想: 基于文档的层级结构(标题、章节、段落)进行智能切分
实现原理:
文档结构:
├── 第1章 引言
│ ├── 1.1 背景
│ └── 1.2 目标
├── 第2章 方法
│ ├── 2.1 实验设计
│ └── 2.2 数据分析
└── 第3章 结论

import re
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class DocumentNode:
    """A node in the parsed document tree: one heading plus its body text."""
    level: int                      # heading depth: 1 for H1 … 4 for H4; 0 for the synthetic root
    title: str                      # heading text without the leading '#' marks
    content: str                    # body text between this heading and the next
    children: List['DocumentNode']  # nested sub-sections
    start_pos: int                  # character offset of the heading in the source text
    end_pos: int                    # character offset where this section ends
class DocumentStructureChunker:
    """Chunking based on document structure (Markdown headings).

    parse_markdown() builds a tree of DocumentNode objects from '#'-style
    headings; the chunk_* methods then flatten that tree into chunks.
    """
    def __init__(self):
        # Regex patterns for Markdown headings, paired with their depth.
        self.markdown_patterns = [
            (r'^# (.+)$', 1),     # H1
            (r'^## (.+)$', 2),    # H2
            (r'^### (.+)$', 3),   # H3
            (r'^#### (.+)$', 4),  # H4
        ]
    def parse_markdown(self, text: str) -> DocumentNode:
        """Parse Markdown text into a DocumentNode tree.

        Returns:
            A synthetic root node (level 0); real sections hang off its
            ``children``.
        """
        lines = text.split('\n')
        root = DocumentNode(level=0, title="Root", content="", children=[], start_pos=0, end_pos=len(text))
        stack = [root]  # path of currently open sections, innermost last
        current_content = []
        current_start = 0
        for i, line in enumerate(lines):
            # Offset of this line in the original text; searching from
            # current_start keeps duplicate lines from matching an earlier
            # occurrence.
            line_start = text.find(line, current_start)
            # Check whether this line is a heading.
            is_heading = False
            heading_level = 0
            heading_text = ""
            for pattern, level in self.markdown_patterns:
                match = re.match(pattern, line)
                if match:
                    is_heading = True
                    heading_level = level
                    heading_text = match.group(1)
                    break
            if is_heading:
                # Flush accumulated body text into the currently open node.
                if stack and current_content:
                    stack[-1].content = '\n'.join(current_content)
                    stack[-1].end_pos = line_start
                # Create the node for the new section.
                new_node = DocumentNode(
                    level=heading_level,
                    title=heading_text,
                    content="",
                    children=[],
                    start_pos=line_start,
                    end_pos=line_start + len(line)
                )
                # Pop until the stack top is a shallower heading — that node
                # becomes the new section's parent (root, level 0, is never
                # popped because heading levels start at 1).
                while stack and stack[-1].level >= heading_level:
                    stack.pop()
                if stack:
                    stack[-1].children.append(new_node)
                stack.append(new_node)
                current_content = []
            else:
                current_content.append(line)
            current_start = line_start + len(line) + 1  # +1 for the '\n' removed by split
        # Flush the trailing content after the last heading.
        if stack and current_content:
            stack[-1].content = '\n'.join(current_content)
        return root
    def chunk_by_level(self, root: DocumentNode, target_level: int) -> List[str]:
        """Collect a "title\\n\\ncontent" chunk for every node at target_level."""
        chunks = []
        def collect_nodes(node: DocumentNode):
            if node.level == target_level:
                chunk = f"{node.title}\n\n{node.content}".strip()
                if chunk:
                    chunks.append(chunk)
            for child in node.children:
                collect_nodes(child)
        collect_nodes(root)
        return chunks
    def chunk_hierarchical(self, root: DocumentNode) -> List[Dict]:
        """Flatten the tree into chunks annotated with their full title path."""
        chunks = []
        def traverse(node: DocumentNode, parent_titles: List[str]):
            # Breadcrumb path for the current node ("A > B > C"); note the
            # synthetic "Root" title is included for descendants of root.
            current_titles = parent_titles + [node.title]
            full_title = " > ".join(filter(None, current_titles))
            # Emit a chunk only for nodes that carry body text.
            if node.content.strip():
                chunks.append({
                    "text": node.content,
                    "title": full_title,
                    "level": node.level,
                    "path": current_titles,
                    "position": (node.start_pos, node.end_pos)
                })
            # Recurse into sub-sections.
            for child in node.children:
                traverse(child, current_titles)
        traverse(root, [])
        return chunks
# 支持PDF/Word等结构化文档 (support for PDF/Word structured documents)
class AdvancedDocumentChunker(DocumentStructureChunker):
    """Extends the Markdown chunker with hooks for other document formats.

    Both methods are placeholders: a real implementation would use a PDF
    parser such as PyPDF2 or pdfplumber, and python-docx respectively.
    """

    def chunk_pdf(self, pdf_content: str) -> List[Dict]:
        """Placeholder: detect PDF headings and sections. Not implemented."""
        return None

    def chunk_word(self, doc_content: str) -> List[Dict]:
        """Placeholder: inspect Word styles and formatting. Not implemented."""
        return None
# 适用场景:
学术论文检索
技术手册
法律文档
结构化报告
优点: 🏗️ 保持结构 | 📚 层次清晰 | 🎯 检索精准
缺点: 🐌 解析复杂 | 📄 格式依赖 | ⚙️ 实现困难
6. 递归切块 (Recursive Chunking)
核心思想: 使用分隔符优先级列表递归分割,智能适应不同文档结构
实现原理:
第一级: 用"\n\n"分割 → [大块1][大块2][大块3]
第二级: 对>chunk_size的块用"\n"分割
第三级: 对仍然太大的块用"。"分割
...
直到所有块都≤chunk_size

from typing import List, Optional
class RecursiveChunker:
    """Recursive character splitting (LangChain-style).

    A priority list of separators is tried in order: the text is split on
    the coarsest separator first, small pieces are merged back up to
    chunk_size (optionally with overlap), and any piece that is still too
    large is split again with the next, finer separator.
    """

    def __init__(self,
                 chunk_size: int = 1000,
                 chunk_overlap: int = 200,
                 separators: Optional[List[str]] = None):
        """
        Args:
            chunk_size: maximum characters per chunk.
            chunk_overlap: characters of the previous chunk repeated at the
                start of the next one (0 disables overlap).
            separators: separator priority list, coarse to fine; the empty
                string means character-level splitting as a last resort.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Separator priority list (coarse to fine).
        self.separators = separators or [
            "\n\n",  # double newline (paragraph)
            "\n",    # single newline
            "。", ".",  # sentence end
            ";", ";",  # semicolons
            ",", ",",  # commas
            " ", " ",  # spaces
            ""       # character level (last resort)
        ]

    def chunk(self, text: str) -> List[str]:
        """Recursively split *text* into chunks of at most chunk_size chars."""
        return self._split_text_recursive(text, self.separators)

    def _split_text_recursive(self, text: str, separators: List[str]) -> List[str]:
        """Split with the first separator; recurse with finer ones as needed."""
        # Base case: text already fits.
        if len(text) <= self.chunk_size:
            return [text]
        separator = separators[0] if separators else ""
        if separator:
            splits = text.split(separator)
        else:
            # Last resort: hard character-level split.
            splits = [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]
        # Re-merge small fragments up to chunk_size (adds overlap).
        merged_splits = self._merge_splits(splits, separator)
        result = []
        for split in merged_splits:
            if len(split) <= self.chunk_size:
                result.append(split)
            else:
                # Still too large: retry with the next, finer separator.
                # (The dead `need_recursive` flag of the original — both
                # branches returned the same list — has been removed.)
                remaining_separators = separators[1:] if len(separators) > 1 else [""]
                result.extend(self._split_text_recursive(split, remaining_separators))
        return result

    def _merge_splits(self, splits: List[str], separator: str) -> List[str]:
        """Merge small fragments up to chunk_size, adding overlap between chunks."""
        merged = []
        current_chunk = ""
        for split in splits:
            # Merge while the combined length stays within chunk_size.
            if len(current_chunk) + len(split) + len(separator) <= self.chunk_size:
                if current_chunk:
                    current_chunk += separator + split
                else:
                    current_chunk = split
            else:
                # Close the current chunk.
                if current_chunk:
                    merged.append(current_chunk)
                # Start the next chunk, optionally seeded with overlap taken
                # from the end of the chunk just closed.
                if self.chunk_overlap > 0 and current_chunk:
                    overlap_text = current_chunk[-self.chunk_overlap:]
                    current_chunk = overlap_text + separator + split
                else:
                    current_chunk = split
        # Flush the final chunk.
        if current_chunk:
            merged.append(current_chunk)
        return merged

    def chunk_with_metadata(self, text: str) -> List[Dict]:
        """Chunk *text* and attach position / size metadata to each chunk.

        Bug fixes over the original:
        * positions are searched forward from the previous match, so
          repeated chunk texts map to successive occurrences instead of all
          mapping to the first one;
        * when a chunk cannot be located verbatim (overlap re-joins pieces
          with a separator that may differ from the original text),
          ``position`` is None instead of the bogus ``(-1, len-1)``.
        """
        chunks = self.chunk(text)
        enhanced_chunks = []
        search_from = 0
        for i, chunk_text in enumerate(chunks):
            start_pos = text.find(chunk_text, search_from)
            if start_pos == -1:
                # Overlapping chunks may begin before search_from; retry from 0.
                start_pos = text.find(chunk_text)
            if start_pos == -1:
                position = None
            else:
                position = (start_pos, start_pos + len(chunk_text))
                search_from = start_pos + 1
            enhanced_chunks.append({
                "text": chunk_text,
                "chunk_id": i,
                "position": position,
                "length": len(chunk_text),
                "token_count": len(chunk_text.split()),  # crude whitespace token count
                "metadata": {
                    "chunk_size": self.chunk_size,
                    "chunk_overlap": self.chunk_overlap
                }
            })
        return enhanced_chunks
# 适用场景:
通用文档处理
混合格式内容
企业内部知识库
内容管理系统
优点: 🧠 智能适应 | 🎯 质量均衡 | ⚙️ 配置灵活
缺点: 🔄 递归开销 | 📏 大小不均 | 🎛️ 参数敏感
7. 语义感知切块 (Semantic-aware Chunking)
核心思想: 使用文本嵌入模型检测语义边界,基于内容相似度智能切分
实现原理:
文档: [段落1][段落2][段落3][段落4][段落5]
嵌入: [向量1][向量2][向量3][向量4][向量5]
相似度: 高 高 低 高 高
切分点: ↑ (相似度低于阈值)
结果: [块1: 段落1-2-3][块2: 段落4-5]

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
class SemanticAwareChunker:
    """Semantic chunking: merge adjacent units while their embeddings stay similar.

    Units (paragraphs, or sentences for oversized paragraphs) are embedded
    with a SentenceTransformer; consecutive units are merged into one chunk
    as long as similarity to the running average embedding stays above
    ``similarity_threshold`` and the chunk stays under ``max_chunk_size``.
    """

    def __init__(self,
                 model_name: str = "paraphrase-multilingual-MiniLM-L12-v2",
                 similarity_threshold: float = 0.75,
                 min_chunk_size: int = 100,
                 max_chunk_size: int = 800):
        """
        Args:
            model_name: SentenceTransformer model used for unit embeddings.
            similarity_threshold: cosine similarity below which a new chunk starts.
            min_chunk_size: chunks smaller than this are merged into the previous one.
            max_chunk_size: upper bound on chunk size in characters.
        """
        self.model = SentenceTransformer(model_name)
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

    def chunk(self, text: str) -> List[Dict]:
        """Split *text* into semantically coherent chunks.

        Returns:
            Dicts with keys "text", "units", "size" (and "embedding" for
            chunks that were not re-split during post-processing).
        """
        # 1. Split into basic semantic units (paragraphs / sentences).
        basic_units = self._split_to_units(text)
        # 2. Embed each unit.
        embeddings = self.model.encode(basic_units)
        # 3. Merge adjacent units by embedding similarity.
        chunks = self._merge_by_semantic(basic_units, embeddings)
        # 4. Enforce min/max size constraints.
        return self._post_process(chunks)

    def _split_to_units(self, text: str) -> List[str]:
        """Split text into paragraphs; oversized paragraphs into sentences."""
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        units = []
        for para in paragraphs:
            if len(para) > self.max_chunk_size:
                units.extend(self._split_sentences(para))
            else:
                units.append(para)
        return units

    def _split_sentences(self, text: str) -> List[str]:
        """Naive sentence split on CJK/Latin end punctuation (delimiters dropped)."""
        import re
        delimiters = r'[。!?;.!?;]'
        sentences = re.split(delimiters, text)
        return [s.strip() for s in sentences if s.strip()]

    def _merge_by_semantic(self, units: List[str], embeddings: np.ndarray) -> List[Dict]:
        """Greedily merge consecutive units whose embeddings stay similar.

        Bug fix: empty and single-unit inputs previously produced dicts
        without the "size" key, which crashed _post_process with KeyError.
        """
        if len(units) == 0:
            return []
        if len(units) == 1:
            return [{
                "text": units[0],
                "units": list(units),
                "embedding": np.asarray(embeddings[0]).flatten(),
                "size": len(units[0])
            }]
        chunks = []
        current_chunk = [units[0]]
        current_embedding = embeddings[0].reshape(1, -1)
        for i in range(1, len(units)):
            new_embedding = embeddings[i].reshape(1, -1)
            # Candidate embedding if the unit were merged in: mean of both.
            combined_embeddings = np.vstack([current_embedding, new_embedding])
            avg_embedding = np.mean(combined_embeddings, axis=0, keepdims=True)
            # Similarity between the current chunk and the would-be merge.
            similarity = self._cosine_similarity(current_embedding.flatten(), avg_embedding.flatten())
            should_merge = (
                similarity > self.similarity_threshold and
                sum(len(u) for u in current_chunk) + len(units[i]) <= self.max_chunk_size
            )
            if should_merge:
                current_chunk.append(units[i])
                current_embedding = avg_embedding
            else:
                # Close the current chunk and start a new one.
                chunk_text = " ".join(current_chunk)
                chunks.append({
                    "text": chunk_text,
                    "units": current_chunk.copy(),
                    "embedding": current_embedding.flatten(),
                    "size": len(chunk_text)
                })
                current_chunk = [units[i]]
                current_embedding = embeddings[i].reshape(1, -1)
        # Flush the final chunk.
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "units": current_chunk,
                "embedding": current_embedding.flatten(),
                "size": len(chunk_text)
            })
        return chunks

    def _post_process(self, chunks: List[Dict]) -> List[Dict]:
        """Merge undersized chunks into their predecessor; re-split oversized ones."""
        final_chunks = []
        for chunk in chunks:
            if chunk["size"] < self.min_chunk_size and final_chunks:
                # Merge into the previous chunk.
                prev = final_chunks[-1]
                prev["text"] += " " + chunk["text"]
                prev["units"].extend(chunk["units"])
                # Bug fix: keep "size" equal to len(text) — the old code
                # summed the sizes and ignored the joining space.
                prev["size"] = len(prev["text"])
            elif chunk["size"] > self.max_chunk_size:
                final_chunks.extend(self._split_large_chunk(chunk))
            else:
                final_chunks.append(chunk)
        return final_chunks

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity; returns 0.0 for zero vectors instead of NaN."""
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def _split_large_chunk(self, chunk: Dict) -> List[Dict]:
        """Re-split an oversized chunk at sentence boundaries.

        NOTE: sub-chunks carry no "embedding" key (same as the original
        implementation); "size" here is the sum of sentence lengths.
        """
        text = chunk["text"]
        sentences = self._split_sentences(text)
        sub_chunks = []
        current = []
        for sentence in sentences:
            if sum(len(s) for s in current) + len(sentence) > self.max_chunk_size:
                if current:
                    sub_chunks.append({
                        "text": " ".join(current),
                        "units": current.copy(),
                        "size": sum(len(s) for s in current)
                    })
                current = [sentence]
            else:
                current.append(sentence)
        if current:
            sub_chunks.append({
                "text": " ".join(current),
                "units": current,
                "size": sum(len(s) for s in current)
            })
        return sub_chunks