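Much of the material online is outdated, and some of it targets very old versions of Flink. This article quickly builds a WordCount program against Flink 1.13.2, the latest version at the time of writing.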
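The project reads text from a network socket and, every 5 seconds, prints how many times each word appeared. First, generate the project skeleton with the Flink quickstart Maven archetype: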
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.13.2 \
-DgroupId=mflink \
-DartifactId=mflink \
-Dversion=0.1 \
-Dpackage=myflink \
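-DinteractiveMode=false

The parameters after -DarchetypeVersion (groupId, artifactId, version and package) are project-specific and can be changed as you like.

Open the project in your IDE. The archetype has already created two classes, StreamingJob and BatchJob; this article uses StreamingJob to implement a real-time word-count job.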
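StreamExecutionEnvironment is the entry point of a job: it is used to set parameters, create data sources, and submit the job.

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Next, create a source that reads lines from a socket on localhost:9000, using "\n" as the record delimiter:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

This creates a DataStream of strings. DataStream is Flink's core API for stream processing and defines many common operations (filtering, transformation, aggregation, windowing, joining, and so on).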
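socketTextStream("localhost", 9000, "\n") connects to the given host and port as a client and turns the incoming characters into one record per delimiter. The plain-Java sketch below imitates only that splitting step on an in-memory Reader; DelimitedRecordSketch and readRecords are hypothetical names, and Flink's real socket source also deals with connection handling and job cancellation.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a character stream into records by a delimiter,
// similar in spirit to what socketTextStream does with the data it receives.
public class DelimitedRecordSketch {

    static List<String> readRecords(Reader in, String delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] chunk = new char[8192];
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.append(chunk, 0, n);
                int idx;
                // Emit every complete record currently in the buffer.
                while ((idx = buffer.indexOf(delimiter)) != -1) {
                    records.add(buffer.substring(0, idx));
                    buffer.delete(0, idx + delimiter.length());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Whatever is left when the stream ends becomes the last record.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = readRecords(new StringReader("hello world\nhello flink\n"), "\n");
        System.out.println(records); // [hello world, hello flink]
    }
}
```

Each element of the returned list corresponds to one String element of the text stream in the job.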
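Then split each line into (word, 1) pairs, group by word, open a 5-second tumbling processing-time window, and sum the counts:

DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    });

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
    .keyBy(value -> value.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .sum(1);

Print the results with parallelism 1, so a single thread writes to the console:

windowCounts.print().setParallelism(1);

Finally, execute the program:

// execute program
env.execute("Socket Window WordCount");

The final call to env.execute is required to launch the actual Flink job. All the preceding operator calls (creating the source, aggregating, printing) only build an internal graph of operators; the job is submitted to a cluster or run on the local machine only when execute() is called.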
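To see what the flatMap, keyBy, window and sum chain computes, the plain-Java sketch below (no Flink involved; OneWindowWordCount and countWindow are hypothetical names) applies the same per-line split and per-word sum to all lines that arrived during a single 5-second window. It also exposes a quirk of split("\\s"): consecutive spaces produce empty strings, which the job above would count as a word. The sketch skips them, and you may want to add the same check to the flatMap.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OneWindowWordCount {

    // Mirrors the job's flatMap (split on whitespace, emit (word, 1)) followed by
    // keyBy(word) + sum(1), but only for the lines of one window.
    static Map<String, Integer> countWindow(List<String> linesInWindow) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable printing
        for (String line : linesInWindow) {
            for (String word : line.split("\\s")) {
                if (word.isEmpty()) {
                    continue; // "a  b".split("\\s") yields an empty token; skip it
                }
                counts.merge(word, 1, Integer::sum); // same effect as sum(1) per key
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> window = List.of("hello world", "hello  flink");
        System.out.println(countWindow(window)); // {flink=1, hello=2, world=1}
    }
}
```

The real job emits one record per word and window instead of a map, printed as tuples such as (hello,2).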
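The complete StreamingJob class:

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket on local port 9000; if port 9000 is already in use, pick another port
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
        // Parse the data, group by word, open a window, and aggregate
        DataStream<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });
        DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1);
        // Print the results to the console with a single thread (parallelism 1) instead of several
        windowCounts.print().setParallelism(1);
        // execute program
        env.execute("Socket Window WordCount");
    }
}

Before starting the job, run nc -lk 9000 in a terminal so that something is listening on port 9000, then start StreamingJob directly from the IDE. Each line typed into the nc session is sent to the job, and the word counts are printed for every 5-second window that received input.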
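nc is not always installed (for example on Windows). As a hedged alternative, the hypothetical LineServer class below listens on port 9000, accepts the job's connection, and forwards each line typed on standard input, which is roughly what nc -lk 9000 provides for this tutorial. Unlike nc -lk, it serves a single connection and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for `nc -lk 9000`: serves one client, then exits.
public class LineServer {

    // Copies lines from `in` to `out`, ending each with "\n" (the delimiter the job
    // passes to socketTextStream) and flushing so the job receives it immediately.
    static void pump(BufferedReader in, Writer out) {
        try {
            String line;
            while ((line = in.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Waiting for the Flink job on port " + port + " ...");
            try (Socket client = server.accept();
                 Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
                BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
                pump(stdin, out);
            }
        }
    }
}
```

Start LineServer before launching StreamingJob, because the socket source connects as a client and needs a listener to already be running.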

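Problems you may run into:

Problem: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction
Solution: comment out <scope>provided</scope> for the Flink dependencies in pom.xml. The quickstart pom marks them as provided because a Flink cluster already supplies these classes at runtime, so they are missing from the classpath when the job is started from the IDE. Restore the scope before packaging the job for a cluster.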
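To confirm this diagnosis, you can check whether the class is loadable at all before building the pipeline. The hypothetical ClasspathCheck helper below uses only Class.forName; it is a debugging aid, not part of the quickstart.

```java
// Hypothetical diagnostic: reports whether a class is visible on the current classpath.
public class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate and load the class, don't run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String flinkClass = "org.apache.flink.api.common.functions.FlatMapFunction";
        if (isOnClasspath(flinkClass)) {
            System.out.println("Flink API found on the classpath");
        } else {
            System.out.println(flinkClass + " is missing: check the <scope>provided</scope> entries in pom.xml");
        }
    }
}
```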
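Problem: the job fails with an error ending in: Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
Solution: declare a processing-time window explicitly with .window(TumblingProcessingTimeWindows.of(Time.seconds(5))). Since Flink 1.12 the default time characteristic is event time, so windows defined the way many older tutorials do it (for example with the deprecated timeWindow(Time.seconds(5))) become event-time windows, which need timestamps and watermarks that this socket source does not provide.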
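Why the explicit processing-time assigner helps: a processing-time window is chosen from the machine clock when a record arrives, whereas an event-time window needs a timestamp carried by the record, and a record without one cannot be assigned. The plain-Java sketch below models both cases with the usual tumbling-window alignment (window start = timestamp rounded down to a multiple of the window size, zero offset). TumblingWindowSketch and its methods are hypothetical names, and using Long.MIN_VALUE as the "no timestamp" marker is an assumption of this model, not Flink code.

```java
// Simplified model (hypothetical names) of how a 5-second tumbling window is chosen.
public class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 5_000L;

    // Round a timestamp down to the start of its tumbling window (offset 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Processing time: the window comes from the local clock when the record arrives.
    static long processingTimeWindowStart(long nowMs) {
        return windowStart(nowMs, WINDOW_SIZE_MS);
    }

    // Event time: the record must carry its own timestamp; Long.MIN_VALUE stands in
    // for "no timestamp assigned", the situation the error message describes.
    static long eventTimeWindowStart(long recordTimestampMs) {
        if (recordTimestampMs == Long.MIN_VALUE) {
            throw new IllegalStateException(
                "record has no timestamp: use a processing-time window or assign timestamps/watermarks");
        }
        return windowStart(recordTimestampMs, WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(12_345L, WINDOW_SIZE_MS)); // 10000, i.e. window [10000, 15000)
        System.out.println(processingTimeWindowStart(System.currentTimeMillis()));
        try {
            eventTimeWindowStart(Long.MIN_VALUE);
        } catch (IllegalStateException e) {
            System.out.println("event-time assignment failed: " + e.getMessage());
        }
    }
}
```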