如果在你的脑海里,“Apache Flink”和“流处理”没有很强的联系,那么你可能最近没有看新闻。Apache Flink已经席卷全球大数据领域。现在正是这样的工具蓬勃发展的绝佳机会:流处理在数据处理中变得越来越流行,Apache Flink引入了许多重要的创新。
在本文中,我将演示如何使用Apache Flink编写流处理算法。我们将读取维基百科的编辑流,并将了解如何从中获得一些有意义的数据。在这个过程中,您将看到如何读写流数据,如何执行简单的操作以及如何实现更复杂一点的算法。
我相信,如果您是Apache Flink新手,最好从学习批处理开始,因为它更简单,并能为您学习流处理提供一个坚实的基础。我已经写了一篇介绍性的博客文章,介绍如何使用Apache Flink 进行批处理,我建议您先阅读它。
如果您已经知道如何在Apache Flink中使用批处理,那么流处理对您来说没有太多惊喜。和以前一样,我们将看看应用程序中的三个不同的阶段:从源中读取数据,处理数据以及将数据写入外部系统。
与批处理相比,这几乎没有显着差异。首先,在批处理中,所有数据都被提前准备好。当处理进程在运行时,即使有新的数据到达我们也不会处理它。
不过,在流处理方面有所不同。我们在生成数据时会读取数据,而我们需要处理的数据流可能是无限的。采用这种方法,我们几乎可以实时处理传入数据。
在流模式下,Flink将读取数据并将数据写入不同的系统,包括Apache Kafka,Rabbit MQ等基本上可以产生和使用稳定数据流的系统。需要注意的是,我们也可以从HDFS或S3读取数据。在这种情况下,Apache Flink会不断监视一个文件夹,并在文件生成时处理它们。
以下是我们如何在流模式下从文件中读取数据:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.readTextFile("file/path");
请注意,要使用流处理,我们需要使用StreamExecutionEnvironment
类而不是ExecutionEnvironment
类。此外,读取数据的方法会返回一个稍后将用于数据处理的DataStream
类的实例。
我们也可以像批处理案例中那样从集合或数组创建有限流:
DataStream<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5 6);
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
对于处理流中的一个流项目,Flink提供给操作员一些类似批处理的操作如map
, filter
,mapReduce
。
让我们来实现我们的第一个流处理示例。我们将阅读一个维基百科的编辑流并显示我们感兴趣的内容。
首先,要阅读编辑流,我们需要使用WikipediaEditsSource
:
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
要使用它,我们需要调用用于从Kafka,Kinesis,RabbitMQ等源中读取数据的方法addSource
。此方法返回我们当前可以处理的编辑流。
让我们来筛选所有非机器生成并且已经改变了一千多个字节的编辑:
edits.filter((FilterFunction<WikipediaEditEvent>) edit -> {
return !edit.isBotEdit() && edit.getByteDiff() > 1000;
})
.print();
这与在批处理情况下如何使用filter
方法非常相似,唯一的不同是它处理的是无限流。
现在最后一步是运行我们的程序。像以前一样,我们需要调用执行方法execute
:
env.execute()
该程序将开始打印筛选的Wikipedia编辑,直到我们停止它:
2> WikipediaEditEvent{timestamp=1506499898043, channel='#en.wikipedia', title='17 FIBA Womens Melanesia Basketball Cup', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608251&oldid=802520770', user='Malto15', byteDiff=1853, summary='/* Preliminary round */', flags=0}
7> WikipediaEditEvent{timestamp=1506499911216, channel='#en.wikipedia', title='User:MusikBot/StaleDrafts/Report', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608262&oldid=802459885', user='MusikBot', byteDiff=11674, summary='Reporting 142 stale non-AfC drafts', flags=0}
...
请注意,到目前为止,我们已经讨论过的所有方法都是针对流中的各个元素进行的。看上去我们不可能使用这些简单的操作来实现出许多有趣的流算法。仅使用它们不可能实现以下用例:
很明显,要解决这些问题,我们需要处理一组元素。这是流窗口的用途。
简而言之,流窗口允许我们对流中的元素进行分组,并对每个组执行用户自定义的功能。这个用户自定义函数可以返回零个,一个或多个元素,并以这种方式创建一个新的流,我们可以在一个独立的系统中处理或存储它。
我们如何将流中的元素分组?Flink提供了几个选项来执行此操作:
除了选择如何将元素分配给不同的窗口,我们还需要选择一个流类型。Flink有两种流类型:
现在,让我们使用流窗口来进行一些演示。首先,让我们来看看维基百科每分钟执行多少次编辑。首先,我们需要读取编辑流:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
然后,我们指定要将流分成一分钟不重复的窗口:
edits
// Non-overlapping one-minute windows
.timeWindowAll(Time.minutes(1))
现在,我们可以定义一个自定义函数来处理每个一分钟窗口中的所有元素。要做到这一点,我们将使用apply
方法并传递接口AllWindowFunction
:
edits
.timeWindowAll(Time.minutes(1))
.apply(new AllWindowFunction<WikipediaEditEvent, Tuple3<Date, Long, Long>, TimeWindow>() {
@Override
public void apply(TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple3<Date, Long, Long>> collector) throws Exception {
long count = 0;
long bytesChanged = 0;
// Count number of edits
for (WikipediaEditEvent event : iterable) {
count++;
bytesChanged += event.getByteDiff();
}
// Output a number of edits and window's end time
collector.collect(new Tuple3<>(new Date(timeWindow.getEnd()), count, bytesChanged));
}
})
.print();
尽管有点长,但该方法非常简单。apply
方法接收三个参数:
timeWindow
:包含关于我们正在处理的窗口的信息。iterable
:单个窗口中元素的迭代器。collector
:可以用来将元素输出到结果流中的对象。我们在这里所做的是计算多个更改,然后使用collector
实例输出计算结果以及窗口的结束时间戳。
如果我们运行这个程序,我们将看到apply
方法生成的条目打印到了输出流中:
1> (Wed Sep 27 12:58:00 IST 2017,62,62016)
2> (Wed Sep 27 12:59:00 IST 2017,82,12812)
3> (Wed Sep 27 13:00:00 IST 2017,89,45532)
4> (Wed Sep 27 13:01:00 IST 2017,79,11128)
5> (Wed Sep 27 13:02:00 IST 2017,82,26582)
现在,我们来看一个更复杂的例子。我们来计算一个用户每十分钟的间隔进行了多少次编辑。这可以帮助识别最活跃的用户或在系统中发现一些不寻常的活动。
当然,我们可以使用非键控流,迭代窗口中的所有元素,并使用一个字典来跟踪计数。但这种方法不利于推广,因为非键控流不可并行化。为了高效地使用Flink集群的资源,我们需要通过用户名键入我们的流,这将创建多个逻辑流,每个用户一个。
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
edits
// Key by user name
.keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
// Ten-minute non-overlapping windows
.timeWindow(Time.minutes(10))
唯一的区别是我们使用keyBy
方法为我们的流指定一个键。在这里,我们简单地使用用户名作为分区键。
现在,当我们有一个键控流时,我们可以执行一个函数来处理每个窗口。和以前一样,我们将使用apply
方法:
edits
.keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
.timeWindow(Time.minutes(10))
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = 0;
// Count number of changes
for (WikipediaEditEvent ignored : iterable) {
changesCount++;
}
// Output user name and number of changes
collector.collect(new Tuple2<>(userName, changesCount));
}
})
.print();
这里一个比较大的区别是这个版本的apply
方法有四个参数。额外的第一个参数为我们的函数正在处理的逻辑流指定一个键。
如果我们执行这个程序,我们将得到一个流,其中每个元素包含一个用户名和一个用户每10分钟执行的编辑次数:
...
5> (InternetArchiveBot,6)
1> (Francis Schonken,1)
6> (.30.124.210,1)
1> (MShabazz,1)
5> (Materialscientist,18)
1> (Aquaelfin,1)
6> (Cote d'Azur,2)
1> (Daniel Cavallari,3)
5> (00:1:F159:6D32:2578:A6F7:AB88:C8D,2)
...
正如你所看到的,今天有一些用户在维基百科上疯狂编辑!
这是一篇介绍性文章,还有更多有关Apache Flink的东西。我会在不久的将来写更多关于Flink的文章,敬请关注!