★
感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! ”
宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。
其中大家感觉比较流啤的就是的就是字节做到了:
总的来说就是任务永不停,不可能停止的,好么,beiber。
★字节火山引擎 PPT。公众号回复 20210724 获取。 ”
6
★本文博主就主要介绍第一点,即做到规则动态变化,可以做到动态添加一个 sink kafka topic,动态删除一个 sink kafka topic,而不重启任务。相信能抛砖引玉,给大家一些启发。 ”
本文从以下几个章节详细介绍框架实现:
首先来看看字节他们做这件事情的背景:
如图:
因此诞生了这个框架。
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
上述的痛点很多,本节就从最痛的任务重启的延迟角度出发解决问题,揭秘字节动态配置化的 flink 任务的实现。
预期效果如下:
1.即在任务不停止的情况下可以动态的上线一个动态规则、一个 sink kafka topic,上线某个、某类埋点对应的流数据的 kafka topic
如图左边是修改配置,添加了一个拆流规则以及对应 topic,右边这个规则 topic 就开始产出数据,对应的 console consumer 就消费到了复合规则的数据。(gif 加载可能比较慢)
8
2.即在任务不停止的情况下可以动态的下线一个动态规则、一个 sink kafka topic,下线某个、某类埋点对应的流数据的 kafka topic
如图左边是修改配置,删除了一个拆流规则以及对应 topic,右边这个规则 topic 就不产出数据了,对应的 console consumer 就没有新数据可以消费了。(gif 加载可能比较慢)
9
3.总体效果如下:
首先带大家分析下,实现这个框架,最基本的模块都需要包含什么:
先说说方案选择的结论:
整体方案架构图如图所示:
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
4
5
整个任务的实现非常简单。
本地运行,可以参考下面两篇安装 zk 和 kafka。
首先来看看整个任务的入口逻辑,ProcessFunction 的功能很简单:
env.addSource(new UserDefinedSource())
.process(new ProcessFunction<ClientLogSource, ClientLogSink>() {
// 动态规则配置中心
private ZkBasedConfigCenter zkBasedConfigCenter;
// kafka producer 管理中心
private KafkaProducerCenter kafkaProducerCenter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
}
@Override
public void processElement(ClientLogSource clientLogSource, Context context, Collector<ClientLogSink> collector)
throws Exception {
// 遍历所有的动态规则
this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() {
@Override
public void accept(Long id, DynamicProducerRule dynamicProducerRule) {
// 验证该条数据是否符合该条规则
if (dynamicProducerRule.eval(clientLogSource)) {
// 将符合规则的数据发向对应规则的 topic 中
kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString());
}
}
});
}
@Override
public void close() throws Exception {
super.close();
// 关闭规则池
this.zkBasedConfigCenter.close();
// 关闭 producer 池
this.kafkaProducerCenter.close();
}
});
env.execute();
来看 flink ProcessFunction 中的核心点,第一部分就是 ZkBasedConfigCenter。其功能包含:
动态规则包含的内容与用户需求息息相关:
举例:用户需要将在首页上报 + id > 300 用户的客户端日志都写入 topic_id_bigger_than_300_and_main_page 的 kafka topic 中。
那么针对这个 flink 任务来说就有以下三项用户的输入:
clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页")
;其中 clientLogSource 是原始日志 modeltopic_id_bigger_than_300_and_main_page
针对上述要求设计动态规则配置的 schema 如下:
{
"id-数值类型 string": {
"condition-过滤条件": "1==1",
"targetTopic-目标 topic 名称": "tuzisir1"
}
"1": {
"condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\"首页\")",
"targetTopic": "topic_id_bigger_than_300_and_main_page"
},
"2": {
"condition": "clientLogSource.getPage().equals(\"个人主页\")",
"targetTopic": "topic_profile_page"
}
}
对应动态规则 model 设计如下:
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
public class DynamicProducerRule implements Evaluable {
// 具体过滤规则
private String condition;
// 具体写入 topic
private String targetTopic;
// 使用 janino 编译的规则过滤器
private Evaluable evaluable;
public void init(Long id) {
try {
// 使用 janino 初始化规则
Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
this.evaluable = clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean eval(ClientLogSource clientLogSource) {
return this.evaluable.eval(clientLogSource);
}
}
重点在于 Evaluable 接口,动态生成代码就是继承了这个接口,用于执行过滤规则的基础接口。
代码动态生成下面会详细介绍。
public interface Evaluable {
// 动态规则接口过滤方法
boolean eval(ClientLogSource clientLogSource);
}
使用了 zk 作为动态配置中心,来动态监听规则配置以及更新规则池。
public class ZkBasedConfigCenter {
// zk config 变化监听器
private TreeCache treeCache;
// zk 客户端
private CuratorFramework zkClient;
private ZkBasedConfigCenter() {
try {
open();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// !!!规则池!!!规则池!!!规则池
private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();
private void open() throws Exception {
// 初始化规则
// 初始化 zk config 监听器
// 当有配置变更时
// 调用 private void update(String json) 更新规则
}
public void close() {
this.treeCache.close();
this.zkClient.close();
}
private void update(String json) {
Map<Long, DynamicProducerRule>
result = getNewMap(json);
// 1.将新增规则添加进规则池
// 2.将下线规则从规则池删除
}
private Map<Long, DynamicProducerRule> getNewMap(String json) {
// 将新规则解析,并使用 janino 进行初始化
}
}
可以使用一个固定路径的配置,如图博主使用的是 /kafka-config 这个路径
7
目前字节使用的引擎是 Groovy,但是博主常用 flink sql,sql 中的代码生成是使用 janino 做的,因此就比较了 janino 和 groovy 的性能差异,janino 编译出的原生 class 性能接近原生 class,是 Groovy 的 4 倍左右。其他的引擎不考虑,要么易用性差,要么性能差。
★Notes:性能这一点真的是很重要,1:4 的差距可以说是差别很大了。如果你的场景也是大流量,非常耗费性能的场景,建议直接入手 janino!!! ”
来看看具体的 benchmark case 代码:
// ClientLogSource 是原始日志
boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) {
return String.valueOf(clientLogSource.getId()).equals("1");
}
上面这段代码,在博主 mac 本地执行,每次循环执行 5kw 次,总计执行 5 次 得出的结果如下:
java:847 ms
janino:745 ms
groovy:4110 ms
java:1097 ms
janino:1170 ms
groovy:4052 ms
java:916 ms
janino:1117 ms
groovy:4311 ms
java:915 ms
janino:1112 ms
groovy:4382 ms
java:921 ms
janino:1104 ms
groovy:4321 ms
重复执行了很多次:java object : janino 编译原生 class :groovy :几乎都是 1:1:4 的耗时。所以此处我们选择性能更好的 janino。
public class JaninoUtils {
public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
// 动态生成代码
// 初始化 Class<Evaluable> 并返回
}
}
来看入口类中的第二个核心点,就是 KafkaProducerCenter。其功能包含:
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
public class KafkaProducerCenter {
// kafka producer 池
private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
= new ConcurrentHashMap<>();
private Producer<String, String> getProducer(String topicName) {
// 如果 kafka producer 池中有当前 topic 的 producer,则直接返回
// 如果没有,则初始化一个新的 producer 然后返回
}
public void send(String topicName, String message) {
final ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
"", message);
try {
RecordMetadata metadata = getProducer(topicName).send(record).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void close() {
// 关闭所有 producer 连接
}
}
上面就是所有的代码、逻辑实现方案。其实整体看下来是非常简单的。
为这个任务分配独立的队列资源,每当这个任务加载到最新配置时,都将配置在本地存储一份。当配置中心挂了的时候,还可以直接加载机器本地的配置,不至于什么都产出不了。
本文主要揭秘、实现了字节跳动埋点数据实时动态处理引擎。