
在 RAG 系统中,即便采用性能卓越的 LLM 并反复打磨 Prompt,问答仍可能出现上下文缺失、事实性错误或拼接不连贯等问题。多数团队会频繁更换检索算法与 Embedding模型,但收益常常有限。真正的瓶颈,往往潜伏在数据入库之前的一个细节——文档分块(chunking)。不当的分块会破坏语义边界,拆散关键线索并与噪声纠缠,使被检索的片段呈现“顺序错乱、信息残缺”的面貌。在这样的输入下,再强大的模型也难以基于支离破碎的知识推理出完整、可靠的答案。某种意义上,分块质量几乎决定了RAG的性能上限——它决定知识是以连贯的上下文呈现,还是退化为无法拼合的碎片。
在实际场景中,最常见的错误是按固定长度生硬切割,忽略文档的结构与语义:定义与信息被切开、表头与数据分离、步骤说明被截断、代码与注释脱节,结果就是召回命中却无法支撑结论,甚至诱发幻觉与错误引用。相反,高质量的分块应尽量贴合自然边界(标题、段落、列表、表格、代码块等),以适度重叠保持上下文连续,并保留必要的来源与章节元数据,确保可追溯与重排可用。当分块尊重文档的叙事与结构时,检索的相关性与答案的事实一致性往往显著提升,远胜于一味更换向量模型或调参;换言之,想要真正改善 RAG 的稳健性与上限,首先要把“知识如何被切开并呈现给模型”这件事做好。
PS:本文主要是针对中文文档类型的嵌入进行实战。
分块是将大块文本分解成较小段落的过程,这使得文本数据更易于管理和处理。通过分块,我们能够更高效地进行内容嵌入(embedding),并显著提升从向量数据库中召回内容的相关性和准确性。
在实际操作中,分块的好处是多方面的。首先,它能够提高模型处理的效率,因为较小的文本段落更容易进行嵌入和检索。
其次,分块后的文本能够更精确地匹配用户查询,从而提供更相关的搜索结果。这对于需要高精度信息检索和内容生成的应用程序尤为重要。
通过优化内容的分块和嵌入策略,我们可以最大化LLM在各种应用场景中的性能。分块技术不仅提高了内容召回的准确性,还提升了整体系统的响应速度和用户体验。
因此,在构建和优化基于LLM的应用程序时,理解和应用分块技术是不可或缺的步骤。
分块过程中主要的两个概念:chunk_size块的大小,chunk_overlap重叠窗口。

总之理想的分块是在“上下文完整性”和“信息密度”之间取得动态平衡:chunk_size决定信息承载量,chunk_overlap 用于弥补边界断裂并维持语义连续。只要边界对齐语义、粒度贴合内容,检索与生成的质量就能提升。

