

打开你的IDE(如IntelliJ IDEA, Eclipse等)或使用命令行。 创建一个新的Maven项目。如果你使用的是IntelliJ IDEA,可以选择
File->New->Project...,然后选择Maven项目。如果你使用命令行,可以运行类似mvn archetype:generate -DgroupId=com.example -DartifactId=flink-wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false的命令来生成项目结构。 填写项目的基本信息(如GroupId, ArtifactId, Version等)。对于WordCount示例,我们可以将ArtifactId设置为flink-wordcount。


<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId> <!-- flink基本依赖 -->
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId> <!-- flink客户端 -->
<version>${flink.version}</version>
</dependency>
</dependencies>在Java包下创建一个wordcount的包,然后创建一个文件 wc.txt,将下面的文字复制进去
后面进行统计每个单词出现的次数,下面每个单词都是按照空格进行分割,然后进行逐行读取
hello world
flink yyds
hello java
hello flink在wordcount包下创建flink_wc.java文件
package wordcount;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* @ClassName flink_wc
* @Description TODO
* @Author 长风清留杨
* @Date 2024/8/8 16:24
* @Version 1.0
*/
public class flink_wc {
public static void main(String[] args) {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
}
}ExecutionEnvironment
在 Flink 中,ExecutionEnvironment 是一个非常重要的接口,它是所有 Flink 程序的基础。这个接口提供了配置和执行 Flink 程序的方法。ExecutionEnvironment 允许用户设置并行度、数据源、数据转换操作(如 map、filter、join 等),以及最终的数据接收者(如写入文件系统、打印到控制台或发送到另一个系统等)。
getExecutionEnvironment 方法
ExecutionEnvironment.getExecutionEnvironment(); 是 ExecutionEnvironment 接口的一个静态方法,用于获取一个合适的 ExecutionEnvironment 实例。这个方法会根据程序的运行环境(是本地运行还是集群运行)来自动决定创建哪种类型的 ExecutionEnvironment 实例。
flink run 命令但指定了本地执行模式(通过 -m local 参数)时,getExecutionEnvironment() 方法会返回一个 LocalExecutionEnvironment 的实例。这个实例会配置 Flink 以本地模式运行,即所有操作都在单个 JVM 实例中执行。
getExecutionEnvironment() 方法会返回一个 ClusterExecutionEnvironment 的实例(尽管在实际代码中,这个类可能是隐藏的,因为 Flink 的 API 设计通常不会直接暴露具体的实现类)。这个实例会配置 Flink 以集群模式运行,利用集群中的多个节点和多个 JVM 实例来并行处理数据流。
//读取文件,参数是wc.txt文件的路径
DataSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");读取文本文件(readTextFile):
readTextFile(String filePath) 是 Flink 提供的一个方法,用于从指定的文件路径读取文本文件。这个方法会读取文件中的所有行,并将每一行作为字符串(String)返回,从而形成一个数据流。DataSource<String> 的实例,它是 Flink 数据流 API 中的一个关键接口,表示了数据流的起点。在这个例子中,数据流中的元素类型是 String,因为文件是文本文件,每一行都被当作一个字符串处理。数据源(DataSource<String> DS):
DataSource<String> DS 是对从文件中读取的数据流的一个引用。DataSource<T> 是一个泛型接口,T 表示数据流中元素的类型。在这个例子中,T 被指定为 String,因为文件是文本文件。DS 是这个数据源的变量名,你可以通过它来对数据流进行进一步的操作,比如应用转换(map、filter 等)、聚合、窗口操作等。package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc
* @Description TODO
* @Author 长风清留杨
* @Date 2024/8/8 16:24
* @Version 1.0
*/
public class flink_wc {
public static void main(String[] args) {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径
DataSource<String> line_DS = env.readTextFile("wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
FlatMapOperator<String, Tuple2<String, Integer>> word_tuple2 = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
}
}flatMap是Flink中用于将数据流中的每个元素转换为零个、一个或多个新元素的转换操作。在这个例子中,我们想要将每行文本(String)转换为其包含的单词(String)以及每个单词的出现次数(这里暂时为1)。
flatMap转换的具体行为,我们需要实现FlatMapFunction接口。这个接口要求我们重写flatMap方法,该方法接受两个参数:输入元素(value)和收集器(out)。输入元素是数据流中的当前元素(在这里是一行文本),而收集器用于输出转换后的新元素。
value.split(" ")将输入的行文本(String)按空格分割成一个单词数组(String[])。Tuple2<String, Integer>对象,其中包含了单词本身(word)和计数1。out.collect(tuple2_of)将每个Tuple2<String, Integer>对象收集到输出数据流中。flatMap的结果赋值给FlatMapOperator<String, Tuple2<String, Integer>>类型的变量word_tuple2,但这并不是Flink API的标准用法。实际上,flatMap操作会返回一个新的DataStream<Tuple2<String, Integer>>
package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc
* @Description TODO
* @Author 长风清留杨
* @Date 2024/8/8 16:24
* @Version 1.0
*/
public class flink_wc {
public static void main(String[] args) throws Exception {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径
DataSource<String> line_DS = env.readTextFile("wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
FlatMapOperator<String, Tuple2<String, Integer>> word_tuple2 = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
/**
* 分组操作
* 根据单词进行分组,这里是根据二元组中的索引来分组的
* 例如(hello,1)那么第一个元素也就是hello它的索引下标是0,第二个元素的索引下标是1
* 所以这里我们通过0 来根据第一个元素进行分组,相同单词会被分到一组
*/
UnsortedGrouping<Tuple2<String, Integer>> word_by = word_tuple2.groupBy(0);
//对单词分完组之后,需要聚合一下第二个参数,来统计数量,第二个参数的下标是1,那么就写1
AggregateOperator<Tuple2<String, Integer>> word_sum = word_by.sum(1);
//输出结果,这里需要抛异常
word_sum.print();
}
}输出结果
(flink,2) (world,1) (hello,3) (java,1) (yyds,1)
keyBy方法实现的,而不是直接调用groupBy(尽管在某些上下文中,“分组”和“keyBy”可以互换使用,但groupBy通常与窗口操作结合使用)。keyBy方法接受一个KeySelector函数,该函数定义了如何从输入元素中提取键(在这个例子中是单词)。在这个例子中,我们使用了一个Lambda表达式value -> value.f0,它表示从每个Tuple2<String, Integer>中提取第一个字段(即单词)作为键。KeyedStream<Tuple2<String, Integer>>,它是一个特殊的DataStream,其中的元素已经根据键(单词)进行了分组。sum(1)方法来对分组后的数据流的第二个字段(即每个单词的计数)进行求和。sum方法接受一个整数值,该值指定了要对其执行求和操作的字段的索引(从0开始计数)。DataStream,其中包含了每个键(单词)的总和。print()方法来打印聚合后的结果。print()是Flink中用于调试和测试的一个非常方便的方法,它会在Flink任务执行时将结果输出到标准输出或日志中。需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar这样,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。
我们同样试图读取文档wc.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。
因为是读取文件,所以就算使用流处理也是有界的,读取完文件就停止运行了
在wordcount包下创建flink_wc_datastream.java文件
package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
}
}StreamExecutionEnvironment类的静态方法getExecutionEnvironment()。
StreamExecutionEnvironment的实例,这个实例代表了当前的流处理执行环境。
StreamExecutionEnvironment实例赋值给变量env。之后,你可以使用env变量来配置你的流处理作业,比如添加数据源、设置转换操作、添加数据汇等。
package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径,readTextFile方法显示已经过时了,不过不用管
DataStreamSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");
}
}readTextFile方法: env.readTextFile("src/main/java/wordcount/wc.txt")调用了StreamExecutionEnvironment实例env上的readTextFile方法。这个方法用于从指定的文件路径读取文本文件。"src/main/java/wordcount/wc.txt"是传递给readTextFile方法的参数,指定了要读取的文本文件的路径。需要注意的是,这个路径是相对于Flink作业的执行环境而言的,具体取决于你的作业是如何部署和运行的。在本地开发环境中,这个路径可能指向你项目中的某个文件;但在集群环境中,这个路径应该指向集群可访问的文件系统(如HDFS)上的文件。DataStreamSource<String>: readTextFile方法返回一个DataStreamSource<String>的实例。DataStreamSource是DataStream的一个特殊子类,用于表示数据流的源。在这个例子中,由于我们读取的是文本文件,所以数据流的元素类型是String,每个元素代表文件中的一行文本。DataStreamSource<String>是一个特殊的数据流,它代表了整个数据处理的起点。在Flink中,数据流(DataStream)是一系列数据的集合,这些数据可以来自于不同的源(如文件、集合、网络套接字等),并可以通过一系列转换操作(如map、filter、reduce等)进行处理。line_DS: readTextFile方法返回的DataStreamSource<String>实例被赋值给变量line_DS。这样,你就可以在后续的代码中使用line_DS来引用这个数据流,并对其进行进一步的处理(如分词、计数等)。package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径,readTextFile方法显示已经过时了,不过不用管
DataStreamSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
}
}line_DS:这是一个DataStreamSource<String>类型的变量,代表了包含文本行(每行文本是一个字符串)的数据流。这个数据流可能是通过读取文件、网络套接字或其他数据源获得的。SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator:这是一个变量声明,用于存储flatMap操作的结果。SingleOutputStreamOperator是DataStream的一个子类,用于表示经过某种转换操作后的数据流。在这个例子中,转换操作是flatMap,它将每行文本拆分成单词,并将每个单词及其计数(初始为1)封装成Tuple2<String, Integer>对象。flatMap转换操作flatMap是Flink中一种强大的转换操作,它可以将输入的数据流中的每个元素转换成一个或多个输出元素。这里,它被用来将每行文本拆分成单词,并为每个单词生成一个Tuple2<String, Integer>对象。flatMap方法接受一个FlatMapFunction实现作为参数。FlatMapFunction是一个函数式接口,包含一个flatMap方法,该方法需要被重写以定义转换逻辑。FlatMapFunction实现flatMap方法的签名是public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception。它接受两个参数: String value:这是从输入数据流中读取的当前元素(在这个例子中是文本行)。Collector<Tuple2<String, Integer>> out:这是一个收集器,用于将转换后的元素发送到下游。flatMap方法的实现中,首先使用value.split(" ")按空格拆分文本行,得到一个字符串数组words。然后,遍历这个数组,对于数组中的每个单词(word),使用Tuple2.of(word, 1)创建一个包含单词和计数(初始化为1)的Tuple2<String, Integer>对象,并通过out.collect(tuple2_of)将这个对象发送到下游。package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径,readTextFile方法显示已经过时了,不过不用管
DataStreamSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
/**
* 跟批处理的聚合算子不一样,流处理中使用的是keyBy
* keyBy中的参数跟flatmap一样是一个接口,是KeySelector接口,所以需要去实现这个接口
* KeySelector<Tuple2<String, Integer>, String>
* 第一个参数Tuple2<String, Integer>表示进来的数据是一个什么类型的,进来的数据是一个二元组
* (hello,1) 二元组中第一个参数是String类型第二个参数是Integer类型,所以要写成Tuple2<String, Integer>
* 第二个参数String表示要分组的参数的类型,因为要对二元组中的第一个参数进行分组也就是(hello,1)中的hello分组
* 所以数据类型应该是String类型
*/
KeyedStream<Tuple2<String, Integer>, String> word_by = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//重写getKey方法 Tuple2<String, Integer> value表示传递进来的参数是一个二元组
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
/**
* 返回结果,返回的结果就是要根据谁进行分组
* 这里根据二元组的下标就不是通过0和1了,f0就表示下标0,f1就表示下标1,f2就表示下标2
* 因为不需要进行其他处理,只需要通过二元组中第一个参数分组就行,所以直接value.f0
*/
return value.f0;
}
});
}
}tuple2SingleOutputStreamOperator:这是一个SingleOutputStreamOperator<Tuple2<String, Integer>>类型的变量,它代表了一个数据流,其中每个元素都是一个Tuple2<String, Integer>对象。这个Tuple2对象通常包含两个字段:第一个字段(f0)是单词(String类型),第二个字段(f1)是该单词的计数(Integer类型,但在这个上下文中,计数可能只是初始化为1,用于后续累加)。keyBy操作keyBy是Flink中的一个转换操作,它用于将数据流中的元素按照指定的键(key)进行分组。在这个例子中,我们想要按照单词(即Tuple2的第一个字段)进行分组,以便后续可以对每个单词的计数进行累加。KeySelector函数式接口keyBy方法接受一个KeySelector函数式接口的实现作为参数。KeySelector是一个泛型接口,它定义了一个getKey方法,该方法接受一个输入元素,并返回该元素的键。KeySelector<Tuple2<String, Integer>, String>接口。这个接口指定了输入元素类型为Tuple2<String, Integer>,键的类型为String。getKey方法getKey方法是KeySelector接口中必须实现的方法。它接受一个Tuple2<String, Integer>类型的参数value,并返回该Tuple2对象的第一个字段(即单词)作为键。getKey方法的实现中,我们通过value.f0访问Tuple2对象的第一个字段,并将其作为键返回。这里,f0是Flink中Tuple2类用于访问第一个字段的约定俗成的字段名。KeyedStream的创建keyBy方法被调用并传入KeySelector实现后,它会返回一个KeyedStream<Tuple2<String, Integer>, String>类型的对象。这个对象代表了按照指定键(在这个例子中是单词)分组后的数据流。KeyedStream提供了多种用于分组和窗口操作的方法,比如sum、min、max、reduce等,这些方法可以应用于分组后的数据,以执行各种聚合操作。package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径,readTextFile方法显示已经过时了,不过不用管
DataStreamSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
/**
* 跟批处理的聚合算子不一样,流处理中使用的是keyBy
* keyBy中的参数跟flatmap一样是一个接口,是KeySelector接口,所以需要去实现这个接口
* KeySelector<Tuple2<String, Integer>, String>
* 第一个参数Tuple2<String, Integer>表示进来的数据是一个什么类型的,进来的数据是一个二元组
* (hello,1) 二元组中第一个参数是String类型第二个参数是Integer类型,所以要写成Tuple2<String, Integer>
* 第二个参数String表示要分组的参数的类型,因为要对二元组中的第一个参数进行分组也就是(hello,1)中的hello分组
* 所以数据类型应该是String类型
*/
KeyedStream<Tuple2<String, Integer>, String> word_by = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//重写getKey方法 Tuple2<String, Integer> value表示传递进来的参数是一个二元组
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
/**
* 返回结果,返回的结果就是要根据谁进行分组
* 这里根据二元组的下标就不是通过0和1了,f0就表示下标0,f1就表示下标1,f2就表示下标2
* 因为不需要进行其他处理,只需要通过二元组中第一个参数分组就行,所以直接value.f0
*/
return value.f0;
}
});
//跟批处理的聚合一样,因为要对二元组中的第二个参数聚合,第二个参数的下标是1,直接写1
SingleOutputStreamOperator<Tuple2<String, Integer>> word_sum = word_by.sum(1);
word_sum.print();
}
}word_by:这是一个KeyedStream<Tuple2<String, Integer>, String>类型的变量,它代表了按照单词进行分组后的数据流。在这个KeyedStream中,每个元素都是一个Tuple2<String, Integer>对象,其中第一个字段(f0)是单词,第二个字段(f1)是该单词的计数(尽管在keyBy之前的操作中,计数可能只是初始化为1,但在这里,我们将对它们进行累加)。sum聚合操作sum是Flink中KeyedStream上的一个聚合操作,它用于对分组后的数据流中的每个键对应的值进行累加。在这个例子中,我们调用sum(1)来指定对Tuple2的第二个字段(即计数)进行累加。sum方法的参数(在这个例子中是1)指定了要累加的字段的索引。在Flink的Tuple类中,字段索引是从0开始的,所以1表示我们想要对第二个字段(即计数)进行操作。sum操作返回的是一个新的SingleOutputStreamOperator<Tuple2<String, Integer>>对象(在这个例子中被赋值给word_sum变量),它包含了累加后的结果。每个结果元素仍然是一个Tuple2<String, Integer>对象,但这次Integer部分不再是初始的1,而是该单词在所有出现位置上的计数之和。print操作print是Flink中的一个简单操作,用于将数据流中的元素输出到标准输出(通常是控制台)。在这个例子中,我们调用word_sum.print()来打印累加后的单词计数。print操作会拦截并打印出流经该操作的所有元素。这对于调试和验证数据流处理逻辑非常有用。print操作通常用于调试目的,因为它会将数据输出到控制台,这可能会在生产环境中产生大量的日志输出。因此,在生产环境中,通常会使用更复杂的日志记录或监控工具来跟踪数据流的状态。sum操作是并行执行的,Flink会利用多个任务槽(task slots)来并行处理数据,从而提高处理速度。但是,由于keyBy操作的存在,相同键的数据会被发送到同一个任务中进行处理,以确保累加操作的正确性。package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_datastream
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 17:36
* @Version 1.0
*/
public class flink_wc_datastream {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件,参数是wc.txt文件的路径,readTextFile方法显示已经过时了,不过不用管
DataStreamSource<String> line_DS = env.readTextFile("src/main/java/wordcount/wc.txt");
/**
* 使用flatMap算子进行处理
* FlatMapFunction<String, Tuple2<String, Integer>>
* 第一个参数String表示进来的数据类型是String类型的
* 第二个参数Tuple2<String, Integer>表示出去的类型是二元组类型的,二元组中第一个参数是String类型第二个参数是Integer类型
* 在读取到数据之后需要转换成元祖然后再进行统计,每个单词放入元祖中都放一个1
* 例如(hello,1) (java,1) (hello,1) (flink,1)
* 然后分组统计后面的数字就知道每个单子出现了多少次
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = line_DS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* 重写flatMap方法
* flatMap(String value, Collector<Tuple2<String, Integer>> out)
* String value表示进来的参数value是String类型的
* Collector<Tuple2<String, Integer>> out Collector是收集器,出去的参数out是二元组类型
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//使用split按照空格进行拆分
String[] words = value.split(" ");
//拆分的结果是一个数组,然后遍历拆分的结果,也就是遍历出来每一个单词
for (String word : words) {
//将单词添加到二元组中,后面写1,就到达了(hello,1) (java,1) (hello,1) (flink,1)这样的效果
Tuple2<String, Integer> tuple2_of = Tuple2.of(word, 1);
//向下游发送数据,collect方法就是把数据发送到下游
out.collect(tuple2_of);
}
}
});
/**
* 跟批处理的聚合算子不一样,流处理中使用的是keyBy
* keyBy中的参数跟flatmap一样是一个接口,是KeySelector接口,所以需要去实现这个接口
* KeySelector<Tuple2<String, Integer>, String>
* 第一个参数Tuple2<String, Integer>表示进来的数据是一个什么类型的,进来的数据是一个二元组
* (hello,1) 二元组中第一个参数是String类型第二个参数是Integer类型,所以要写成Tuple2<String, Integer>
* 第二个参数String表示要分组的参数的类型,因为要对二元组中的第一个参数进行分组也就是(hello,1)中的hello分组
* 所以数据类型应该是String类型
*/
KeyedStream<Tuple2<String, Integer>, String> word_by = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//重写getKey方法 Tuple2<String, Integer> value表示传递进来的参数是一个二元组
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
/**
* 返回结果,返回的结果就是要根据谁进行分组
* 这里根据二元组的下标就不是通过0和1了,f0就表示下标0,f1就表示下标1,f2就表示下标2
* 因为不需要进行其他处理,只需要通过二元组中第一个参数分组就行,所以直接value.f0
*/
return value.f0;
}
});
//跟批处理的聚合一样,因为要对二元组中的第二个参数聚合,第二个参数的下标是1,直接写1
SingleOutputStreamOperator<Tuple2<String, Integer>> word_sum = word_by.sum(1);
word_sum.print();
//启动流处理,需要抛异常
env.execute();
}
}env.execute()方法接受一个字符串参数,这个参数是作业的名称。虽然这个参数是可选的(你可以调用无参的execute()方法),但提供一个明确的作业名称是一个好习惯,因为它有助于在Flink的Web UI或日志中更容易地识别你的作业。
env.execute(); 是Apache Flink中启动作业执行的命令。它触发了数据流图的构建、验证、优化、提交和执行等一系列过程,最终将你的数据处理逻辑转化为实际的计算结果。在Flink程序中,这行代码通常位于所有转换操作之后,作为程序的最后一行执行。
7> (flink,1) 3> (hello,1) 3> (hello,2) 2> (java,1) 3> (hello,3) 3> (yyds,1) 5> (world,1) 7> (flink,2)
这里可以看到跟批处理的结果不同,这就体现出来批处理跟流处理的区别,批处理是直接读取完所有的数据之后处理完再输出,流处理是来一条数据处理一条数据,流处理在读取wc.txt文件的时候,先读取第一行数据 hello world,然后去处理,拆分、分组、聚合、输出,走一个这样的流程,然后输出结果就是(hello,1)和(world,1),然后继续读取第二行 flink yyds, 第二行数据再次走一遍处理流程,输出的结果就是(flink,1)和(yyds,1),然后继续读取第三行数据,hello java,第三行数据再次走一遍处理流程,输出的结果就是(hello,2)和(java,1),为什么hello变成2了,是因为上次记录为1的时候flink已经存储下来了,将上次hello的状态保存了下来,这就是有状态处理,然后继续读取第四行数据,hello flink,再次经历一次数据处理,输出结果(hello,3)和(flink,2)
这就是流处理,进来一批数据就处理一批数据
输出结果前面的 7> 3>这些就是线程,flink执行的时候是并行执行的,线程数会根据自身电脑的配置,CPU有多少核心就有多少线程数,多个线程同时执行,所以能看到并行执行的数据输出的顺序是乱的。
Socket 的工作原理可以概括为以下几个步骤:
package wordcount;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_socket
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 22:47
* @Version 1.0
*/
public class flink_wc_socket {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取数据,从socket中读取数据
DataStreamSource<String> socket_DS = env.socketTextStream("127.0.0.1", 9999);
}
}DataStreamSource<String>DataStreamSource<String> 是 Flink API 中的一个接口,它继承自 DataStream<T>,但专门用于表示数据流的源头。这里的 T 被指定为 String,意味着这个数据流源将产生字符串类型的数据项。DataStream 是 Flink 中用于表示数据流的核心抽象,它支持一系列的操作,如转换(transformations)和聚合(aggregations),以处理数据流中的数据。envenv 是 StreamExecutionEnvironment 的一个实例。StreamExecutionEnvironment 是 Flink 程序的入口点,用于设置执行参数、创建数据源、定义转换和指定数据汇(sink)。env 通常通过调用 StreamExecutionEnvironment.getExecutionEnvironment() 方法来创建,该方法会根据程序的运行环境(是本地执行还是集群执行)返回适当的 StreamExecutionEnvironment 实例。socketTextStreamsocketTextStream 是 StreamExecutionEnvironment 类中的一个方法,用于从TCP套接字读取文本数据。"127.0.0.1",表示本地机器;端口号是 9999,这是一个有效的十进制端口号,用于监听或连接。socketTextStream 方法时,Flink 会尝试连接到指定的主机和端口,并从那里读取数据。读取的数据被假定为文本流,其中数据项由换行符(\n)分隔。socketTextStream 方法返回一个 DataStreamSource<String> 对象,该对象表示从套接字读取的数据流源。socket_DSsocket_DS 是一个变量名,用于引用由 socketTextStream 方法返回的数据流源对象。map、filter 等)来处理数据,或者定义数据汇(如将数据写入文件系统、数据库或通过网络发送)来输出结果。127.0.0.1)的 9999 端口。socketTextStream 方法通常只创建一个数据源实例,因此它可能不是高度并行的数据源。然而,你可以通过添加并行度设置或使用其他并行数据源来优化你的Flink程序。这里直接通过lambda的方式实现,并且直接连续的.分组 .聚合就可以了
package wordcount;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName flink_wc_socket
* @Description TODO
* @Author 长风清留扬
* @Date 2024/8/8 22:47
* @Version 1.0
*/
public class flink_wc_socket {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取数据,从socket中读取数据
DataStreamSource<String> socket_DS = env.socketTextStream("127.0.0.1", 9999);
//使用lambda表达式来实现
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket_DS.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
//拆分
String[] words = value.split(" ");
for (String word : words) {
Tuple2<String, Integer> Tuple2_of = Tuple2.of(word, 1);
out.collect(Tuple2_of);
}
}
)
.returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
//输出结果
sum.print();
//执行流处理
env.execute();
}
}flatMap 方法.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
// 拆分
String[] words = value.split(" ");
for (String word : words) {
Tuple2<String, Integer> Tuple2_of = Tuple2.of(word, 1);
out.collect(Tuple2_of);
}
})flatMap 函数的实现。这个 Lambda 表达式接受两个参数:一个输入字符串 value 和一个 Collector<Tuple2<String, Integer>> 类型的输出收集器 out。
value.split(" ") 将输入的字符串 value 按照空格拆分成单词数组 words。
Tuple2<String, Integer> 对象。这里,Tuple2 是 Flink 提供的一个用于存储两个不同类型元素的类。在这个例子中,它用于存储单词(String 类型)和该单词的计数(初始化为 1,Integer 类型)。
out.collect(Tuple2_of) 方法将其对应的 Tuple2 对象收集到输出流中。这样,原始的字符串数据流就被转换成了包含单词和初始计数的 Tuple2 对象流。
returns 方法.returns(Types.TUPLE(Types.STRING, Types.INT))returns 方法用于明确指定流中元素的类型。虽然这在大多数情况下是可选的,因为它可以通过类型推断自动完成,但显式指定可以提高代码的可读性和在某些情况下的性能。这里,它指定了流中的元素是 Tuple2<String, Integer> 类型,其中第一个元素是 String 类型,第二个元素是 Integer 类型。注意:原代码中的 Types.INSTANT 是不正确的,应该使用 Types.INT。keyBy 方法.keyBy(value -> value.f0)keyBy 方法用于根据流中元素的某个键(或属性)对元素进行分区。在这个例子中,它使用了 Lambda 表达式 value -> value.f0 来指定分区键,其中 value 是流中的元素(即 Tuple2<String, Integer> 对象),而 value.f0 访问了该 Tuple2 对象的第一个元素(即单词)。这意味着所有具有相同单词的元素都将被发送到相同的下游任务中,以便进行后续的聚合操作。sum 方法.sum(1)sum 方法用于对具有相同分区键的元素进行聚合操作。在这个例子中,它指定了对 Tuple2 对象的第二个元素(即计数)进行求和操作。由于流已经被 keyBy 方法根据单词进行了分区,因此这个求和操作会针对每个单词分别进行,从而计算出每个单词在整个数据集中出现的总次数。4、启动netcat
在本地或者服务器或者虚拟机上执行该命令
nc -lk 9999nc:这是 netcat 命令的简写,用于执行网络相关的各种任务,如端口监听、数据传输等。-l:这个选项表示 netcat 应该以监听模式运行,即作为服务器端等待客户端的连接。-k:这个选项使 netcat 在处理完一个连接后不会立即退出,而是继续监听端口,等待新的连接。这允许 netcat 作为持续运行的守护进程或服务器使用。9999:这是指定的端口号,netcat 将在这个端口上监听传入的连接。5、启动flink程序
现在可以看到,flink程序启动之后一直没有输出,也没有关闭,因为发送端没有发送数据

从发送端发送一条hello word
发送之后flink立马就接收到数据了

再发送一条hello java
因为hello已经出现一次了,所以变成2

这就是真正的无界流的流处理,flink程序会一直运行,只要发送端发送数据,flink立刻接收并处理