我有下面的代码来计数来自socketTextStream的单词。累积字数和时间加窗字数都是必需的。该程序存在一个问题,即cumulateCounts总是与窗口计数相同。为何会出现这个问题?基于窗口计数计算累积计数的正确方法是什么?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});
发布于 2015-10-30 15:41:28
您应该首先分组,并在键控数据流上应用窗口(您的代码在Flink 0.9.1上工作,但是Flink 0.10.0中的新API对此非常严格):
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.groupBy(0)
.window(Time.of(5, TimeUnit.SECONDS)).sum(1)
.flatten();
如果在无键数据流上应用一个窗口,那么在单个机器上只会有一个线程窗口操作符(即没有并行性)来在整个流上构建窗口(在Flink 0.9.1中,这个全局窗口可以被groupBy()
拆分为子窗口--但是,在Flink 0.10.0中,这将不再起作用)。要计算单词,您需要为每个不同的键值构建一个窗口,即首先为每个键值(通过groupBy()
)获取一个子流,并在每个子流上应用一个窗口操作符(因此,您可以为每个子流创建一个自己的窗口操作符实例,允许并行执行)。
对于全局(累积)计数,可以简单地应用groupBy().sum()
结构。首先,流被分成子流(每个键值一个).第二,计算流上的和。因为流是而不是窗口的,所以计算(累积)中的和以及每个传入元组的更新(更详细地说,和的初始结果值为零,每个元组的结果被更新为result += tuple.value
)。每次调用sum之后,将发出新的当前结果。
在您的代码中,不应该使用特殊的接收器函数,但是应该这样做:
counts.groupBy(0).sum(1).print();
https://stackoverflow.com/questions/33446247
复制相似问题