去重指标作为业务分析里面的一个重要指标,不管是在OLAP存储引擎还是计算引擎都对其实现做了大量工作,在面对不同的数据量、指标精确性要求,都有不同的实现方式,但是总体都逃脱不了硬算、两阶段方式、bitmap、hll等这些实现。本文将分析Split Distinct Aggregation实现原理与使用代码方式实现其功能。
如果要使用Sql去实现一个去重功能,通常会这样实现:
SELECT day, COUNT(DISTINCT user_id) FROM T GROUP BY day --sql1
或者
select day,count(*) from(
select distinct user_id,day from T ) a
group by day --sql2
在之前的去重系列中SQL方式去重中也对这两种实现方式进行了分析,但是这两种方式都未解决计算热点问题,例如当某一个day 对应的devId 特别大的情况下,那么计算压力都会到该day所在的task,使这个task成为任务的性能瓶颈。
Split Distinct Aggregation是从Flink-1.9版本开始提供的一个对去重的优化功能,该功能必须在Blink planner下并且配置:
val tEnv: TableEnvironment = ...
tEnv.getConfig.getConfiguration .setString("table.optimizer.distinct-agg.split.enabled", "true")
那么sql1 在其内部会转换为
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
MOD(HASH_CODE(user_id), 1024) 表示对取user_id的hashCode然后对1024取余,也就是将user_id划分到1024个桶里面去,那么里层通过对day与桶编号进行去重(cnt)外层只需要对cnt执行sum操作即可,因为分桶操作限制了相同的user_id 一定会在相同的桶里面,执行效果图如下:
我们也通过tabEnv.explain方式打印执行计划验证一下是否是真的这样执行:
Stage 5 : Operator
content : Calc(select=[status, devId, (HASH_CODE(devId) MOD 1024) AS $f2])
Stage 7 : Operator
content : GroupAggregate(groupBy=[status, $f2], partialFinalType=[PARTIAL], select=[status, $f2, COUNT(DISTINCT devId) AS $f2_0])
ship_strategy : HASH
Stage 9 : Operator
content : GroupAggregate(groupBy=[status], partialFinalType=[FINAL], select=[status, $SUM0_RETRACT($f2_0) AS $f1])
ship_strategy : HAS
Stage 5 中执行分桶操作,Stage 7分桶之后去重操作,Stage 9 最终的sum操作。
在去重系列中实现了使用MapState去重方式,仍然在此基础上来完成Split Distinct Aggregation功能,其业务场景是实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间),实现思路:
•首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction计算分桶后的去重数(与MapState方式相同)•然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的task,而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制(可参考Flink SQL中可撤回机制解密),也就是上一个ProcessFunction每发送一条数据都需要先将之前的数据发送一份表示其为撤回。
接下来看具体的代码实现,数据结构:
--流量数据
case class AdData(id:Int,devId:String,time:Long)
--第一次keyBy数据
case class AdKey1(id:Int,time:Long,bucketCode:Int)
--第二次keyBy数据
case class AdKey2(id:Int,time:Long)
去重实现Distinct1ProcessFunction:
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[Int, Long, Long]]] {
var devIdState: MapState[String, Int] = _
var devIdStateDesc: MapStateDescriptor[String, Int] = _
var countState: ValueState[Long] = _
var countStateDesc: ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
devIdState = getRuntimeContext.getMapState(devIdStateDesc)
countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
countState = getRuntimeContext.getState(countStateDesc)
}
override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[Int, Long, Long]]]#Context, out: Collector[Tuple2[Boolean, Tuple3[Int, Long, Long]]]): Unit = {
val devId = value.devId
devIdState.get(devId) match {
case 1 => {
//表示已经存在
}
case _ => {
//表示不存在
devIdState.put(devId, 1)
val c = countState.value()
val currV = c + 1
countState.update(currV)
if (currV > 1) {
--认为大于1的需要执行撤回
out.collect(Tuple2.apply(false, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, c)))
out.collect(Tuple2.apply(true, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, currV)))
} else {
out.collect(Tuple2.apply(true, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, currV)))
}
}
}
}
}
撤回实现同样使用boolean标识,false表示为撤回数据,true表示正常insert的数据。
聚合实现Distinct2ProcessFunction:
class Distinct2ProcessFunction extends KeyedProcessFunction[Tuple2[Int, Long], Tuple2[Boolean, Tuple3[Int, Long, Long]], Void] {
var cntState: ValueState[Long] = _
var cntStateDesc: ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
cntStateDesc = new ValueStateDescriptor[Long]("distinctValue", TypeInformation.of(classOf[Long]))
cntState = getRuntimeContext.getState(cntStateDesc)
}
override def processElement(value: (Boolean, (Int, Long, Long)), ctx: KeyedProcessFunction[(Int, Long), (Boolean, (Int, Long, Long)), Void]#Context, out: Collector[Void]): Unit = {
val currV = cntState.value()
value._1 match {
case true => {
cntState.update(currV + value._2._3
println(ctx.getCurrentKey + ":" + cntState.value())
}
case false => {
--撤回操作
cntState.update(currV - value._2._3)
println(ctx.getCurrentKey + ":" + cntState.value())
}
}
}
}
重点在于如果收到编码为false 的数据,那么需要从当前计数里面减掉撤回的计数值。
主流程:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConfig = new Properties()
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1")
val consumer = new FlinkKafkaConsumer[String]("topic1", new SimpleStringSchema, kafkaConfig)
val ds = env.addSource(consumer)
.map(x => {
val s = x.split(",")
AdData(s(0).toInt, s(1), s(2).toLong)
}).keyBy(x => {
val endTime = TimeWindow.getWindowStartWithOffset(x.time, 0,
Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds
AdKey1(x.id, endTime, x.devId.hashCode % 3)
}).process(new Distinct1ProcessFunction)
.keyBy(x => {
Tuple2.apply(x._2._1, x._2._2)
}).process(new Distinct2ProcessFunction)
env.execute()
Split Distinct Aggregation是去重计算在数据倾斜的情况下的优化的一种思路,类似于两阶段聚合,第一阶段执行打散操作,第二阶段执行累加操作,这是一种通用的优化思路,而对于使用代码方式实现其重点在于第一阶段到第二阶段的撤回思路避免数据的重复计算
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有