
Flink Dynamic CEP Quick Start

Last updated: 2024-02-27 14:33:21


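Prerequisites

Flink dynamic CEP jobs must run on a Stream Compute Service (Oceanus) dedicated cluster. If you do not have one yet, see Creating a Dedicated Cluster.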

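Procedure

Step 1: Prepare test data

Prepare the upstream Kafka topic
1. Log in to the CKafka console and create or select an available CKafka instance.
2. Create a topic named topic_cep_demo to store the simulated user behavior logs.
Prepare the MySQL database
1. Log in to the TencentDB for MySQL console and create or select an available MySQL instance.
2. Create the rule table t_mysql_demo, which records the rules that the Flink CEP job applies.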
CREATE DATABASE d_cep_demo;
USE d_cep_demo;
CREATE TABLE t_mysql_demo (
  `id` VARCHAR(64),
  `version` INT,
  `pattern` VARCHAR(4096),
  `function` VARCHAR(512)
);
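Each row is one version of one rule: `id` identifies the rule, `version` numbers its revisions, `pattern` holds the serialized pattern, and `function` names the pattern process function class. The logs in Step 4 show the job switching to (id, version) = (1, 2) after a version-2 row is inserted for id 1, which suggests that the newest version of each id is the one applied. The exact query the job runs is not documented here, so the following is only a sketch under that assumption. It uses Python's built-in sqlite3 as a stand-in for MySQL, and `latest_rules` is a hypothetical helper.

```python
import sqlite3
from typing import List, Tuple

# Local stand-in for the MySQL rule table t_mysql_demo (SQLite accepts the
# same backtick-quoted column names), so the sketch runs without a server.
SCHEMA = """
CREATE TABLE t_mysql_demo (
    `id`       VARCHAR(64),
    `version`  INT,
    `pattern`  VARCHAR(4096),
    `function` VARCHAR(512)
)
"""

# Assumed semantics: for each rule id, only the row with the highest version applies.
LATEST_RULES = """
SELECT t.`id`, t.`version`, t.`pattern`, t.`function`
FROM t_mysql_demo AS t
JOIN (SELECT `id`, MAX(`version`) AS max_version
      FROM t_mysql_demo
      GROUP BY `id`) AS m
  ON t.`id` = m.`id` AND t.`version` = m.max_version
ORDER BY t.`id`
"""


def latest_rules(conn: sqlite3.Connection) -> List[Tuple[str, int]]:
    """Return (id, version) for the newest version of every rule."""
    return [(row[0], row[1]) for row in conn.execute(LATEST_RULES)]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    fn = "com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction"
    # Placeholder pattern strings; the real rows store the JSON patterns from this guide.
    rows = [("1", 1, "<pattern v1>", fn), ("1", 2, "<pattern v2>", fn), ("2", 1, "<pattern>", fn)]
    conn.executemany("INSERT INTO t_mysql_demo VALUES (?, ?, ?, ?)", rows)
    print(latest_rules(conn))  # [('1', 2), ('2', 1)]
```

With these three rows, rule 1 resolves to version 2 and rule 2 to version 1, which is the state Step 4 sets up.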

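Step 2: Develop and start the Flink CEP job

1. Contact online customer service to obtain the flink-cep JAR, and add it to the dependencies of your Maven project.
2. Develop the job code.
3. In the Stream Compute Service (Oceanus) console, upload the job JAR, then deploy and start the JAR job.
Main class: com.tencent.cloud.oceanus.cep.demo.FlinkCepDemo. Its arguments are as follows (replace the Kafka address, JDBC URL, and MySQL credentials with those of your own instances):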
--bootstrap-server 172.28.22.5:9092 --topic topic_cep_demo --properties.group.id flink_cep_demo --url jdbc:mysql://172.28.28.24:3306/d_cep_demo?user=root&password=waze6011601 --table-name t_mysql_demo --pattern-update-interval-ms 3000
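The guide does not show how FlinkCepDemo (a Java class) reads these arguments. As a sketch only, the following Python code mirrors the argument list so that each value's presumed role is explicit and the JDBC URL can be sanity-checked. `parse_job_args` and `jdbc_target` are hypothetical helpers, the roles in the comments are inferred from the argument names, and `your_password` is a placeholder. If you ever paste the arguments into a shell, quote the URL, because `&` is a shell control operator.

```python
import argparse
from typing import List, Tuple
from urllib.parse import parse_qs, urlsplit


def parse_job_args(argv: List[str]) -> argparse.Namespace:
    """Mirror of the demo job's argument list (names copied from this guide)."""
    p = argparse.ArgumentParser(description="Flink dynamic CEP demo arguments")
    p.add_argument("--bootstrap-server", required=True)    # Kafka broker address
    p.add_argument("--topic", required=True)               # topic holding the behavior events
    p.add_argument("--properties.group.id", dest="group_id", required=True)  # Kafka consumer group
    p.add_argument("--url", required=True)                 # JDBC URL of the rule database
    p.add_argument("--table-name", required=True)          # rule table, e.g. t_mysql_demo
    p.add_argument("--pattern-update-interval-ms", type=int, required=True)  # presumed rule polling interval
    return p.parse_args(argv)


def jdbc_target(url: str) -> Tuple[str, int, str, str]:
    """Split a jdbc:mysql:// URL into (host, port, database, user)."""
    if url.startswith("jdbc:"):
        url = url[len("jdbc:"):]       # urlsplit understands mysql://host:port/db?query
    parts = urlsplit(url)
    query = parse_qs(parts.query)      # 'user=...&password=...' -> dict of lists
    return parts.hostname, parts.port, parts.path.lstrip("/"), query["user"][0]


if __name__ == "__main__":
    args = parse_job_args([
        "--bootstrap-server", "172.28.22.5:9092",
        "--topic", "topic_cep_demo",
        "--properties.group.id", "flink_cep_demo",
        "--url", "jdbc:mysql://172.28.28.24:3306/d_cep_demo?user=root&password=your_password",
        "--table-name", "t_mysql_demo",
        "--pattern-update-interval-ms", "3000",
    ])
    print(jdbc_target(args.url))  # ('172.28.28.24', 3306, 'd_cep_demo', 'root')
```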

Step 3: Insert a rule

1. Insert a dynamically updatable rule into the MySQL table.
INSERT INTO t_mysql_demo (`id`, `version`, `pattern`, `function`)
VALUES (
  '1',
  1,
  '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
  'com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction'
);

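To make rules easier to use and to keep the pattern field in the database readable, Stream Compute Service (Oceanus) defines a JSON format for describing rules. The value of the pattern field in the SQL statement above is a pattern serialized in this format. It matches 3 or more events whose action is 0 (the start node: LOOPING, with times from 3 to 3), followed by an event whose action is not 1 (the end node, evaluated by EndCondition). Because the consuming strategy is SKIP_TILL_NEXT (relaxed contiguity), events that do not match are skipped instead of breaking the sequence, and the SKIP_PAST_LAST_EVENT after-match strategy discards every partial match that started after an emitted match started but before it ended.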
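Editing the serialized pattern by hand is error-prone. The following Python sketch rebuilds the same JSON from a dict so that only the occurrence count and the Aviator expression are parameters. `build_pattern`, `quantifier` and `rule_row` are hypothetical helpers, and the field layout is copied from the SQL statement above rather than from a format specification.

```python
import json
from typing import Optional, Tuple

END_CONDITION = "com.tencent.cloud.oceanus.cep.demo.condition.EndCondition"
PROCESS_FUNCTION = "com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction"


def quantifier(prop: str, times: Optional[dict] = None) -> dict:
    """Quantifier object as it appears in the guide's pattern JSON."""
    return {"consumingStrategy": "SKIP_TILL_NEXT", "properties": [prop],
            "times": times, "untilCondition": None}


def build_pattern(min_times: int, expression: str) -> str:
    """start (min_times or more events matching the Aviator expression) -> end (EndCondition)."""
    pattern = {
        "name": "end",
        "quantifier": quantifier("SINGLE"),
        "condition": None,
        "nodes": [
            {"name": "end", "quantifier": quantifier("SINGLE"),
             "condition": {"className": END_CONDITION, "type": "CLASS"}, "type": "ATOMIC"},
            {"name": "start",
             "quantifier": quantifier("LOOPING", {"from": min_times, "to": min_times, "windowTime": None}),
             "condition": {"expression": expression, "type": "AVIATOR"}, "type": "ATOMIC"},
        ],
        "edges": [{"source": "start", "target": "end", "type": "SKIP_TILL_NEXT"}],
        "window": None,
        "afterMatchStrategy": {"type": "SKIP_PAST_LAST_EVENT", "patternName": None},
        "type": "COMPOSITE",
        "version": 1,
    }
    # Compact separators plus insertion-ordered keys yield the same string as the SQL above.
    return json.dumps(pattern, separators=(",", ":"))


def rule_row(rule_id: str, version: int, min_times: int, expression: str) -> Tuple[str, int, str, str]:
    """Values for a parameterized INSERT INTO t_mysql_demo (id, version, pattern, function)."""
    return (rule_id, version, build_pattern(min_times, expression), PROCESS_FUNCTION)
```

For example, `rule_row("1", 1, 3, "action == 0")` yields the four values of the rule inserted above; pass them to your MySQL driver with its own placeholder syntax instead of concatenating them into the SQL text.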
2. Send the following messages to the Kafka topic topic_cep_demo.
1,ZhangSan,0,1,1697181992000
1,ZhangSan,0,1,1697181993000
1,ZhangSan,0,1,1697181994000
1,ZhangSan,0,1,1697181995000
Check the JobManager log and search for JDBCPeriodicPatternProcessorDiscoverer to view the latest rule that was loaded:
"{\\"name\\":\\"end\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"SINGLE\\"],\\"times\\":null,\\"untilCondition\\":null},\\"condition\\":null,\\"nodes\\":[{\\"name\\":\\"end\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"SINGLE\\"],\\"times\\":null,\\"untilCondition\\":null},\\"condition\\":{\\"className\\":\\"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition\\",\\"type\\":\\"CLASS\\"},\\"type\\":\\"ATOMIC\\"},{\\"name\\":\\"start\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"LOOPING\\"],\\"times\\":{\\"from\\":3,\\"to\\":3,\\"windowTime\\":null},\\"untilCondition\\":null},\\"condition\\":{\\"expression\\":\\"action == 0\\",\\"type\\":\\"AVIATOR\\"},\\"type\\":\\"ATOMIC\\"}],\\"edges\\":[{\\"source\\":\\"start\\",\\"target\\":\\"end\\",\\"type\\":\\"SKIP_TILL_NEXT\\"}],\\"window\\":null,\\"afterMatchStrategy\\":{\\"type\\":\\"SKIP_PAST_LAST_EVENT\\",\\"patternName\\":null},\\"type\\":\\"COMPOSITE\\",\\"version\\":1}"
Check the TaskManager log and search for "A match for Pattern of" to view the CEP match output:
A match for Pattern of (id, version): (1, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000)]end: [Event(1, ZhangSan, 0, 1, 1697181995000)]
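To see why these four messages produce exactly one match, the following Python sketch parses the Kafka messages and replays a greedy approximation of the rule. It is not Flink's NFA-based CEP engine. It assumes that all events belong to one key (they all have id 1 here), that events arrive in timestamp order, and that EndCondition accepts any event whose action is not 1, as described in Step 3. The third field is taken to be `action`, matching the log output; the fourth field is called `field4` because its meaning is not given in this guide. `parse_event` and `simulate` are hypothetical helpers.

```python
from typing import Callable, List, NamedTuple, Tuple


class Event(NamedTuple):
    id: int
    name: str
    action: int
    field4: int      # meaning not stated in this guide
    event_time: int  # epoch milliseconds


def parse_event(line: str) -> Event:
    """Parse one Kafka message such as '1,ZhangSan,0,1,1697181992000'."""
    id_, name, action, field4, ts = line.strip().split(",")
    return Event(int(id_), name, int(action), int(field4), int(ts))


Match = Tuple[List[Event], Event]


def simulate(events: List[Event], min_times: int,
             start_cond: Callable[[Event], bool],
             end_cond: Callable[[Event], bool]) -> List[Match]:
    """Greedy approximation of `start{min_times,}` followed by `end`
    under SKIP_PAST_LAST_EVENT. Not Flink's NFA."""
    matches: List[Match] = []
    start: List[Event] = []
    for ev in events:
        if len(start) >= min_times and end_cond(ev):
            matches.append((list(start), ev))
            start.clear()  # skip past the last matched event: no overlapping matches
        elif start_cond(ev):
            start.append(ev)
        # any other event is ignored (relaxed contiguity)
    return matches


if __name__ == "__main__":
    messages = [
        "1,ZhangSan,0,1,1697181992000",
        "1,ZhangSan,0,1,1697181993000",
        "1,ZhangSan,0,1,1697181994000",
        "1,ZhangSan,0,1,1697181995000",
    ]
    events = [parse_event(m) for m in messages]
    # Rule 1, version 1: start = action == 0 (3 or more), end = action != 1 (assumed EndCondition).
    for start, end in simulate(events, 3, lambda e: e.action == 0, lambda e: e.action != 1):
        print([e.event_time for e in start], end.event_time)
```

For these inputs the sketch reproduces the match in the log above, and with the Step 4 data and rule settings it also reproduces the Step 4 matches. When several overlapping partial matches complete at the same event, Flink's output can differ from this simplification.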

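Step 4: Update the matching rule and check that the updated rule takes effect

1. Update the rules in the MySQL table.
1.1 Change the condition of the start node from action == 0 to action == 0 || action == 2, and raise the required number of occurrences from >=3 to >=5. The change is written as a new row with the same id (1) and version 2. The SQL statement is: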
INSERT INTO t_mysql_demo (`id`, `version`, `pattern`, `function`)
VALUES (
  '1',
  2,
  '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
  'com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction'
);
1.2 Insert a new rule with id 2. It is the same as version 1 of rule 1: its start condition is still action == 0, with >=3 occurrences.
INSERT INTO t_mysql_demo (`id`, `version`, `pattern`, `function`)
VALUES (
  '2',
  1,
  '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
  'com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction'
);
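Version 2 of rule 1 differs from version 1 only in the start node's times and expression. Instead of hand-editing the JSON, a new version can be derived from the stored one. The following Python sketch assumes the node layout shown in the statements above; `derive_version` is a hypothetical helper. It leaves the top-level `"version":1` untouched, as the guide's version-2 row does, on the assumption that the rule version is tracked only in the table's `version` column.

```python
import json


def derive_version(pattern_json: str, min_times: int, expression: str) -> str:
    """Copy a stored pattern, changing only the `start` node's count and condition."""
    pattern = json.loads(pattern_json)  # dicts keep key order on load and dump
    for node in pattern["nodes"]:
        if node["name"] == "start":
            times = node["quantifier"]["times"]
            times["from"] = min_times
            times["to"] = min_times      # LOOPING with from == to reads as "min_times or more"
            node["condition"]["expression"] = expression
    # Top-level "version" is deliberately not changed (see the lead-in).
    return json.dumps(pattern, separators=(",", ":"))
```

Applied to the version-1 pattern with `min_times=5` and `"action == 0 || action == 2"`, it returns the pattern string used in the version-2 INSERT above; insert the result as a new row with a higher `version`.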
2. Send the following data to the Kafka topic.
1,ZhangSan,0,1,1697181992000
1,ZhangSan,0,1,1697181993000
1,ZhangSan,0,1,1697181994000
1,ZhangSan,2,1,1697181995000
1,ZhangSan,0,1,1697181996000
1,ZhangSan,0,1,1697181997000
1,ZhangSan,0,1,1697181998000
1,ZhangSan,2,1,1697181999000
3. Check the TaskManager log and search for "A match for Pattern of" to view the CEP match output.
Match log for rule 1:
A match for Pattern of (id, version): (1, 2) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000), Event(1, ZhangSan, 2, 1, 1697181995000), Event(1, ZhangSan, 0, 1, 1697181996000)]end: [Event(1, ZhangSan, 0, 1, 1697181997000)]
Match log for rule 2 (two matches):
A match for Pattern of (id, version): (2, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000)]end: [Event(1, ZhangSan, 2, 1, 1697181995000)]
A match for Pattern of (id, version): (2, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181996000), Event(1, ZhangSan, 0, 1, 1697181997000), Event(1, ZhangSan, 0, 1, 1697181998000)]end: [Event(1, ZhangSan, 2, 1, 1697181999000)]
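To check results without reading the TaskManager log by eye, the match lines can be parsed. The regular expressions below are an assumption based only on the log format shown in this guide, including the case where two matches are printed back to back; `parse_matches` is a hypothetical helper that keeps the rule id, the rule version, and the event time of each event per pattern stage.

```python
import re
from typing import Dict, List, Tuple

MATCH_RE = re.compile(
    r"A match for Pattern of \(id, version\): \((\w+), (\d+)\) is found\. "
    r"The event sequence: (.*?)(?=A match for Pattern of|\Z)",
    re.DOTALL,
)
STAGE_RE = re.compile(r"(\w+): \[(.*?)\]")   # e.g. "start: [...]" or "end: [...]"
EVENT_RE = re.compile(r"Event\(([^)]*)\)")   # contents of one Event(...)


def parse_matches(log_text: str) -> List[Tuple[str, int, Dict[str, List[int]]]]:
    """Return (rule id, rule version, {stage: [event times]}) for each match."""
    results = []
    for m in MATCH_RE.finditer(log_text):
        rule_id, version, sequence = m.group(1), int(m.group(2)), m.group(3)
        stages: Dict[str, List[int]] = {}
        for stage, body in STAGE_RE.findall(sequence):
            # The last field printed in Event(...) is the event time in milliseconds.
            stages[stage] = [int(e.split(", ")[-1]) for e in EVENT_RE.findall(body)]
        results.append((rule_id, version, stages))
    return results
```

For the rule 2 output above, this yields two entries for (id, version) = ("2", 1), one ending at 1697181995000 and one ending at 1697181999000.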