首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Doris 的 Pipeline 执行模型:从流程到原理解析

Doris 的 Pipeline 执行模型:从流程到原理解析

作者头像
数据极客圈
发布2025-11-12 13:11:46
发布2025-11-12 13:11:46
1990
举报

一、为什么要有 Pipeline 执行模型?

在传统的“火山模型”执行架构中,一个查询会为每个算子(PlanNode)或每台机器创建大量执行线程,容易出现“线程膨胀”的问题:

  • 多核利用不足:并行度需手动配置,生产环境难以调优。
  • 线程膨胀与死锁风险:每个 Instance 占用线程池中的线程,线程池满后会出现假性或逻辑死锁。
  • 阻塞算子占用资源:如 Join、Sort 等算子会长期占用线程,影响整体调度效率;线程切换开销也较高。

简而言之,火山模型在多核、混布、高并发场景下已难以支撑 Doris 的性能需求。为此,Doris 引入 Pipeline 执行模型,以流水线化、任务级调度方式充分释放多核性能潜力。

二、三层规划:Plan → Fragment → PlanNode(算子)

在理解 Pipeline 之前,需要弄清 Doris 的三层规划概念:

  • PLAN(执行计划):整个 SQL 的全局执行逻辑;
  • FRAGMENT(计划片段):在分布式场景下,PLAN 会被切分为多个可以单机执行的片段;每个 Fragment 可独立下发给某个 BE(Backend);
  • PLAN NODE(算子):执行计划中的基本执行单元,例如 Scan、Join、Agg 等。

当 FE(Frontend)把 SQL 转为 PLAN 并插入 Exchange / DataSink 等节点后,会把 PLAN 切分为若干个 FRAGMENT,分发到各个 BE。BE 收到 Fragment 后,继续把 Fragment 拆成 Pipeline 并实例化为具体任务执行。

三、Pipeline、PipelineTask、Operator:关系与职责

  • Pipeline:逻辑上的处理链,首尾由 SourceOperatorSinkOperator 包含,中间连接若干 Operator(算子)。
  • PipelineTask:Pipeline 的可执行实例。将需要处理的数据划分到若干个 Task(例如按 bucket 或分片),每个 Task 在独立线程中运行,拥有自己的 LocalState(比如本地的 HashTable、缓冲等)。
  • Operator:算子,通常对应 PlanNode;对某些“breaking”算子(如 Join、Agg、Sort)会被拆分为 Sink/Source 两端(例如 JoinBuildOperator / JoinProbeOperator,或 AggSinkOperator / AggSourceOperator)。

这样设计的好处是:同一 Pipeline 的不同 Task 使用相同的算子逻辑代码,但持有不同的本地状态,从而实现密集并行处理。

四、算子拆分:为什么把 Join、Agg、Sort 分为 Sink/Source?

某些算子需要预先收集全部上游数据(或至少一端数据)才能继续后续计算,我们称之为 breaking operator。把它们拆为 Sink(负责收集/写入)和 Source(负责读取/后续处理)两部分,能让流水线继续推进:

  • Join:先由 JoinBuild(Sink)构建 HashTable,再由 JoinProbe(Source)做探测;
  • Agg:先由 AggSink 聚合输入并生成聚合表,再由 AggSource 输出聚合结果给下游;
  • Sort:类似,SortSink 收集并排序,SortSource 提供排序后的流。

拆分的本质:把需要“全局一致视图”的工作收口到 Sink 端,再由 Source 端消费结果,保证数据一致性的同时仍能并行化处理。

五、Pipeline 的依赖与调度

多个 Pipeline 之间不是完全独立的:例如 Hash Join 的 Build 阶段和 Probe 阶段属于两个 Pipeline,Probe 必须等待 Build 完成。Doris 使用 Dependency 机制表达这种关系:

  • Pipeline-Build 执行完毕后会调用 set_ready 通知依赖方;
  • 被依赖 Pipeline 在收到通知后,才会启动对应的 PipelineTask;
  • 每个 PipelineTask 被提交到线程池中执行,从而实现多核并行。

这种触发式调度(event-driven)把控制逻辑从线程数的粗暴增长中解耦出来,使得资源利用更可控。

六、Scan 并行化:Scanner 与 DataQueue

扫描是 IO 密集型的操作。Doris 在 ScanOperator 中引入了动态生成多个 Scanner的机制:

  • 每个 Scanner 负责扫描约 100W–200W 行的数据并做本地的解压、过滤等工作;
  • Scanner 把结果写入 DataQueue,由 ScanOperator 按需读取并送入 Pipeline;
  • 多个 Scanner 并发工作可以规避单一分桶/文件导致的热点和倾斜问题,从而显著降低查询延时。

实战建议:在表设计与分桶(bucket)策略上尽量避免极端倾斜,结合 Scan 并行化能获得更稳定的查询性能。

