2018刚过去,趁着春节放假对过去一年主导开发的项目做个梳理和总结
平台运营到一定阶段,一定会累积大批量的用户数据,这些用户数据是运营人员的黄金财产。而如何利用用户的数据来做运营(消息推送、触达消息、优惠券发送、广告位等),正是精准运营系统需要解决的问题。本文是基于信贷业务实践后写出来的,其它行业如保险、电商、航旅、游戏等也可以参考。
先看几个具有代表性的需求
用户可用额度在20000~50000元,而且有借款记录,未还本金为0,性别为“男” 用户发生了A行为且未还本金大于5000 用户在1天内发生A行为次数大于等于3次 用户在A行为前24小时内未发生B行为 用户在A行为后一个月内未发生B行为
业务上有两种消息类型
对于用户筛选条件,也主要有两种类型
早期方案.png
早期方案存在以下痛点
下面重点看下kafka connector和Elasticsearch如何使用
kafka connector有Source和Sink两种组件,Source的作用是读取数据到kafka,这里用开源实现debezium来采集mysql的binlog和postgres的xlog。Sink的作用是从kafka读数据写到目标系统,这里自己研发一套组件,根据配置的规则将数据格式化再同步到ES。 kafka connector有以下优点:
对于状态数据,由于状态的写操作相对较少,我们采取嵌套文档的方式,将同个用户的相关实体数据都同步写入到同个文档,具体实现用painless脚本做局部更新操作。效果类似这样:
{
"id":123,
"age":30,
"credit_line":20000,
"education":"bachelor",
...
"last_loan_applications":{
"loan_id":1234,
"status":"reject",
...
}
...
}
事件数据写入比较频繁,数据量比较多,我们使用父子文档的方式做关联,效果类似这样:
{
"e_uid":123,
"e_name":"loan_application",
"e_timestamp":"2019-01-01 10:10:00"
...
}
(e_前缀是为了防止同个index下同名字段冲突) ES这样存储一方面是方便做统计报表,另一方面跟用户筛选和触达有关。
在设计规则引擎前,我们对业界已有的规则引擎,主要包括Esper, Drools, Flink CEP,进行了初步调研。
Esper设计目标为CEP的轻量级解决方案,可以方便的嵌入服务中,提供CEP功能。 优势:
劣势:
Drools开始于规则引擎,后引入Drools Fusion模块提供CEP的功能。 优势:
劣势:
Flink 是一个流式系统,具有高吞吐低延迟的特点,Flink CEP是一套极具通用性、易于使用的实时流式事件处理方案。 优势:
劣势:
综上对比了几大开源规则引擎,发现都无法满足业务特点:
最终我们选择自己根据业务需要,开发基于json的自定义规则,规则类似下面例子:
{
"batchId": "xxxxxxxx", //流水号,创建每条运营规则时生成
"type": "trigger", //usual
"triggerEvent": "login",
"after": "2h", //分钟m,小时h,天d,月M
"pushRules": [//支持同时推送多条不同类型的消息
{
"pushType":
"sms", //wx,app,coupon
"channel": "cl",
"content": "hello #{userInfo.name}"
},
{
"pushType": "coupon",
"couponId": 1234
}
],
"statusConditions": [
{
"name": "and", //逻辑条件,支持与(and)或(or)非(not)
"conditions": [
{
"name": "range",
"field": "credit_line",
"left": 2000,
"right": 10000,
"includeLeft": true,
"includeRight": false
},
{
"name":"in",
"filed":"education",
"values":["bachelor","master"]
}
]
}
],
"eventConditions": [
{
"name": "or",//逻辑条件,支持与(and)或(or)非(not)
"conditions": [
{
"name": "event",
"function": "count", //聚合函数,目前只支持count
"eventName": "xxx_button_click",
"range": { //聚合结果做判断
"left": 1,
"includeLeft": true
},
"timeWindow": {
"type": "fixed", //fixed为固定窗口,sliding为滑动窗口
"start": "2019-01-01 01:01:01",
"end": "2019-02-01 01:01:01"
},
"conditions": [ //event查询条件继承and逻辑条件,所以事件也可以过滤字段
{
"name": "equals",
"field": "f1",
"value": "v1"
}
]
}
]
}
]
}
使用面向对象思维对过滤条件做抽象后,过滤条件继承关系如下:
然后代码里加一层parser把Condition都转成ES查询语句,实现轻量级的业务规则配置功能。
系统组成模块及功能如下: mysql binlog:mysql的数据变更,由kafka connector插件读取到kafka,数据源之一 postgres xlog:pg的数据变更,由kafka connector插件读取到kafka,数据源之一 report server:事件上报服务,数据源之一 tags:用户画像系统计算出来的标签,数据源之一 触发场景路由:分实时触发和延迟触发,实时触发直接到下一步,延迟触发基于 redis的延迟队列实现 用户筛选处理器:将筛选规则翻译为ES查询语句到ES查询用户数据,可以是批量的和单个用户的 幂等处理器:对数据做幂等处理,防止重复消费 变量渲染处理器:对推送内容做处理 推送适配器:兼容不同的推送方式 BloomFilter记录器:将推送用户和流水号记录到redis,用于幂等处理 推送事件记录器:将推送事件推入kafka 定时任务模块:基于elastic-job,处理定时推送任务 规则配置控制台:提供可视化配置界面(运营规则配置、数据采集规则配置、字段元数据配置等) 报表服务:提供报表查询功能 运营位服务:提供外部接口,根据条件匹配运营位(如启动图、首页banner图片等)
未来会继续从技术及业务两方面入手,将系统建设的更加易用、高效。