在 Impala 中,Fragment 的执行是一步一步进行的:Prepare -> Open -> Read -> Close。
/// Factory that instantiates the PlanNode subclass matching 'tnode.node_type'.
///
/// The freshly allocated object is passed to 'pool' through Add() and the pointer
/// returned by Add() is stored in '*node'. HBase, data source and Kudu scans all map to
/// the generic ScanPlanNode; sort nodes are dispatched further on 'tnode.sort_node.type'.
///
/// Returns Status::OK() once a node has been created. For a node type that has no case
/// below, '*node' is not written and an error Status carrying "<type name> not
/// implemented" is returned instead.
Status PlanNode::CreatePlanNode(ObjectPool * pool, const TPlanNode & tnode, PlanNode ** node)
{
    switch (tnode.node_type)
    {
        case TPlanNodeType::HDFS_SCAN_NODE:
        {
            *node = pool->Add(new HdfsScanPlanNode());
            break;
        }
        // These three scan flavours share a single plan node class.
        case TPlanNodeType::HBASE_SCAN_NODE:
        case TPlanNodeType::DATA_SOURCE_NODE:
        case TPlanNodeType::KUDU_SCAN_NODE:
        {
            *node = pool->Add(new ScanPlanNode());
            break;
        }
        case TPlanNodeType::AGGREGATION_NODE:
        {
            *node = pool->Add(new AggregationPlanNode());
            break;
        }
        case TPlanNodeType::HASH_JOIN_NODE:
        {
            *node = pool->Add(new PartitionedHashJoinPlanNode());
            break;
        }
        case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
        {
            *node = pool->Add(new NestedLoopJoinPlanNode());
            break;
        }
        case TPlanNodeType::EMPTY_SET_NODE:
        {
            *node = pool->Add(new EmptySetPlanNode());
            break;
        }
        case TPlanNodeType::EXCHANGE_NODE:
        {
            *node = pool->Add(new ExchangePlanNode());
            break;
        }
        case TPlanNodeType::SELECT_NODE:
        {
            *node = pool->Add(new SelectPlanNode());
            break;
        }
        case TPlanNodeType::SORT_NODE:
        {
            // The sort flavour decides which plan node class is used.
            switch (tnode.sort_node.type)
            {
                case TSortType::PARTIAL:
                    *node = pool->Add(new PartialSortPlanNode());
                    break;
                case TSortType::TOPN:
                case TSortType::PARTITIONED_TOPN:
                    *node = pool->Add(new TopNPlanNode());
                    break;
                default:
                    // Any remaining sort type gets a SortPlanNode; the DCHECK asserts
                    // that it is TOTAL.
                    DCHECK(tnode.sort_node.type == TSortType::TOTAL);
                    *node = pool->Add(new SortPlanNode());
                    break;
            }
            break;
        }
        case TPlanNodeType::UNION_NODE:
        {
            *node = pool->Add(new UnionPlanNode());
            break;
        }
        case TPlanNodeType::ANALYTIC_EVAL_NODE:
        {
            *node = pool->Add(new AnalyticEvalPlanNode());
            break;
        }
        case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
        {
            *node = pool->Add(new SingularRowSrcPlanNode());
            break;
        }
        case TPlanNodeType::SUBPLAN_NODE:
        {
            *node = pool->Add(new SubplanPlanNode());
            break;
        }
        case TPlanNodeType::UNNEST_NODE:
        {
            *node = pool->Add(new UnnestPlanNode());
            break;
        }
        case TPlanNodeType::CARDINALITY_CHECK_NODE:
        {
            *node = pool->Add(new CardinalityCheckPlanNode());
            break;
        }
        default:
        {
            // Look up the printable name of the type, falling back to a generic label
            // when the value is missing from the name table.
            const char * type_name = "unknown node type";
            const auto name_it = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
            if (name_it != _TPlanNodeType_VALUES_TO_NAMES.end())
            {
                type_name = name_it->second;
            }
            stringstream msg;
            msg << type_name << " not implemented";
            return Status(msg.str());
        }
    }
    return Status::OK();
}
/// Builds the DataSinkConfig subclass selected by 'thrift_sink.type' and initialises it.
///
/// '*data_sink' is reset to nullptr first, so it stays null on every early error return.
/// The new config object is passed to the object pool obtained from 'state' and then
/// Init() is invoked on it with 'thrift_sink', 'row_desc' and 'state'.
///
/// Returns an error Status when:
///  - the stream sink or table sink payload is not set in 'thrift_sink',
///  - CheckKuduAvailability() fails for a Kudu table sink,
///  - the sink type or table sink type has no case below ("... not implemented."),
///  - Init() on the created config fails.
/// Otherwise returns Status::OK().
Status DataSinkConfig::CreateConfig(
    const TDataSink & thrift_sink, const RowDescriptor * row_desc, FragmentState * state, DataSinkConfig ** data_sink)
{
    *data_sink = nullptr;
    ObjectPool * pool = state->obj_pool();

    switch (thrift_sink.type)
    {
        case TDataSinkType::DATA_STREAM_SINK:
        {
            if (!thrift_sink.__isset.stream_sink)
            {
                return Status("Missing data stream sink.");
            }
            // TODO: figure out good buffer size based on size of output row
            *data_sink = pool->Add(new KrpcDataStreamSenderConfig());
            break;
        }
        case TDataSinkType::TABLE_SINK:
        {
            if (!thrift_sink.__isset.table_sink)
            {
                return Status("Missing table sink.");
            }
            // Table sinks are dispatched a second time on the target storage system.
            switch (thrift_sink.table_sink.type)
            {
                case TTableSinkType::HDFS:
                {
                    *data_sink = pool->Add(new HdfsTableSinkConfig());
                    break;
                }
                case TTableSinkType::KUDU:
                {
                    RETURN_IF_ERROR(CheckKuduAvailability());
                    *data_sink = pool->Add(new KuduTableSinkConfig());
                    break;
                }
                case TTableSinkType::HBASE:
                {
                    *data_sink = pool->Add(new HBaseTableSinkConfig());
                    break;
                }
                default:
                {
                    // Use the printable table sink type name when one is registered.
                    const char * table_sink_name = "Unknown table sink";
                    const auto name_it = _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
                    if (name_it != _TTableSinkType_VALUES_TO_NAMES.end())
                    {
                        table_sink_name = name_it->second;
                    }
                    stringstream msg;
                    msg << table_sink_name << " not implemented.";
                    return Status(msg.str());
                }
            }
            break;
        }
        case TDataSinkType::PLAN_ROOT_SINK:
        {
            *data_sink = pool->Add(new PlanRootSinkConfig());
            break;
        }
        case TDataSinkType::HASH_JOIN_BUILDER:
        {
            *data_sink = pool->Add(new PhjBuilderConfig());
            break;
        }
        case TDataSinkType::NESTED_LOOP_JOIN_BUILDER:
        {
            *data_sink = pool->Add(new NljBuilderConfig());
            break;
        }
        default:
        {
            // Use the printable sink type name when one is registered.
            const char * sink_name = "Unknown data sink type ";
            const auto name_it = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
            if (name_it != _TDataSinkType_VALUES_TO_NAMES.end())
            {
                sink_name = name_it->second;
            }
            stringstream msg;
            msg << sink_name << " not implemented.";
            return Status(msg.str());
        }
    }

    // Every path that reaches this point has stored a config in '*data_sink'.
    RETURN_IF_ERROR((*data_sink)->Init(thrift_sink, row_desc, state));
    return Status::OK();
}
/// Creates the ExecNode that corresponds to this scan plan node.
///
/// The concrete class is picked from 'tnode_->node_type': HBaseScanNode,
/// DataSourceScanNode, or one of the two Kudu scan nodes (KuduScanNodeMt when
/// 'kudu_scan_node.use_mt_scan_node' is set, KuduScanNode otherwise). The new object is
/// passed to the object pool obtained from 'state' and the pointer returned by Add() is
/// stored in '*node'.
///
/// '*node' is set to nullptr before dispatching, so it holds a defined value on every
/// path. Returns Status::OK() when a node was created and an error Status when the node
/// type is not one of the scan types handled here.
Status ScanPlanNode::CreateExecNode(RuntimeState * state, ExecNode ** node) const
{
    ObjectPool * pool = state->obj_pool();
    // Never hand an unwritten out-parameter back to the caller.
    *node = nullptr;
    switch (tnode_->node_type)
    {
        case TPlanNodeType::HBASE_SCAN_NODE:
            *node = pool->Add(new HBaseScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::DATA_SOURCE_NODE:
            *node = pool->Add(new DataSourceScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::KUDU_SCAN_NODE:
            if (tnode_->kudu_scan_node.use_mt_scan_node)
            {
                // The multi-threaded scan node is only expected with mt_dop > 0.
                DCHECK_GT(state->query_options().mt_dop, 0);
                *node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl()));
            }
            else
            {
                DCHECK(state->query_options().mt_dop == 0 || state->query_options().num_scanner_threads == 1);
                *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl()));
            }
            break;
        default:
            DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type;
            // DCHECK is a debug-only assertion (glog). When it is compiled out, fail
            // explicitly instead of reporting success without having created a node.
            return Status("Unexpected scan node type.");
    }
    return Status::OK();
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。