七、Local Shuffle:解决执行时的数据倾斜

在分布式执行中,数据倾斜会导致某些 Task 处理大量数据,从而拖慢整体查询。Doris 引入 Local Exchange 在本机做局部重分发:

  • Local Exchange 在 Pipeline 内部作为一个 Pipeline Breaker
  • 它把上游输出以 Hash / Round-Robin 等策略均匀分发到下游的所有 Task,解决执行过程中数据倾斜问题;
  • 举例说明:原先三个 Task 读到的数据行数为 (1,1,7),经过 Local Exchange 后变成 (3,3,3),显著降低了倾斜。

Local Exchange 会在 Planner 中根据规则决定是否插入(例如针对耗时的 Join、聚合、窗口函数等算子),这是一个权衡:引入额外的数据重分发成本,但能有效降低最坏任务延迟。

八、调优要点与故障定位思路

  1. 观察瓶颈属于 CPU 还是 IO
    • 若是 IO 瓶颈(磁盘/网络高延时),关注 Scan 并行度、数据布置(bucket)、远程存储(S3/HDFS)读写性能;
    • 若是 CPU 瓶颈,检查是否存在过多小任务导致上下文切换或某些 Task 计算量不均衡。
  2. 检查是否有数据倾斜
    • 在慢查询中看每个 Task 的处理量分布;若极端不均衡,考虑启用或强制插入 Local Exchange,或优化分桶键。
  3. Pipeline 依赖链路问题
    • 如果 Probe 一直等待 Build,可能是 Build 侧资源被限制造成。排查 Build 阶段日志和内存/CPU 使用情况。
  4. 合理设置并发度
    • PipelineTask 的数量与线程池配置、机器核数、IO 带宽要匹配;并发度过高会反而降低吞吐。

九、总结

Doris 的 Pipeline 执行模型把执行计划在 BE 端拆解为逻辑 Pipeline,再把 Pipeline 实例化为可并行的 PipelineTask,通过算子拆分、Scan 并行化与 Local Exchange 三大手段,既保证了并行计算效率,又能有效控制线程膨胀与数据倾斜,是 MPP 查询引擎在多核、多节点下的工程化实现。

往期推荐

Doris BE节点下线卡住?快速排障技巧全攻略!

Apache Doris 索引的全面剖析与使用指南

Apache Doris 湖仓一体:打破数据边界,解锁实时分析的终极答案

Doris vs ClickHouse 企业级实时分析引擎怎么选?

Doris查询报错-230?别慌,教你几招秒解!

Doris Tablet 损坏如何应对?能恢复数据吗?

Doris 导入慢该如何排查和优化

Doris 建表与分区问题全解析

数据极客圈子介绍

圈子1

Apache Doris社区是目前国内最活跃的开源社区(之一)。Apache Doris(Apache 顶级项目) 聚集了世界全国各地的用户与开发人员,致力于打造一个内容完整、持续成长的互联网开发者学习生态圈!

如果您对Apache Doris感兴趣,可以通过以下入口访问官方网站、社区论坛、GitHub和dev邮件组:

💡官网文档:https://doris.apache.org

💡社区论坛:https://ask.selectdb.com

💡GitHub:https://github.com/apache/doris

💡dev邮件组:dev@doris.apache.org

可以加作者微信(Faith_xzc)直接进Doris官方社区群

圈子2

PowerData是由一群数据从业人员,因为热爱凝聚在一起,以开源精神为基础,组成的数据开源社区。

社区群内会定期组织模拟面试、线上分享、行业研讨、线下Meetup、城市聚会、求职内推等活动,同时在社区群内你可以进行技术讨论、问题请教,结识更多志同道合的数据朋友。

社区整理了一份每日一题汇总及社区分享PPT,内容涵盖大数据组件、编程语言、数据结构与算法、企业真实面试题等各个领域,帮助您提升自我,成功上岸。

可以加作者微信(Faith_xzc)直接进PowrData官方社区群

叮咚✨ “数据极客圈” 向你敞开大门,走对圈子跟对人,行业大咖 “唠” 数据,实用锦囊天天有,就缺你咯!快快关注数据极客圈,共同成长!

点击上方公众号关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据极客圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、为什么要有 Pipeline 执行模型?
  • 二、三层规划:Plan → Fragment → PlanNode(算子)
  • 三、Pipeline、PipelineTask、Operator:关系与职责
  • 四、算子拆分:为什么把 Join、Agg、Sort 分为 Sink/Source?
  • 五、Pipeline 的依赖与调度
  • 六、Scan 并行化:Scanner 与 DataQueue
  • 七、Local Shuffle:解决执行时的数据倾斜
  • 八、调优要点与故障定位思路
  • 九、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档