通过一文搞懂这一系列的文章,我们已经知道了,Flink 作业的提交过程:
这篇文章主要聚焦在
我们以简单的代码为例
/**
* @author shengjk1
* @date 2018/11/23
*/
public class FlinkJava8Demo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5);
source.flatMap((Integer number, Collector<String> out)->{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
}).returns(Types.STRING).print();
source.map(i-> Tuple2.of(i,i))
.returns(Types.TUPLE(Types.INT,Types.INT))
.print();
env.execute("aa");
}
}
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
flatMap 是 DataStream 的一个方法或者就是我们常数的算子,而 StreamFlatMap 其实才是 StreamOperator
transform 方法
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
// DataStream 的一个子类
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//添加 operator,成为 StreamGraph 的一个 operator
getExecutionEnvironment().addOperator(resultTransform);
// 返回 stream,供下游继续操作
return returnStream;
}
像 filter、map等都会进行类似的操作,flink sql 中也是采用这样的方式来将 ExecNode 转化为 flink operator 的
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有