Simulated real-time vehicle JSON messages are sent through Kafka to Flink. Flink SQL joins this real-time stream against the violation vehicles already captured in the t_violation_list table, selects the violation vehicles passing through the current road section, and writes them into HBase.
This case (requirement 4) is based on the data in the t_violation_list table:
CREATE TABLE `t_violation_list` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`car` varchar(255) NOT NULL,
`violation` varchar(1000) DEFAULT NULL,
`create_time` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
INSERT INTO `t_violation_list` (`car`, `violation`, `create_time`) VALUES
('豫A99999', '嫌疑套牌车', 1686690777),
('豫DF09991', '排水道过弯', 1686609999);
Two vehicles are currently simulated in the t_violation_list table. Later, when messages are produced to the Kafka topic topic-car, only cars present in this table will be recorded into HBase.
mysql> select * from t_violation_list;
+----+------------+-----------------+-------------+
| id | car | violation | create_time |
+----+------------+-----------------+-------------+
| 2 | 豫A99999 | 嫌疑套牌车 | 1686690777 |
| 3 | 豫DF09991 | 排水道过弯 | 1686609999 |
+----+------------+-----------------+-------------+
2 rows in set (0.00 sec)
CREATE TABLE table1 (
`actionTime` BIGINT,
`monitorId` STRING,
`cameraId` STRING,
`car` STRING,
`speed` DOUBLE,
`roadId` STRING,
`areaId` STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'topic-car',
'properties.bootstrap.servers' = 'hadoop10:9092',
'properties.group.id' = 'c1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
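Before wiring up the whole pipeline, the Kafka source can be smoke-tested on its own. A minimal check, assuming it is run from the same Flink SQL session (with 'scan.startup.mode' = 'latest-offset', only messages produced after the query starts will appear):
-- Preview records arriving on topic-car; cancel the query once rows show up.
SELECT actionTime, car, speed, monitorId FROM table1;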
CREATE TABLE table2 (
id INT,
car STRING,
violation STRING,
create_time BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8',
'table-name' = 't_violation_list',
'username' = 'root',
'password' = '0000'
);
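By default, the JDBC table used in a lookup join queries MySQL once per probing record from Kafka. If t_violation_list grows large, the connector's lookup cache can be enabled. A sketch with illustrative values (the table name table2_cached and both cache settings are assumptions, not part of the original setup):
-- Variant of table2 with an optional lookup cache: cached rows expire after
-- the TTL, so newly added violation cars are picked up with at most that delay.
CREATE TABLE table2_cached (
id INT,
car STRING,
violation STRING,
create_time BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8',
'table-name' = 't_violation_list',
'username' = 'root',
'password' = '0000',
'lookup.cache.max-rows' = '1000',
'lookup.cache.ttl' = '60s'
);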
CREATE TABLE table3 (
rowkey STRING,
cf1 ROW<monitorId STRING, cameraId STRING, car STRING, roadId STRING, areaId STRING, speed DOUBLE, actionTime BIGINT>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 't_track_info',
'zookeeper.quorum' = 'hadoop10:2181'
);
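The same table3 definition can also be read as a source, so the tracked rows are inspectable from Flink SQL as well as from the HBase shell. A minimal sketch, assuming the HBase table t_track_info already exists:
-- Read back the tracking table; fields of the cf1 ROW are accessed with dot notation.
SELECT rowkey, cf1.car, cf1.speed, cf1.actionTime FROM table3;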
INSERT INTO table3
SELECT CONCAT(car, '_', CAST(actionTime AS STRING)), ROW(monitorId, cameraId, car, roadId, areaId, speed, actionTime)
FROM (
SELECT t1.monitorId, t1.cameraId, t1.car, t1.roadId, t1.areaId, t1.speed, t1.actionTime
FROM table1 t1
INNER JOIN table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.car = t2.car
) t3;
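The inner query is a processing-time lookup join: each Kafka record probes t_violation_list at the instant it arrives, so only cars present in the table at that moment are emitted. The join can also be previewed on its own before submitting the INSERT job:
-- Same join logic as the INSERT above, as a standalone query; a car with no
-- match in t_violation_list (e.g. LZ21740 below) produces no output.
SELECT t1.car, t2.violation, t1.monitorId, t1.actionTime
FROM table1 t1
INNER JOIN table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.car = t2.car;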
Send JSON messages to Flink via the topic-car topic:
[root@hadoop10 kafka0.11]# kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic-car
>{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫A99999","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫DF09991","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647526,"monitorId":"0001","cameraId":"1","car":"LZ21740","speed":100,"roadId":"01","areaId":"20"}
The following shows creating the 't_track_info' table with column family 'cf1' and scanning its contents after each batch of messages; the row count of each scan equals the number of violation-vehicle tracking records successfully inserted so far. Note that BIGINT and DOUBLE columns are stored in big-endian binary form, so value=\x00\x00\x00\x00d\x882\xE5 is the BIGINT 1686647525 and value=@Y\x00\x00\x00\x00\x00\x00 is the DOUBLE 100.0.
hbase:002:0> create 't_track_info','cf1'
Created table t_track_info
Took 0.8506 seconds
=> Hbase::Table - t_track_info
hbase:003:0> scan 't_track_info'
ROW COLUMN+CELL
0 row(s)
Took 0.0376 seconds
hbase:004:0> scan 't_track_info'
ROW COLUMN+CELL
\xE8\xB1\xABA99999_1686647525 column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
\xE8\xB1\xABA99999_1686647525 column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
\xE8\xB1\xABA99999_1686647525 column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
\xE8\xB1\xABA99999_1686647525 column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
\xE8\xB1\xABA99999_1686647525 column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
\xE8\xB1\xABA99999_1686647525 column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
\xE8\xB1\xABA99999_1686647525 column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
1 row(s)
Took 0.0363 seconds
hbase:005:0> scan 't_track_info'
ROW COLUMN+CELL
\xE8\xB1\xABA99999_1686647525 column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
\xE8\xB1\xABA99999_1686647525 column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
\xE8\xB1\xABA99999_1686647525 column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
\xE8\xB1\xABA99999_1686647525 column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
\xE8\xB1\xABA99999_1686647525 column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
\xE8\xB1\xABA99999_1686647525 column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
\xE8\xB1\xABA99999_1686647525 column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
\xE8\xB1\xABDF09991_1686647525 column=cf1:actionTime, timestamp=2023-06-16T00:07:43.398, value=\x00\x00\x00\x00d\x882\xE5
\xE8\xB1\xABDF09991_1686647525 column=cf1:areaId, timestamp=2023-06-16T00:07:43.398, value=20
\xE8\xB1\xABDF09991_1686647525 column=cf1:cameraId, timestamp=2023-06-16T00:07:43.398, value=1
\xE8\xB1\xABDF09991_1686647525 column=cf1:car, timestamp=2023-06-16T00:07:43.398, value=\xE8\xB1\xABDF09991
\xE8\xB1\xABDF09991_1686647525 column=cf1:monitorId, timestamp=2023-06-16T00:07:43.398, value=0001
\xE8\xB1\xABDF09991_1686647525 column=cf1:roadId, timestamp=2023-06-16T00:07:43.398, value=01
\xE8\xB1\xABDF09991_1686647525 column=cf1:speed, timestamp=2023-06-16T00:07:43.398, value=@Y\x00\x00\x00\x00\x00\x00
2 row(s)
Took 0.0394 seconds
This exercise used Dlink, a web-based visual tool for Flink SQL. Installing Dlink involved many pitfalls: the official documentation and the official binaries do not quite match, the names Dinky and Dlink are used ambiguously, and many exceptions and errors came up, so pay particular attention to importing the correct jars. That aside, following the documentation on the official Dlink website, the installation is essentially a matter of copying the given commands into Linux.
The most puzzling jar is flink-dist_2.11-1.13.6.jar: it must be placed in the Dlink directory at "/opt/installs/dlink0.7.3/plugins/flink-dist_2.11-1.13.6.jar", yet it must not be put in the Flink directory; Flink's own lib directory needs the flink-dist_2.12-1.13.6.jar build instead.
In addition, flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar and flink-shaded-hadoop-2-uber-2.7.5-10.0.jar can conflict with each other, though I have also seen setups that succeeded without any conflict. My Flink version is 1.13.6 and my Hadoop version is 3.4.6.
Finally, you also need to download the HBase connector dependency, the JSON format dependency, and so on.
The versions used in this example are listed below for reference; pay attention to the directories: