在线机器学习中,样本是关键的一环。本文将给大家详细的介绍微博是如何用 Flink 来实现在线样本生成的。
在线样本生成对样本的时效性和准确性都有极高的要求。同样对作业的稳定性及是否容灾也都有严格的指标要求。基于这个前提,我们对目前较为流行的几种实时计算框架(Storm 0.10, Spark 2.11, Flink 1.10)进行了分析比较,结论如下:
因此,我们决定使用 Flink 来作为在线样本生成的实时流计算框架。
在线样本生成,简单描述一个业务场景:对用户的曝光数据和点击数据实时的做关联,关联后将数据输出到 Kafka 中,给下游的在线训练作业用。
首先我们要确定两个数据流关联的时间窗口。这一步一般建议先离线对两个数据流的日志做关联,通过离线的方式对两份数据在不同的时间范围内做 join,来判断在线需要的时间窗口。比如业务接受的最低关联比例是 85%,并且通过离线测试确认 20 分钟内两个数据流可以关联 85%的数据,那么就可以采用 20 分钟作为时间窗口。这里的关联比例和窗口时间实际上是在准确性和实时性之间的一个 trade-off。
确定时间窗口后,我们并没有使用 Flink 的 time window 来实现多个数据流的 join,而是选择采用 union + timer 方式来实现。这里主要考虑两点:第一、Flink 自带的 join 操作不支持多个数据流。第二、使用 timer+state 来实现,自定义程度更高,限制更少,也更方便。
接下来,我们把样本生成过程细分为:
一般我们的数据源包括 Kafka,Trigger,MQ 等。Flink 需要从数据源中实时的读取日志。
此处参考伪代码:
public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
/**
* 这个状态是通过过程函数来维护,使用ValueState
*/
private ValueState state;
private Long timer = null;
public StateSampleFunction (String time){
timer = Long.valueOf(time);
}
@Override
public void open(Configuration parameters) throws Exception {
// 获取state
state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
}
@Override
public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {
if (value.f0 == null){
return;
}
Object sampleValue = value.f1;
Long time = context.timerService().currentProcessingTime();
ReturnSample returnSample = state.value();
if (returnSample == null) {
returnSample = new ReturnSample();
returnSample.setKey(value.f0);
returnSample.setTime(time);
context.timerService().registerProcessingTimeTimer(time +timer);
}
// 更新点击数据到state里
if (sampleValue instanceof ClickLog){
ClickLog clickLog = (ClickLog)values;
returnSample =(ReturnSample) clickLog.setSample(returnSample);
}
state.update(returnSample);
}
/**
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {
ReturnSample value = state.value();
state.clear();
out.collect(value);
}
}
样本最终输出到实时的数据队列中。下面是实际的作业拓扑和运行时状态:
整个样本拼接过程的流程图:
使用 RocksDB/Gemini 作为 state 的 Backend 的优势和建议:
我们用大数据对 memory 和 RocksDB,Gemini 做了实验对比,结果显示 RocksDB 和 Gemin 在数据处理,作业稳定性和资源使用等方面比 memory 更合理。其中 Gemini 的优势最为明显。
此外,如果是大数据量的 state,建议使用 Gemini + SSD 固态硬盘。
1. Flink 作业的异常监控
2. 样本输入端 Kafka 的消费延迟监控
3. 样本输出端 Kafka 的写入量的监控
4. 样本监控
样本生成后,如何验证数据是否准确
样本异常对线上模型训练的影响非常大。当发现异常报警时,首先要做的是向在线模型训练作业发送样本异常的报警。收到报警信息后,模型停止更新。从而避免影响模型线上效果。
普通意义的业务故障解决后,丢弃原来的数据,所有输入日志流从最新的时间点开始消费并生成新的样本即可。重要业务需要重置输入日志流的 Kafka offset 从故障时间点开始重新生成样本数据。
通过平台化对样本生成的流程做出严格的规范非常重要。在平台化的过程中,需要提供简单通用的开发模板以提高作业开发效率;提供平台化的作业监控和样本指标监控框架,避免重复造车;提供通用的样本输出落地策略,和在线/离线校验策略,更便捷的为业务方服务。
微博基于 Flink 搭建的在线样本生成平台架构,如图:
UI 页面,如图:
基于平台化开发,用户只需要关心业务逻辑部分即可。需要用户开发的有:
其余的在 UI 上配置即可实现,具体有:
资源情况由平台方审核并配置。完成后,自动生成并提交作业。
作业提交后:
1. 平台会提供如前所述的作业相关监控,如下:
■ Flink 作业的异常监控
■ 样本监控
2. 平台会自动将数据落盘,存储到HDFS上。方便离线验证或者离线训练。
3. 用户只需将精力放到样本的验证上即可,由平台方保证作业的稳定性。
作者介绍:
曹富强,微博机器学习研发中心-高级系统工程师。现负责微博机器学习平台数据计算/数据存储模块,主要涉及实时计算 Flink、Storm、Spark Streaming,数据存储Kafka、Redis,离线计算 Hive、Spark 等。目前专注于 Flink/Kafka/Redis 在微博机器学习场景的应用,为机器学习提供框架,技术,应用层面的支持。
领取专属 10元无门槛券
私享最新 技术干货