基于固定长度分块
from langchain_text_splitters import CharacterTextSplitter
splitter = CharacterTextSplitter(
separator="", # 纯按长度切
chunk_size=600, # 依据实验与模型上限调整
chunk_overlap=90, # 15% 重叠
)
chunks = splitter.split_text(text)基于句子的分块
import re
def split_sentences_zh(text: str):
# 在句末标点(。!?;)后面带可选引号的场景断句
pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
sentences = [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
return sentences
def sentence_chunk(text: str, chunk_size=600, overlap=80):
sents = split_sentences_zh(text)
chunks, buf = [], ""
for s in sents:
if len(buf) + len(s) <= chunk_size:
buf += s
else:
if buf:
chunks.append(buf)
# 简单重叠:从当前块尾部截取 overlap 字符与下一句拼接
buf = (buf[-overlap:] if overlap > 0 and len(buf) > overlap else "") + s
if buf:
chunks.append(buf)
return chunks
chunks = sentence_chunk(text, chunk_size=600, overlap=90)HanLP 分句示例:
from hanlp_common.constant import ROOT
import hanlp
tokenizer = hanlp.load('PKU_NAME_MERGED_SIX_MONTHS_CONVSEG') # 或句法/句子级管线
# HanLP 高层 API 通常通过句法/语料管线获得句子边界,具体以所用版本 API 为准
# 将句子列表再做聚合为 chunk_size基于递归字符分块
import re
from langchain_text_splitters import RecursiveCharacterTextSplitter
separators = [
r"\n#{1,6}\s", # 标题
r"\n\d+(?:\.\d+)*\s", # 数字编号标题 1. / 2.3. 等
"\n\n", # 段落
"\n", # 行
" ", # 空格
"", # 兜底字符级
]
splitter = RecursiveCharacterTextSplitter(
separators=separators,
chunk_size=700,
chunk_overlap=100,
is_separator_regex=True, # 告诉分割器上面包含正则
)
chunks = splitter.split_text(text)总结
利用文档固有结构(标题层级、列表、代码块、表格、对话轮次)作为分块边界,逻辑清晰、可追溯性强,能在保证上下文完整性的同时提升检索信噪比。
结构化文本分块
import re
from typing import List, Dict
heading_pat = re.compile(r'^(#{1,6})\s+(.*)$') # 标题
fence_pat = re.compile(r'^```') # fenced code fence
def split_markdown_structure(text: str, chunk_size=900, min_chunk=250, overlap_ratio=0.1) -> List[Dict]:
lines = text.splitlines()
sections = []
in_code = False
current = {"level": 0, "title": "", "content": [], "path": []}
path_stack = [] # [(level, title)]
for ln in lines:
if fence_pat.match(ln):
in_code = not in_code
m = heading_pat.match(ln) if not in_code else None
if m:
if current["content"]:
sections.append(current)
level = len(m.group(1))
title = m.group(2).strip()
while path_stack and path_stack[-1][0] >= level:
path_stack.pop()
path_stack.append((level, title))
breadcrumbs = [t for _, t in path_stack]
current = {"level": level, "title": title, "content": [], "path": breadcrumbs}
else:
current["content"].append(ln)
if current["content"]:
sections.append(current)
# 通过二次拆分/合并将部分平铺成块
chunks = []
def emit_chunk(text_block: str, path: List[str], level: int):
chunks.append({
"text": text_block.strip(),
"meta": {
"section_title": path[-1] if path else "",
"breadcrumbs": path,
"section_level": level,
}
})
for sec in sections:
raw = "\n".join(sec["content"]).strip()
if not raw:
continue
if len(raw) <= chunk_size:
emit_chunk(raw, sec["path"], sec["level"])
else:
paras = [p.strip() for p in raw.split("\n\n") if p.strip()]
buf = ""
for p in paras:
if len(buf) + len(p) + 2 <= chunk_size:
buf += (("\n\n" + p) if buf else p)
else:
if buf:
emit_chunk(buf, sec["path"], sec["level"])
buf = p
if buf:
emit_chunk(buf, sec["path"], sec["level"])
merged = []
for ch in chunks:
if not merged:
merged.append(ch)
continue
if len(ch["text"]) < min_chunk and merged[-1]["meta"]["breadcrumbs"] == ch["meta"]["breadcrumbs"]:
merged[-1]["text"] += "\n\n" + ch["text"]
else:
merged.append(ch)
overlap = int(chunk_size * overlap_ratio)
for ch in merged:
bc = " > ".join(ch["meta"]["breadcrumbs"][-3:])
prefix = f"[{bc}]\n" if bc else ""
if prefix and not ch["text"].startswith(prefix):
ch["text"] = prefix + ch["text"]
# optional character overlap can在检索阶段用邻接聚合替代,这里略
return merged对话式分块
from typing import List, Dict
def chunk_dialogue(turns: List[Dict], max_turns=10, max_chars=900, overlap_turns=2):
"""
turns: [{"speaker":"User","text":"..." , "ts_start":123, "ts_end":130}, ...]
"""
chunks = []
i = 0
while i < len(turns):
j = i
char_count = 0
speakers = set()
while j < len(turns):
t = turns[j]
uttr_len = len(t["text"])
# 若单条超长,允许在句级二次切分(此处略),但不跨 speaker
if (j - i + 1) > max_turns or (char_count + uttr_len) > max_chars:
break
char_count += uttr_len
speakers.add(t["speaker"])
j += 1
if j > i:
window = turns[i:j]
elif i < len(turns):
window = [turns[i]]
else:
break
text = "\n".join([f'{t["speaker"]}: {t["text"]}' for t in window])
meta = {
"speakers": list(speakers),
"turns_range": (i, j - 1),
"ts_start": window[0].get("ts_start"),
"ts_end": window[-1].get("ts_end"),
}
chunks.append({"text": text, "meta": meta})
# 按轮次重叠回退
if j >= len(turns):
break
next_start = i + len(window) - overlap_turns
i = max(next_start, i + 1) # 确保至少前进1步
return chunks总结
该方法不依赖文档的物理结构,而是依据语义连续性与话题转移来决定切分点,尤其适合希望“块内高度内聚、块间清晰分界”的知识库与研究类文本。
语义分块
from typing import List, Dict, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
import re
def split_sentences_zh(text: str) -> List[str]:
# 简易中文分句,可替换为 HanLP/Stanza 更稳健的实现
pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
return [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
def rolling_mean(vecs: np.ndarray, i: int, w: int) -> np.ndarray:
s = max(0, i - w)
e = min(len(vecs), i + w + 1)
return vecs[s:e].mean(axis=0)
def semantic_chunk(
text: str,
model_name: str = "BAAI/bge-m3",
window_size: int = 2,
min_chars: int = 350,
max_chars: int = 1100,
lambda_std: float = 0.8,
overlap_chars: int = 80,
) -> List[Dict]:
sents = split_sentences_zh(text)
if not sents:
return []
model = SentenceTransformer(model_name)
emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
emb = np.asarray(emb)
# 基于窗口均值的“新颖度”分数
novelties = []
for i in range(len(sents)):
ref = rolling_mean(emb, i-1, window_size) if i > 0 else emb[0]
ref = ref / (np.linalg.norm(ref) + 1e-8)
novelty = 1.0 - float(np.dot(emb[i], ref))
novelties.append(novelty)
novelties = np.array(novelties)
# 相对阈值:μ + λσ
mu, sigma = float(novelties.mean()), float(novelties.std() + 1e-8)
threshold = mu + lambda_std * sigma
chunks, buf, start_idx = [], "", 0
def flush(end_idx: int):
nonlocal buf, start_idx
if buf.strip():
chunks.append({
"text": buf.strip(),
"meta": {"start_sent": start_idx, "end_sent": end_idx-1}
})
buf, start_idx = "", end_idx
for i, s in enumerate(sents):
# 若超长则先冲洗
if len(buf) + len(s) > max_chars and len(buf) >= min_chars:
flush(i)
# 结构化重叠:附加上一个块的尾部
if overlap_chars > 0 and len(s) < overlap_chars:
buf = s
continue
buf += s
# 达到最小长度后遇到突变则切分
if len(buf) >= min_chars and novelties[i] > threshold:
flush(i + 1)
if buf:
flush(len(sents))
return chunks主题的分块
from typing import List, Dict
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
import re
def split_sentences_zh(text: str) -> List[str]:
pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)')
return [m.group(0).strip() for m in pattern.finditer(text) if m.group(0).strip()]
def topic_chunk(
text: str,
k_topics: int = 5,
min_chars: int = 500,
max_chars: int = 1400,
smooth_window: int = 2,
model_name: str = "BAAI/bge-m3"
) -> List[Dict]:
sents = split_sentences_zh(text)
if not sents:
return []
model = SentenceTransformer(model_name)
emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
emb = np.asarray(emb)
km = KMeans(n_clusters=k_topics, n_init="auto", random_state=42)
labels = km.fit_predict(emb)
# 简单序列平滑:滑窗多数投票
smoothed = labels.copy()
for i in range(len(labels)):
s = max(0, i - smooth_window)
e = min(len(labels), i + smooth_window + 1)
window = labels[s:e]
vals, counts = np.unique(window, return_counts=True)
smoothed[i] = int(vals[np.argmax(counts)])
chunks, buf, start_idx, cur_label = [], "", 0, smoothed[0]
def flush(end_idx: int):
nonlocal buf, start_idx
if buf.strip():
chunks.append({
"text": buf.strip(),
"meta": {"start_sent": start_idx, "end_sent": end_idx-1, "topic": int(cur_label)}
})
buf, start_idx = "", end_idx
for i, s in enumerate(sents):
switched = smoothed[i] != cur_label
over_max = len(buf) + len(s) > max_chars
under_min = len(buf) < min_chars
# 尝试延后切分,保证最小块长
if switched and not under_min:
flush(i)
cur_label = smoothed[i]
if over_max and not under_min:
flush(i)
buf += s
if buf:
flush(len(sents))
return chunks小-大分块
# 离线:构建小块索引,并保存 parent_id -> 大块文本 的映射
# 在线检索:
small_hits = small_index.search(embed(query), top_k=30)
groups = group_by_parent(small_hits)
scored_parents = score_groups(groups, agg="max")
candidates = top_m(scored_parents, m=3)
# 交叉编码重排
rerank_inputs = [(query, parent_text(pid)) for pid in candidates]
reranked = cross_encoder_rerank(rerank_inputs)
# 组装上下文:对每个父块,仅保留命中句及其邻近窗口,并加上标题路径
contexts = []
for pid, _ in reranked:
hits = groups[pid]
context = build_local_window(parent_text(pid), hits, window_sents=1)
contexts.append(prefix_with_breadcrumbs(pid) + context)
final_context = pack_under_budget(contexts, token_budget=3000) # 留出回答空间父子段分块
from typing import List, Dict, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
embedder = SentenceTransformer("BAAI/bge-m3")
def search_parent_child(query: str, top_k_child=40, top_m_parent=3, window_chars=180):
q = embedder.encode([query], normalize_embeddings=True)[0]
hits = small_index.search(q, top_k=top_k_child) # 返回 [(child_id, score), ...]
# 分组
groups: Dict[str, List[Tuple[str, float]]] = {}
for cid, score in hits:
p = child_parent_id[cid]
groups.setdefault(p, []).append((cid, float(score)))
# 聚合打分(max + coverage)
scored = []
for pid, items in groups.items():
scores = np.array([s for _, s in items])
agg = 0.7 * scores.max() + 0.3 * (len(items) / (len(parents[pid]["sent_spans"]) + 1e-6))
scored.append((pid, float(agg)))
scored.sort(key=lambda x: x[1], reverse=True)
candidates = [pid for pid, _ in scored[:top_m_parent]]
# 为每个父块构造“命中窗口”
contexts = []
for pid in candidates:
ptext = parents[pid]["text"]
# 找到子块命中区间并合并窗口
spans = sorted([(children[cid]["start"], children[cid]["end"]) for cid, _ in groups[pid]])
merged = []
for s, e in spans:
s = max(0, s - window_chars)
e = min(len(ptext), e + window_chars)
if not merged or s > merged[-1][1] + 50:
merged.append([s, e])
else:
merged[-1][1] = max(merged[-1][1], e)
windows = [ptext[s:e] for s, e in merged]
prefix = " > ".join(parents[pid]["meta"].get("breadcrumbs", [])[-3:])
contexts.append((pid, f"[{prefix}]\n" + "\n...\n".join(windows)))
# 交叉编码重排(此处用占位函数)
reranked = cross_encoder_rerank(query, [c[1] for c in contexts]) # 返回 indices 顺序
ordered = [contexts[i] for i in reranked]
return ordered # [(parent_id, context_text), ...]代理式分块
系统:你是分块器。目标:为RAG检索创建高内聚、可追溯的块。规则:
1) 不得在代码/表格/公式中间切分;
2) 每块400-1000字;
3) 保持标题路径完整;
4) 尽量让“定义+解释”在同一块;
5) 输出JSON,含 start_offset/end_offset/title_path。
用户:<文档片段文本>
助手(示例输出):
{
"segments": [
{"start": 0, "end": 812, "title_path": ["指南","安装"], "reason": "完整步骤+注意事项"},
{"start": 813, "end": 1620, "title_path": ["指南","配置"], "reason": "参数表与示例紧密相关"}
]
}单一策略难覆盖所有文档与场景。混合分块通过“先粗后细、按需细化”,在效率、可追溯性与答案质量之间取得稳健平衡。
from typing import List, Dict
def hybrid_chunk(
doc_text: str,
parse_structure, # 函数:返回 [{'type': 'text|code|table|dialogue', 'text': str, 'breadcrumbs': [...], 'anchor': str}]
recursive_splitter, # 函数:text -> [{'text': str}]
sentence_splitter, # 函数:text -> [{'text': str}]
semantic_splitter, # 函数:text -> [{'text': str}]
dialogue_splitter, # 函数:turns(list) -> [{'text': str}],若无对话则忽略
max_coarse_len: int = 1100,
min_chunk_len: int = 320,
target_len: int = 750,
overlap_ratio: float = 0.1,
) -> List[Dict]:
"""
返回格式: [{'text': str, 'meta': {...}}]
"""
blocks = parse_structure(doc_text) # 先拿到结构块
chunks: List[Dict] = []
def emit(t: str, meta_base: Dict):
t = t.strip()
if not t:
return
# 结构重叠前缀(标题路径)
bc = " > ".join(meta_base.get("breadcrumbs", [])[-3:])
prefix = f"[{bc}]\n" if bc else ""
chunks.append({
"text": (prefix + t) if not t.startswith(prefix) else t,
"meta": meta_base
})
for b in blocks:
t = b["text"]
btype = b.get("type", "text")
# 原子块:代码/表格
if btype in {"code", "table", "formula"}:
emit(t, {**b, "splitter": "atomic"})
continue
# 对话块
if btype == "dialogue":
for ck in dialogue_splitter(b.get("turns", [])):
emit(ck["text"], {**b, "splitter": "dialogue"})
continue
# 普通文本:依据长度与“可读性”启用不同细分器
if len(t) <= max_coarse_len:
# 中短文本:递归 or 句子
sub = recursive_splitter(t)
# 合并过短子块
buf = ""
for s in sub:
txt = s["text"]
if len(buf) + len(txt) < min_chunk_len:
buf += txt
else:
emit(buf or txt, {**b, "splitter": "recursive"})
buf = "" if buf else ""
if buf:
emit(buf, {**b, "splitter": "recursive"})
else:
# 超长文本:语义分块优先
for ck in semantic_splitter(t):
emit(ck["text"], {**b, "splitter": "semantic"})
# 轻量字符重叠(可选)
if overlap_ratio > 0:
overlapped = []
for i, ch in enumerate(chunks):
overlapped.append(ch)
if i + 1 < len(chunks) and ch["meta"].get("breadcrumbs") == chunks[i+1]["meta"].get("breadcrumbs"):
# 在相邻同章节块间引入小比例重叠
ov = int(len(ch["text"]) * overlap_ratio)
if ov > 0:
head = ch["text"][-ov:]
chunks[i+1]["text"] = head + chunks[i+1]["text"]
chunks = overlapped
return chunks
1. 告别数据无序:得物数据研发与管理平台的破局之路
2. 从一次启动失败深入剖析:Spring循环依赖的真相|得物技术
3. Apex AI辅助编码助手的设计和实践|得物技术
4. 从 JSON 字符串到 Java 对象:Fastjson 1.2.83 全程解析|得物技术
5. 用好 TTL Agent 不踩雷:避开内存泄露与CPU 100%两大核心坑|得物技术
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。