pipe 执行逻辑如下
^IQueryPlanStep$
└── IQueryPlanStep [vim src/Processors/QueryPlan/IQueryPlanStep.h +70]
├── CreatingSetsStep [vim src/Processors/QueryPlan/CreatingSetsStep.h +35]
├── IntersectOrExceptStep [vim src/Processors/QueryPlan/IntersectOrExceptStep.h +9]
├── JoinStep [vim src/Processors/QueryPlan/JoinStep.h +12]
├── UnionStep [vim src/Processors/QueryPlan/UnionStep.h +8]
├── ITransformingStep [vim src/Processors/QueryPlan/ITransformingStep.h +9]
│ ├── AggregatingStep [vim src/Processors/QueryPlan/AggregatingStep.h +14]
│ ├── ArrayJoinStep [vim src/Processors/QueryPlan/ArrayJoinStep.h +10]
│ ├── CreatingSetStep [vim src/Processors/QueryPlan/CreatingSetsStep.h +12]
│ ├── CubeStep [vim src/Processors/QueryPlan/CubeStep.h +13]
│ ├── DistinctStep [vim src/Processors/QueryPlan/DistinctStep.h +9]
│ ├── ExpressionStep [vim src/Processors/QueryPlan/ExpressionStep.h +14]
│ ├── ExtremesStep [vim src/Processors/QueryPlan/ExtremesStep.h +7]
│ ├── FilledJoinStep [vim src/Processors/QueryPlan/JoinStep.h +37]
│ ├── FillingStep [vim src/Processors/QueryPlan/FillingStep.h +9]
│ ├── FilterStep [vim src/Processors/QueryPlan/FilterStep.h +11]
│ ├── LimitByStep [vim src/Processors/QueryPlan/LimitByStep.h +8]
│ ├── LimitStep [vim src/Processors/QueryPlan/LimitStep.h +9]
│ ├── MergingAggregatedStep [vim src/Processors/QueryPlan/MergingAggregatedStep.h +12]
│ ├── OffsetStep [vim src/Processors/QueryPlan/OffsetStep.h +9]
│ ├── RollupStep [vim src/Processors/QueryPlan/RollupStep.h +12]
│ ├── SettingQuotaAndLimitsStep [vim src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h +21]
│ ├── SortingStep [vim src/Processors/QueryPlan/SortingStep.h +11]
│ ├── TotalsHavingStep [vim src/Processors/QueryPlan/TotalsHavingStep.h +13]
│ └── WindowStep [vim src/Processors/QueryPlan/WindowStep.h +14]
└── ISourceStep [vim src/Processors/QueryPlan/ISourceStep.h +8]
├── ReadFromMergeTree [vim src/Processors/QueryPlan/ReadFromMergeTree.h +26]
├── ReadFromRemote [vim src/Processors/QueryPlan/ReadFromRemote.h +21]
├── ReadNothingStep [vim src/Processors/QueryPlan/ReadNothingStep.h +8]
└── ReadFromPreparedSource [vim src/Processors/QueryPlan/ReadFromPreparedSource.h +9]
└── ReadFromStorageStep [vim src/Processors/QueryPlan/ReadFromPreparedSource.h +23]
^IProcessor$
└── IProcessor [vim src/Processors/IProcessor.h +110]
├── AggregatingInOrderTransform [vim src/Processors/Transforms/AggregatingInOrderTransform.h +19]
├── AggregatingTransform [vim src/Processors/Transforms/AggregatingTransform.h +102]
├── ConcatProcessor [vim src/Processors/ConcatProcessor.h +16]
├── ConvertingAggregatedToChunksTransform [vim src/Processors/Transforms/AggregatingTransform.cpp +154]
├── CopyTransform [vim src/Processors/Transforms/CopyTransform.h +9]
├── CopyingDataToViewsTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp +71]
├── DelayedPortsProcessor [vim src/Processors/DelayedPortsProcessor.h +11]
├── DelayedSource [vim src/Processors/Sources/DelayedSource.h +17]
├── FillingRightJoinSideTransform [vim src/Processors/Transforms/JoiningTransform.h +87]
├── FinalizingViewsTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp +148]
├── ForkProcessor [vim src/Processors/ForkProcessor.h +18]
├── GroupingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h +59]
├── IInflatingTransform [vim src/Processors/IInflatingTransform.h +21]
├── IntersectOrExceptTransform [vim src/Processors/Transforms/IntersectOrExceptTransform.h +12]
├── JoiningTransform [vim src/Processors/Transforms/JoiningTransform.h +18]
├── LimitTransform [vim src/Processors/LimitTransform.h +18]
├── OffsetTransform [vim src/Processors/OffsetTransform.h +13]
├── ResizeProcessor [vim src/Processors/ResizeProcessor.h +21]
├── SortingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h +121]
├── StrictResizeProcessor [vim src/Processors/ResizeProcessor.h +77]
├── WindowTransform [vim src/Processors/Transforms/WindowTransform.h +87]
├── IAccumulatingTransform [vim src/Processors/IAccumulatingTransform.h +13]
│ ├── BufferingToFileTransform [vim src/Processors/Transforms/MergeSortingTransform.cpp +30]
│ ├── CreatingSetsTransform [vim src/Processors/Transforms/CreatingSetsTransform.h +26]
│ ├── CubeTransform [vim src/Processors/Transforms/CubeTransform.h +11]
│ ├── MergingAggregatedTransform [vim src/Processors/Transforms/MergingAggregatedTransform.h +12]
│ ├── QueueBuffer [vim src/Processors/QueueBuffer.h +13]
│ ├── RollupTransform [vim src/Processors/Transforms/RollupTransform.h +10]
│ ├── TTLCalcTransform [vim src/Processors/Transforms/TTLCalcTransform.h +14]
│ └── TTLTransform [vim src/Processors/Transforms/TTLTransform.h +15]
├── ISimpleTransform [vim src/Processors/ISimpleTransform.h +17]
│ ├── AddingDefaultsTransform [vim src/Processors/Transforms/AddingDefaultsTransform.h +13]
│ ├── AddingSelectorTransform [vim src/Processors/Transforms/AddingSelectorTransform.h +12]
│ ├── ArrayJoinTransform [vim src/Processors/Transforms/ArrayJoinTransform.h +11]
│ ├── CheckSortedTransform [vim src/Processors/Transforms/CheckSortedTransform.h +12]
│ ├── DistinctSortedTransform [vim src/Processors/Transforms/DistinctSortedTransform.h +21]
│ ├── DistinctTransform [vim src/Processors/Transforms/DistinctTransform.h +10]
│ ├── ExpressionTransform [vim src/Processors/Transforms/ExpressionTransform.h +18]
│ ├── ExtremesTransform [vim src/Processors/Transforms/ExtremesTransform.h +7]
│ ├── FillingTransform [vim src/Processors/Transforms/FillingTransform.h +13]
│ ├── FilterTransform [vim src/Processors/Transforms/FilterTransform.h +18]
│ ├── FinalizeAggregatedTransform [vim src/Processors/Transforms/AggregatingInOrderTransform.h +78]
│ ├── LimitByTransform [vim src/Processors/Transforms/LimitByTransform.h +10]
│ ├── LimitsCheckingTransform [vim src/Processors/Transforms/LimitsCheckingTransform.h +26]
│ ├── MaterializingTransform [vim src/Processors/Transforms/MaterializingTransform.h +8]
│ ├── MergingAggregatedBucketTransform [vim src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h +105]
│ ├── PartialSortingTransform [vim src/Processors/Transforms/PartialSortingTransform.h +13]
│ ├── ReplacingWindowColumnTransform [vim src/Processors/Transforms/ReplacingWindowColumnTransform.h +13]
│ ├── ReverseTransform [vim src/Processors/Transforms/ReverseTransform.h +8]
│ ├── SendingChunkHeaderTransform [vim src/Processors/Sources/ShellCommandSource.cpp +429]
│ ├── TotalsHavingTransform [vim src/Processors/Transforms/TotalsHavingTransform.h +23]
│ ├── TransformWithAdditionalColumns [vim src/Dictionaries/DictionarySourceHelpers.h +40]
│ └── WatermarkTransform [vim src/Processors/Transforms/WatermarkTransform.h +10]
├── ISink [vim src/Processors/ISink.h +9]
│ ├── EmptySink [vim src/Processors/Sinks/EmptySink.h +8]
│ ├── ExternalTableDataSink [vim src/Client/Connection.cpp +682]
│ ├── NullSink [vim src/Processors/Sinks/NullSink.h +8]
│ └── ODBCSink [vim programs/odbc-bridge/ODBCBlockOutputStream.h +14]
├── SortingTransform [vim src/Processors/Transforms/SortingTransform.h +63]
│ ├── FinishSortingTransform [vim src/Processors/Transforms/FinishSortingTransform.h +10]
│ └── MergeSortingTransform [vim src/Processors/Transforms/MergeSortingTransform.h +17]
├── IMergingTransformBase [vim src/Processors/Merges/IMergingTransform.h +12]
│ └── IMergingTransform [vim src/Processors/Merges/IMergingTransform.h +77]
│ ├── AggregatingSortedTransform [vim src/Processors/Merges/AggregatingSortedTransform.h +12]
│ ├── CollapsingSortedTransform [vim src/Processors/Merges/CollapsingSortedTransform.h +10]
│ ├── ColumnGathererTransform [vim src/Processors/Transforms/ColumnGathererTransform.h +105]
│ ├── FinishAggregatingInOrderTransform [vim src/Processors/Merges/FinishAggregatingInOrderTransform.h +12]
│ ├── GraphiteRollupSortedTransform [vim src/Processors/Merges/GraphiteRollupSortedTransform.h +10]
│ ├── MergingSortedTransform [vim src/Processors/Merges/MergingSortedTransform.h +11]
│ ├── ReplacingSortedTransform [vim src/Processors/Merges/ReplacingSortedTransform.h +11]
│ ├── SummingSortedTransform [vim src/Processors/Merges/SummingSortedTransform.h +10]
│ └── VersionedCollapsingTransform [vim src/Processors/Merges/VersionedCollapsingTransform.h +11]
├── ExceptionKeepingTransform [vim src/Processors/Transforms/ExceptionKeepingTransform.h +24]
│ ├── CheckConstraintsTransform [vim src/Processors/Transforms/CheckConstraintsTransform.h +15]
│ ├── ConvertingTransform [vim src/Processors/Transforms/ExpressionTransform.h +36]
│ ├── CountingTransform [vim src/Processors/Transforms/CountingTransform.h +14]
│ ├── ExecutingInnerQueryFromViewTransform [vim src/Processors/Transforms/buildPushingToViewsChain.cpp +86]
│ ├── SquashingChunksTransform [vim src/Processors/Transforms/SquashingChunksTransform.h +9]
│ └── SinkToStorage [vim src/Processors/Sinks/SinkToStorage.h +9]
│ ├── BufferSink [vim src/Storages/StorageBuffer.cpp +535]
│ ├── DistributedSink [vim src/Storages/Distributed/DistributedSink.h +39]
│ ├── EmbeddedRocksDBSink [vim src/Storages/RocksDB/EmbeddedRocksDBSink.h +13]
│ ├── HDFSSink [vim src/Storages/HDFS/StorageHDFS.cpp +438]
│ ├── KafkaSink [vim src/Storages/Kafka/KafkaBlockOutputStream.h +12]
│ ├── LiveViewSink [vim src/Storages/LiveView/LiveViewSink.h +12]
│ ├── LogSink [vim src/Storages/StorageLog.cpp +262]
│ ├── MemorySink [vim src/Storages/StorageMemory.cpp +108]
│ ├── MergeTreeSink [vim src/Storages/MergeTree/MergeTreeSink.h +14]
│ ├── NullSinkToStorage [vim src/Processors/Sinks/SinkToStorage.h +36]
│ ├── PostgreSQLSink [vim src/Storages/StoragePostgreSQL.cpp +109]
│ ├── PushingToLiveViewSink [vim src/Processors/Transforms/buildPushingToViewsChain.cpp +117]
│ ├── PushingToWindowViewSink [vim src/Processors/Transforms/buildPushingToViewsChain.cpp +131]
│ ├── RabbitMQSink [vim src/Storages/RabbitMQ/RabbitMQSink.h +13]
│ ├── RemoteSink [vim src/Processors/Sinks/RemoteSink.h +8]
│ ├── ReplicatedMergeTreeSink [vim src/Storages/MergeTree/ReplicatedMergeTreeSink.h +22]
│ ├── SQLiteSink [vim src/Storages/StorageSQLite.cpp +87]
│ ├── SetOrJoinSink [vim src/Storages/StorageSet.cpp +37]
│ ├── StorageFileSink [vim src/Storages/StorageFile.cpp +676]
│ ├── StorageMySQLSink [vim src/Storages/StorageMySQL.cpp +114]
│ ├── StorageS3Sink [vim src/Storages/StorageS3.cpp +377]
│ ├── StorageURLSink [vim src/Storages/StorageURL.h +103]
│ ├── StripeLogSink [vim src/Storages/StorageStripeLog.cpp +166]
│ └── PartitionedSink [vim src/Storages/PartitionedSink.h +14]
│ ├── PartitionedHDFSSink [vim src/Storages/HDFS/StorageHDFS.cpp +480]
│ ├── PartitionedStorageFileSink [vim src/Storages/StorageFile.cpp +799]
│ ├── PartitionedStorageS3Sink [vim src/Storages/StorageS3.cpp +436]
│ └── PartitionedStorageURLSink [vim src/Storages/StorageURL.cpp +345]
├── IOutputFormat [vim src/Processors/Formats/IOutputFormat.h +23]
│ ├── ArrowBlockOutputFormat [vim src/Processors/Formats/Impl/ArrowBlockOutputFormat.h +18]
│ ├── LazyOutputFormat [vim src/Processors/Formats/LazyOutputFormat.h +13]
│ ├── MySQLOutputFormat [vim src/Processors/Formats/Impl/MySQLOutputFormat.h +20]
│ ├── NativeOutputFormat [vim src/Processors/Formats/Impl/NativeFormat.cpp +55]
│ ├── NullOutputFormat [vim src/Processors/Formats/Impl/NullFormat.h +7]
│ ├── ODBCDriver2BlockOutputFormat [vim src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h +20]
│ ├── ORCBlockOutputFormat [vim src/Processors/Formats/Impl/ORCBlockOutputFormat.h +34]
│ ├── ParallelFormattingOutputFormat [vim src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +54]
│ ├── ParquetBlockOutputFormat [vim src/Processors/Formats/Impl/ParquetBlockOutputFormat.h +27]
│ ├── PostgreSQLOutputFormat [vim src/Processors/Formats/Impl/PostgreSQLOutputFormat.h +13]
│ ├── PullingOutputFormat [vim src/Processors/Formats/PullingOutputFormat.h +9]
│ ├── TemplateBlockOutputFormat [vim src/Processors/Formats/Impl/TemplateBlockOutputFormat.h +13]
│ ├── PrettyBlockOutputFormat [vim src/Processors/Formats/Impl/PrettyBlockOutputFormat.h +17]
│ │ ├── PrettyCompactBlockOutputFormat [vim src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h +13]
│ │ └── PrettySpaceBlockOutputFormat [vim src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h +11]
│ └── IRowOutputFormat [vim src/Processors/Formats/IRowOutputFormat.h +24]
│ ├── AvroRowOutputFormat [vim src/Processors/Formats/Impl/AvroRowOutputFormat.h +46]
│ ├── BinaryRowOutputFormat [vim src/Processors/Formats/Impl/BinaryRowOutputFormat.h +17]
│ ├── CSVRowOutputFormat [vim src/Processors/Formats/Impl/CSVRowOutputFormat.h +17]
│ ├── CapnProtoRowOutputFormat [vim src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h +26]
│ ├── CustomSeparatedRowOutputFormat [vim src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.h +11]
│ ├── JSONCompactEachRowRowOutputFormat [vim src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h +15]
│ ├── MarkdownRowOutputFormat [vim src/Processors/Formats/Impl/MarkdownRowOutputFormat.h +12]
│ ├── MsgPackRowOutputFormat [vim src/Processors/Formats/Impl/MsgPackRowOutputFormat.h +18]
│ ├── ProtobufRowOutputFormat [vim src/Processors/Formats/Impl/ProtobufRowOutputFormat.h +29]
│ ├── RawBLOBRowOutputFormat [vim src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h +27]
│ ├── ValuesRowOutputFormat [vim src/Processors/Formats/Impl/ValuesRowOutputFormat.h +15]
│ ├── VerticalRowOutputFormat [vim src/Processors/Formats/Impl/VerticalRowOutputFormat.h +18]
│ ├── XMLRowOutputFormat [vim src/Processors/Formats/Impl/XMLRowOutputFormat.h +16]
│ ├── JSONEachRowRowOutputFormat [vim src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h +15]
│ │ └── JSONEachRowWithProgressRowOutputFormat [vim src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h +8]
│ ├── JSONRowOutputFormat [vim src/Processors/Formats/Impl/JSONRowOutputFormat.h +16]
│ │ └── JSONCompactRowOutputFormat [vim src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h +16]
│ └── TabSeparatedRowOutputFormat [vim src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h +15]
│ └── TSKVRowOutputFormat [vim src/Processors/Formats/Impl/TSKVRowOutputFormat.h +14]
└── ISource [vim src/Processors/ISource.h +9]
├── ConvertingAggregatedToChunksSource [vim src/Processors/Transforms/AggregatingTransform.cpp +88]
├── MergeSorterSource [vim src/Processors/Transforms/SortingTransform.h +45]
├── NullSource [vim src/Processors/Sources/NullSource.h +8]
├── ODBCSource [vim programs/odbc-bridge/ODBCBlockInputStream.h +13]
├── PushingAsyncSource [vim src/Processors/Executors/PushingAsyncPipelineExecutor.cpp +17]
├── PushingSource [vim src/Processors/Executors/PushingPipelineExecutor.cpp +15]
├── RemoteExtremesSource [vim src/Processors/Sources/RemoteSource.h +70]
├── RemoteTotalsSource [vim src/Processors/Sources/RemoteSource.h +54]
├── SourceFromNativeStream [vim src/Processors/Transforms/AggregatingTransform.cpp +53]
├── TemporaryFileLazySource [vim src/Processors/Sources/TemporaryFileLazySource.h +10]
├── WaitForAsyncInsertSource [vim src/Processors/Sources/WaitForAsyncInsertSource.h +11]
├── ISourceWithProgress [vim src/Processors/Sources/SourceWithProgress.h +16]
│ └── SourceWithProgress [vim src/Processors/Sources/SourceWithProgress.h +48]
│ ├── BlocksListSource [vim src/Processors/Sources/BlocksListSource.h +12]
│ ├── BlocksSource [vim src/Processors/Sources/BlocksSource.h +23]
│ ├── BufferSource [vim src/Storages/StorageBuffer.cpp +146]
│ ├── CassandraSource [vim src/Dictionaries/CassandraSource.h +14]
│ ├── ColumnsSource [vim src/Storages/System/StorageSystemColumns.cpp +67]
│ ├── DDLQueryStatusSource [vim src/Interpreters/executeDDLQueryOnCluster.cpp +174]
│ ├── DataSkippingIndicesSource [vim src/Storages/System/StorageSystemDataSkippingIndices.cpp +35]
│ ├── DictionarySource [vim src/Dictionaries/DictionarySource.cpp +14]
│ ├── DirectoryMonitorSource [vim src/Storages/Distributed/DirectoryMonitor.cpp +908]
│ ├── EmbeddedRocksDBSource [vim src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +180]
│ ├── FileLogSource [vim src/Storages/FileLog/FileLogSource.h +13]
│ ├── GenerateSource [vim src/Storages/StorageGenerateRandom.cpp +379]
│ ├── HDFSSource [vim src/Storages/HDFS/StorageHDFS.h +85]
│ ├── JoinSource [vim src/Storages/StorageJoin.cpp +380]
│ ├── KafkaSource [vim src/Storages/Kafka/KafkaSource.h +16]
│ ├── LiveViewEventsSource [vim src/Storages/LiveView/LiveViewEventsSource.h +30]
│ ├── LiveViewSource [vim src/Storages/LiveView/LiveViewSource.h +14]
│ ├── LogSource [vim src/Storages/StorageLog.cpp +57]
│ ├── MemorySource [vim src/Storages/StorageMemory.cpp +30]
│ ├── MergeTreeSequentialSource [vim src/Storages/MergeTree/MergeTreeSequentialSource.h +12]
│ ├── MongoDBSource [vim src/Processors/Transforms/MongoDBSource.h +25]
│ ├── NumbersMultiThreadedSource [vim src/Storages/System/StorageSystemNumbers.cpp +64]
│ ├── NumbersSource [vim src/Storages/System/StorageSystemNumbers.cpp +17]
│ ├── RabbitMQSource [vim src/Storages/RabbitMQ/RabbitMQSource.h +11]
│ ├── RedisSource [vim src/Dictionaries/RedisSource.h +22]
│ ├── RemoteSource [vim src/Processors/Sources/RemoteSource.h +17]
│ ├── SQLiteSource [vim src/Processors/Sources/SQLiteSource.h +15]
│ ├── ShellCommandSource [vim src/Processors/Sources/ShellCommandSource.cpp +247]
│ ├── SourceFromSingleChunk [vim src/Processors/Sources/SourceFromSingleChunk.h +8]
│ ├── StorageFileSource [vim src/Storages/StorageFile.cpp +413]
│ ├── StorageHiveSource [vim src/Storages/Hive/StorageHive.cpp +56]
│ ├── StorageInputSource [vim src/Storages/StorageInput.cpp +28]
│ ├── StorageS3Source [vim src/Storages/StorageS3.h +33]
│ ├── StorageURLSource [vim src/Storages/StorageURL.cpp +115]
│ ├── StripeLogSource [vim src/Storages/StorageStripeLog.cpp +62]
│ ├── SyncKillQuerySource [vim src/Interpreters/InterpreterKillQueryQuery.cpp +127]
│ ├── TablesBlockSource [vim src/Storages/System/StorageSystemTables.cpp +127]
│ ├── WindowViewSource [vim src/Storages/WindowView/WindowViewSource.h +10]
│ ├── ZerosSource [vim src/Storages/System/StorageSystemZeros.cpp +26]
│ ├── MySQLSource [vim src/Processors/Sources/MySQLSource.h +28]
│ │ └── MySQLWithFailoverSource [vim src/Processors/Sources/MySQLSource.h +64]
│ ├── PostgreSQLSource [vim src/Processors/Transforms/PostgreSQLSource.h +19]
│ │ └── PostgreSQLTransactionSource [vim src/Processors/Transforms/PostgreSQLSource.h +66]
│ └── MergeTreeBaseSelectProcessor [vim src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +32]
│ ├── MergeTreeThreadSelectProcessor [vim src/Storages/MergeTree/MergeTreeThreadSelectProcessor.h +14]
│ └── MergeTreeSelectProcessor [vim src/Storages/MergeTree/MergeTreeSelectProcessor.h +16]
│ ├── MergeTreeInOrderSelectProcessor [vim src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h +11]
│ └── MergeTreeReverseSelectProcessor [vim src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +12]
└── IInputFormat [vim src/Processors/Formats/IInputFormat.h +30]
├── ArrowBlockInputFormat [vim src/Processors/Formats/Impl/ArrowBlockInputFormat.h +19]
├── NativeInputFormat [vim src/Processors/Formats/Impl/NativeFormat.cpp +15]
├── ORCBlockInputFormat [vim src/Processors/Formats/Impl/ORCBlockInputFormat.h +21]
├── ParallelParsingInputFormat [vim src/Processors/Formats/Impl/ParallelParsingInputFormat.h +70]
├── ParquetBlockInputFormat [vim src/Processors/Formats/Impl/ParquetBlockInputFormat.h +18]
├── ValuesBlockInputFormat [vim src/Processors/Formats/Impl/ValuesBlockInputFormat.h +20]
└── IRowInputFormat [vim src/Processors/Formats/IRowInputFormat.h +38]
├── AvroConfluentRowInputFormat [vim src/Processors/Formats/Impl/AvroRowInputFormat.h +140]
├── AvroRowInputFormat [vim src/Processors/Formats/Impl/AvroRowInputFormat.h +118]
├── CapnProtoRowInputFormat [vim src/Processors/Formats/Impl/CapnProtoRowInputFormat.h +23]
├── JSONEachRowRowInputFormat [vim src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h +21]
├── LineAsStringRowInputFormat [vim src/Processors/Formats/Impl/LineAsStringRowInputFormat.h +17]
├── MsgPackRowInputFormat [vim src/Processors/Formats/Impl/MsgPackRowInputFormat.h +59]
├── ProtobufRowInputFormat [vim src/Processors/Formats/Impl/ProtobufRowInputFormat.h +29]
├── RawBLOBRowInputFormat [vim src/Processors/Formats/Impl/RawBLOBRowInputFormat.h +16]
├── RegexpRowInputFormat [vim src/Processors/Formats/Impl/RegexpRowInputFormat.h +51]
├── TSKVRowInputFormat [vim src/Processors/Formats/Impl/TSKVRowInputFormat.h +24]
├── JSONAsRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h +15]
│ ├── JSONAsObjectRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h +56]
│ └── JSONAsStringRowInputFormat [vim src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h +43]
└── RowInputFormatWithDiagnosticInfo [vim src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h +12]
├── TemplateRowInputFormat [vim src/Processors/Formats/Impl/TemplateRowInputFormat.h +18]
└── RowInputFormatWithNamesAndTypes [vim src/Processors/Formats/RowInputFormatWithNamesAndTypes.h +24]
├── BinaryRowInputFormat [vim src/Processors/Formats/Impl/BinaryRowInputFormat.h +20]
├── CustomSeparatedRowInputFormat [vim src/Processors/Formats/Impl/CustomSeparatedRowInputFormat.h +11]
├── JSONCompactEachRowRowInputFormat [vim src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h +22]
├── TabSeparatedRowInputFormat [vim src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h +14]
└── CSVRowInputFormat [vim src/Processors/Formats/Impl/CSVRowInputFormat.h +18]
└── HiveTextRowInputFormat [vim src/Processors/Formats/Impl/HiveTextRowInputFormat.h +15]
参考https://bbs.huaweicloud.com/blogs/314808
https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/QueryPlan.cpp#L156
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
const QueryPlanOptimizationSettings & optimization_settings,
const BuildQueryPipelineSettings & build_pipeline_settings)
{
checkInitialized();
optimize(optimization_settings);
struct Frame
{
Node * node = {};
QueryPipelineBuilders pipelines = {};
};
QueryPipelineBuilderPtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr;
}
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);
stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
return last_pipeline;
}
首先这个逻辑时间QueryPlan(Tree Node) 转化为QueryPipelines 的过程
这个过程使用vector<Frame> Stack 实现, 从Frame{root Node}入栈开始, 到它出栈结束, 很像一次深度 按层次 读取过程
通过一个
select sum(l_orderkey) from lineitem, orders join l_orderkey = o_orderkey; 的简化的QueryPlanNode 来阐述这个过程
stack.top() 即拿去最后一个入栈的frame成员
准备
QueryPipelineBuilderPtr last_pipeline;//注意last_pipeline 这里申明,最终返回
std::stack<Frame> stack;
stack.push(Frame{.node = root}); //root Node(agg)
循环1
Agg
auto & frame = stack.top(); // frame = Frame{.node = root} root= Node(agg)
last_pipeline == nullptr
size_t next_child = frame.pipelines.size() // next_child = 0
frame.node->children 只有一个Node(join), frame.node->children.size = 1
if (next_child == frame.node->children.size()) // 0 != 1
stack.push(Frame{.node = frame.node->children[next_child]}); // Node(join) 进栈
循环2
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 0 !=2
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan1)} 如栈
循环3 (转折点)
Agg
Join
Scan1
auto & frame = stack.top(); // frame = Frame{.node = Node(scan1)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(scan1) 没有children 0
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
bool limit_max_threads = frame.pipelines.empty();//0
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//注意这里是 frame.node->step 和 frame.pipelines 整合为一个pipeline 当前Node(scan1) 可以理解为没有太多处理, 因为frame.pipelines 为空
//即step Node(scan1)转换为last_pipeline
stack.pop(); //Node(scan1) 出栈
}
循环4
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan1) 转化而来 不做详解
frame.pipelines.emplace_back(std::move(last_pipeline)); //frame.pipelines 入栈 Node(scan1) pipeline 转折点
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 1 !=2
stack.push(Frame{.node = frame.node->children[next_child]}); // Frame{Node(scan2)} 入栈
循环5
Agg
Join
Scan2
auto & frame = stack.top(); // frame = Frame{.node = Node(scan2)}
last_pipeline == nullptr
size_t next_child = frame.pipelines.size(); // next_child = 0
frame.node->children.size() // frame.node 为 Node(scan2) 没有children 0
if (next_child == frame.node->children.size()) // 0 == 0 进入if
{
bool limit_max_threads = frame.pipelines.empty();//0
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//注意这里是 frame.node->step 和 frame.pipelines 整合为一个pipeline 当前Node(scan2) 可以理解为没有太多处理, 因为frame.pipelines 为空
//即step Node(scan2)转换为last_pipeline
stack.pop(); //Node(scan2) 出栈
}
循环6
Agg
Join
auto & frame = stack.top(); // frame = Frame{.node = Node(join)}
last_pipeline // 为 Node(scan2)
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 已经入栈过 Node(Scan1) pipeline
//现在入栈 Node(scan2) pipeline , frame.pipelines.size == 2
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 2
frame.node->children.size() // frame.node 为 Node(join) children 为2 (Node(scan1) 和 Node(scan2))
if (next_child == frame.node->children.size()) // 2 == 2 转折点, 进入if
{
bool limit_max_threads = frame.pipelines.empty(); //limit_max_threads = false
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//这里即 将 frame.node->step(Node(join)) 与 frame.pipelines(Node(scan1), Node(scan2)) updatePipline 为 last_pipeline , 不做详解
stack.pop(); //Node(join) 出栈
}
循环7
Agg
auto & frame = stack.top(); // frame = Frame{.node = Node(Agg)}
last_pipeline // Node(join) 后面的last_pipeline
frame.pipelines.emplace_back(std::move(last_pipeline)); //注意 frame.pipelines 当前为空, 入栈last_pipeline
last_pipeline = nullptr;
size_t next_child = frame.pipelines.size(); // next_child = 1
frame.node->children.size() // frame.node 为 Node(Join) children 为1
if (next_child == frame.node->children.size()) // 1 == 1进入if
{
bool limit_max_threads = frame.pipelines.empty(); //limit_max_threads = false
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
//这里即 将 frame.node->step(Node(join)) 与 frame.pipelines(Node(Join)) updatePipline 为 last_pipeline , 不做详解
stack.pop(); //Node(Agg) 出栈
}
推出循环
返回last_pipeline
如最初第一章张图
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。