前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala 从 Fragment -> DataSink -> RowBatch 粗讲

Impala 从 Fragment -> DataSink -> RowBatch 粗讲

原创
作者头像
jasong
发布2021-12-27 21:44:08
6450
发布2021-12-27 21:44:08
举报
文章被收录于专栏:ClickHouse

Impala

在 Impala 中,Fragment 的执行是一步一步进行的:Prepare -> Open -> Read -> Close

Fragment -> DataSink --> RowBatch
Fragment -> DataSink --> RowBatch

PlanNode

代码语言:C++
复制
/// Factory for plan nodes: allocates, through 'pool', the PlanNode subclass that matches
/// 'tnode.node_type' and hands it back via 'node'.
///
/// Returns Status::OK() when a node was created. For a node type that has no case below,
/// 'node' is left unassigned and an error Status with the text
/// "<type name> not implemented" is returned.
Status PlanNode::CreatePlanNode(ObjectPool * pool, const TPlanNode & tnode, PlanNode ** node)
{
    switch (tnode.node_type)
    {
        case TPlanNodeType::HDFS_SCAN_NODE:
            *node = pool->Add(new HdfsScanPlanNode());
            break;

        // These three scan types all map onto the generic ScanPlanNode.
        case TPlanNodeType::HBASE_SCAN_NODE:
        case TPlanNodeType::DATA_SOURCE_NODE:
        case TPlanNodeType::KUDU_SCAN_NODE:
            *node = pool->Add(new ScanPlanNode());
            break;

        case TPlanNodeType::AGGREGATION_NODE:
            *node = pool->Add(new AggregationPlanNode());
            break;

        case TPlanNodeType::HASH_JOIN_NODE:
            *node = pool->Add(new PartitionedHashJoinPlanNode());
            break;

        case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
            *node = pool->Add(new NestedLoopJoinPlanNode());
            break;

        case TPlanNodeType::EMPTY_SET_NODE:
            *node = pool->Add(new EmptySetPlanNode());
            break;

        case TPlanNodeType::EXCHANGE_NODE:
            *node = pool->Add(new ExchangePlanNode());
            break;

        case TPlanNodeType::SELECT_NODE:
            *node = pool->Add(new SelectPlanNode());
            break;

        case TPlanNodeType::SORT_NODE:
            // The sort sub-type selects the plan node class.
            switch (tnode.sort_node.type)
            {
                case TSortType::PARTIAL:
                    *node = pool->Add(new PartialSortPlanNode());
                    break;
                case TSortType::TOPN:
                case TSortType::PARTITIONED_TOPN:
                    *node = pool->Add(new TopNPlanNode());
                    break;
                default:
                    // Anything that is neither partial nor top-N is expected to be a total sort.
                    DCHECK(tnode.sort_node.type == TSortType::TOTAL);
                    *node = pool->Add(new SortPlanNode());
                    break;
            }
            break;

        case TPlanNodeType::UNION_NODE:
            *node = pool->Add(new UnionPlanNode());
            break;

        case TPlanNodeType::ANALYTIC_EVAL_NODE:
            *node = pool->Add(new AnalyticEvalPlanNode());
            break;

        case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
            *node = pool->Add(new SingularRowSrcPlanNode());
            break;

        case TPlanNodeType::SUBPLAN_NODE:
            *node = pool->Add(new SubplanPlanNode());
            break;

        case TPlanNodeType::UNNEST_NODE:
            *node = pool->Add(new UnnestPlanNode());
            break;

        case TPlanNodeType::CARDINALITY_CHECK_NODE:
            *node = pool->Add(new CardinalityCheckPlanNode());
            break;

        default:
        {
            // Look up a printable name for the type; fall back to a generic label when the
            // value is not present in the name table.
            const auto name_it = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
            const char * type_name = "unknown node type";
            if (name_it != _TPlanNodeType_VALUES_TO_NAMES.end())
                type_name = name_it->second;

            stringstream msg;
            msg << type_name << " not implemented";
            return Status(msg.str());
        }
    }
    return Status::OK();
}

DataSinkConfig 创建

