先看一个简单的sql ,pv_id 去重计数
SELECT
visit_type,
count(DISTINCT pv_id) as pv_cnt
from exp_table
where ds=20220320
group by visit_type;
在默认情况下,相同的visit_type 的pv_id 会被分配到同一个reducer中处理,如果某个visit_type的数据量特别大,那么对应的reducer执行耗时会比较久或者可能会发生OOM,因此常规优化方式是:
select
visit_type,count(*)
from (
SELECT
visit_type,pv_id
from exp_table
where ds=20220320
group by visit_type,pv_id
) group by visit_type;
也就是将count distinct 转换为 group by 操作,第一层根据visit_type,pv_id分组,第二层根据visit_type 直接求和即可,使数据分布更加均匀。但是 这种方式在第二层group by 也可能会产生大量的数据shuffle操作,可以再次优化:
select
visit_type,sum(cnt)
from (
SELECT
visit_type,
count(distinct pv_id) as cnt
from exp_table
where ds=20220320
group by visit_type,hash(pv_id)%50
) group by visit_type;
第一层使用visit_type+hash(pv_id)%50 方式分组,对相同visit_type下的pv_id分了50组,保证相同pv_id 都能分配到相同的reducer中去,然后执行去重计数(cnt)操作,然后在第二层中根据visit_type 分组,对cnt求和即可。这种方式在第二层shuffle过程中数据就会相对减少很多。
SELECT
visit_type,
count(distinct pv_id),
count(distinct item_id)
from exp_table
where ds=20220320
group by visit_type;
这次同时需要对pv_id与item_id去重计数,如果还是按照上述的优化方式将visit_type、pv_id、item_id组合很显然已经行不通了,没办法保证相同的session_id或者item_id都会分配在同一个reducer中去。先使用常规意义上的操作:
SELECT a.visit_type
,a.cnt1
,b.cnt2
FROM (
SELECT visit_type
,count(*) AS cnt1
FROM (
SELECT visit_type
,pv_id
FROM exp_table
WHERE ds = 20220320
GROUP BY visit_type
,pv_id
)
GROUP BY visit_type
) a
join (
SELECT visit_type
,count(*) AS cnt2
FROM (
SELECT visit_type
,item_id
FROM exp_table
WHERE ds = 20220320
GROUP BY visit_type
,item_id
)
GROUP BY visit_type
) b
ON a.visit_type = b.visit_type
;
也就是先拆分再join, 很显然这种方式开发难度大,特别是在处理字段更多的情况下。再重新按照单字段优化方式思考,希望按照所有的去重字段组合的情况下,仍然能够保证相同pv_id或者item_id都会分配在同一个reducer中去处理, 也是pv_id与item_id各自不影响其分配方式,可以采取先扩充数据,即将每一条数据扩充到去重字段个数的倍数,并且保证一个去重的字段不为空,并且增加标识字段,表明去重的列,如下图:
扩充后的数据执行常规的去重操作,即然后组合去重字段分组然后最外层进行汇总,由于扩充之后的数据每一条只有一个不为空的列,那么在执行shuffle 的时候,相同的pv_id或者item_id一定会分配在同一个reducer中去处理。数据扩充使用udtf实现:
@Override
public void process(Object[] args) throws UDFException {
// TODO
for(int i=0;i<args.length;i++){
Object[] nObjects=new Object[args.length+1];
for(int j=0;j<args.length;j++){
if(i==j) {
nObjects[j]=args[i];
}else{
nObjects[j]=null;
}
}
nObjects[args.length]="flag"+i;
this.forward(nObjects);
}
}
具体优化sql:
SELECT visit_type
,count(CASE WHEN TYPE='flag0' THEN 1 END) AS pv_cnt
,count(CASE WHEN TYPE='flag1' THEN 1 END) AS item_cnt
FROM (
SELECT visit_type
,pv_id1
,item_id1
,type
FROM (
SELECT visit_type
,pv_id1
,item_id1
,type
FROM exp_table
LATERAL VIEW ExpandHash(pv_id,item_id) tmp AS pv_id1,item_id1,type
WHERE ds = 20220320
)
GROUP BY visit_type
,pv_id1
,item_id1
,type
)
GROUP BY visit_type
这种方式导致了数据量翻倍,在shuffle阶段io 也会耗时增加,具体耗时、资源消耗以实际情况为准,然后去做均衡具体选择哪一种方式。
思考
Q: 同时存在count distinct 与 sum 类的聚合该如何优化倾斜问题?
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有