代码语言:C++
复制
/// Builds the DataSinkConfig subclass that corresponds to 'thrift_sink.type', allocates it
/// from the object pool of 'state', and then calls Init() on it with 'thrift_sink',
/// 'row_desc' and 'state'.
///
/// '*data_sink' is reset to nullptr up front, so it is still nullptr when the function
/// returns early because a required thrift field is missing or the sink type has no case
/// below. An error from CheckKuduAvailability() or Init() is passed on through
/// RETURN_IF_ERROR; otherwise Status::OK() is returned.
Status DataSinkConfig::CreateConfig(
    const TDataSink & thrift_sink, const RowDescriptor * row_desc, FragmentState * state, DataSinkConfig ** data_sink)
{
    *data_sink = nullptr;
    ObjectPool * objects = state->obj_pool();

    switch (thrift_sink.type)
    {
        case TDataSinkType::DATA_STREAM_SINK:
        {
            if (!thrift_sink.__isset.stream_sink)
            {
                return Status("Missing data stream sink.");
            }
            // TODO: figure out good buffer size based on size of output row
            *data_sink = objects->Add(new KrpcDataStreamSenderConfig());
            break;
        }
        case TDataSinkType::TABLE_SINK:
        {
            if (!thrift_sink.__isset.table_sink)
            {
                return Status("Missing table sink.");
            }
            // The table sink carries its own sub-type that selects the storage backend.
            switch (thrift_sink.table_sink.type)
            {
                case TTableSinkType::HDFS:
                    *data_sink = objects->Add(new HdfsTableSinkConfig());
                    break;
                case TTableSinkType::KUDU:
                    // Only create the Kudu sink config once the availability check has passed.
                    RETURN_IF_ERROR(CheckKuduAvailability());
                    *data_sink = objects->Add(new KuduTableSinkConfig());
                    break;
                case TTableSinkType::HBASE:
                    *data_sink = objects->Add(new HBaseTableSinkConfig());
                    break;
                default:
                {
                    const auto name_it = _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
                    const char * sink_name = "Unknown table sink";
                    if (name_it != _TTableSinkType_VALUES_TO_NAMES.end())
                        sink_name = name_it->second;

                    stringstream msg;
                    msg << sink_name << " not implemented.";
                    return Status(msg.str());
                }
            }
            break;
        }
        case TDataSinkType::PLAN_ROOT_SINK:
        {
            *data_sink = objects->Add(new PlanRootSinkConfig());
            break;
        }
        case TDataSinkType::HASH_JOIN_BUILDER:
        {
            *data_sink = objects->Add(new PhjBuilderConfig());
            break;
        }
        case TDataSinkType::NESTED_LOOP_JOIN_BUILDER:
        {
            *data_sink = objects->Add(new NljBuilderConfig());
            break;
        }
        default:
        {
            const auto name_it = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
            const char * sink_name = "Unknown data sink type ";
            if (name_it != _TDataSinkType_VALUES_TO_NAMES.end())
                sink_name = name_it->second;

            stringstream msg;
            msg << sink_name << " not implemented.";
            return Status(msg.str());
        }
    }

    // Every path that reaches this point has assigned a freshly created config.
    RETURN_IF_ERROR((*data_sink)->Init(thrift_sink, row_desc, state));
    return Status::OK();
}

ScanNode

代码语言:C++
复制
/// Creates the ExecNode matching this plan node's 'tnode_->node_type', allocating it from
/// the object pool of 'state' and handing it back through 'node'.
///
/// Always returns Status::OK(). A node type other than HBase, data source or Kudu scan
/// trips DCHECK(false) and leaves 'node' unassigned.
Status ScanPlanNode::CreateExecNode(RuntimeState * state, ExecNode ** node) const
{
    ObjectPool * const objects = state->obj_pool();

    switch (tnode_->node_type)
    {
        case TPlanNodeType::HBASE_SCAN_NODE:
        {
            *node = objects->Add(new HBaseScanNode(objects, *this, state->desc_tbl()));
            return Status::OK();
        }
        case TPlanNodeType::DATA_SOURCE_NODE:
        {
            *node = objects->Add(new DataSourceScanNode(objects, *this, state->desc_tbl()));
            return Status::OK();
        }
        case TPlanNodeType::KUDU_SCAN_NODE:
        {
            // The thrift node says whether the multi-threaded Kudu scan node is to be used.
            if (!tnode_->kudu_scan_node.use_mt_scan_node)
            {
                DCHECK(state->query_options().mt_dop == 0 || state->query_options().num_scanner_threads == 1);
                *node = objects->Add(new KuduScanNode(objects, *this, state->desc_tbl()));
            }
            else
            {
                // The multi-threaded variant is only expected with a positive mt_dop.
                DCHECK_GT(state->query_options().mt_dop, 0);
                *node = objects->Add(new KuduScanNodeMt(objects, *this, state->desc_tbl()));
            }
            return Status::OK();
        }
        default:
            DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type;
            break;
    }
    return Status::OK();
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Impala
  • PlanNode
  • DataSinkConfig 创建
  • ScanNode
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档