前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【基于Flink的城市交通实时监控平台】需求四:车辆违法路径跟踪-使用FlinkSQL在Dlink写入HBase

【基于Flink的城市交通实时监控平台】需求四:车辆违法路径跟踪-使用FlinkSQL在Dlink写入HBase

作者头像
火之高兴
发布2024-07-25 15:45:08
1140
发布2024-07-25 15:45:08
举报
文章被收录于专栏:大数据应用技术

案例需求分析

通过Kafka发送模拟实时车辆JSON信息给Flink接收,FlinkSQL将实时车辆JSON信息与t_violation_list表中已经捕捉到的违规违章车辆做连接对比,选择出通过当前路段的违章记录车辆,将其存入HBase中。

本次需求四案例,将基于t_violation_list表中数据:

代码语言:javascript
复制
CREATE TABLE `t_violation_list` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) NOT NULL,
  `violation` varchar(1000) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8
代码语言:javascript
复制
INSERT INTO `t_violation_list` (`car`, `violation`, `create_time`) VALUES
('豫A99999', '嫌疑套牌车', 1686690777),
('豫DF09991', '排水道过弯', 1686609999);

当前模拟了两辆车在t_violation_list表中,稍后启动Kafka Topic-car时,只有在该表中的车,会被记录到HBase。

代码语言:javascript
复制
mysql> select * from t_violation_list;
+----+------------+-----------------+-------------+
| id | car        | violation       | create_time |
+----+------------+-----------------+-------------+
|  2 | 豫A99999   | 嫌疑套牌车      |  1686690777 |
|  3 | 豫DF09991  | 排水道过弯      |  1686609999 |
+----+------------+-----------------+-------------+
2 rows in set (0.00 sec)

需求代码

代码语言:javascript
复制
CREATE TABLE table1 ( 
  `actionTime` BIGINT,
  `monitorId`  STRING,
  `cameraId`   STRING,
  `car`        STRING,
  `speed`      double,
  `roadId`     STRING,
  `areaId`     STRING,
   proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-car',
  'properties.bootstrap.servers' = 'hadoop10:9092',
  'properties.group.id' = 'c1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
CREATE TABLE table2 ( 
  id    	  		INT, 
  car         		STRING, 
  violation   		STRING, 
  create_time       BIGINT
) WITH ( 
   'connector' = 'jdbc', 
   'url' = 'jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8', 
   'table-name' = 't_violation_list', 
   'username' = 'root', 
   'password' = '0000' 
);
create table table3(
   rowkey STRING,
   cf1 ROW<monitorId String,cameraId String,car String,roadId String,areaId String,speed Double,actionTime BIGINT>,
   PRIMARY KEY (rowkey) NOT ENFORCED
)with(
  'connector' = 'hbase-2.2',
 'table-name' = 't_track_info',
 'zookeeper.quorum' = 'hadoop10:2181'
);
insert into table3
select concat(car,'_',cast(actionTime as string)),ROW(monitorId,cameraId,car,roadId,areaId,speed,actionTime)
from(
	select t1.monitorId,t1.cameraId,t1.car,t1.roadId,t1.areaId,t1.speed,t1.actionTime
	from table1 t1 
	inner join table2 for system_time as of t1.proctime as t2
	on t1.car = t2.car
)t3

测试数据

从Topic-car中发送JSON信息给Flink

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
[root@hadoop10 kafka0.11]# kafka-console-producer.sh --broker-list   hadoop10:9092 --topic topic-car                           >{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫A99999","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647525,"monitorId":"0001","cameraId":"1","car":"豫DF09991","speed":100,"roadId":"01","areaId":"20"}
>{"actionTime":1686647526,"monitorId":"0001","cameraId":"1","car":"LZ21740","speed":100,"roadId":"01","areaId":"20"}

HBase表中操作

包括了创建t_track_info','cf1表和两次查询表中内容,注意查询后row的数量,即为成功插入的违法车辆追踪信息

代码语言:javascript
复制
hbase:002:0> create 't_track_info','cf1'
Created table t_track_info
Took 0.8506 seconds
=> Hbase::Table - t_track_info
hbase:003:0> scan 't_track_info'
ROW                      COLUMN+CELL
0 row(s)
Took 0.0376 seconds
hbase:004:0> scan 't_track_info'
ROW                              COLUMN+CELL
 \xE8\xB1\xABA99999_1686647525   column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABA99999_1686647525   column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
 \xE8\xB1\xABA99999_1686647525   column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
 \xE8\xB1\xABA99999_1686647525   column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
 \xE8\xB1\xABA99999_1686647525   column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
 \xE8\xB1\xABA99999_1686647525   column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
 \xE8\xB1\xABA99999_1686647525   column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
1 row(s)
Took 0.0363 seconds
hbase:005:0> scan 't_track_info'
ROW                              COLUMN+CELL
 \xE8\xB1\xABA99999_1686647525   column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABA99999_1686647525   column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
 \xE8\xB1\xABA99999_1686647525   column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
 \xE8\xB1\xABA99999_1686647525   column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
 \xE8\xB1\xABA99999_1686647525   column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
 \xE8\xB1\xABA99999_1686647525   column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
 \xE8\xB1\xABA99999_1686647525   column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
 \xE8\xB1\xABDF09991_1686647525  column=cf1:actionTime, timestamp=2023-06-16T00:07:43.398, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABDF09991_1686647525  column=cf1:areaId, timestamp=2023-06-16T00:07:43.398, value=20
 \xE8\xB1\xABDF09991_1686647525  column=cf1:cameraId, timestamp=2023-06-16T00:07:43.398, value=1
 \xE8\xB1\xABDF09991_1686647525  column=cf1:car, timestamp=2023-06-16T00:07:43.398, value=\xE8\xB1\xABDF09991
 \xE8\xB1\xABDF09991_1686647525  column=cf1:monitorId, timestamp=2023-06-16T00:07:43.398, value=0001
 \xE8\xB1\xABDF09991_1686647525  column=cf1:roadId, timestamp=2023-06-16T00:07:43.398, value=01
 \xE8\xB1\xABDF09991_1686647525  column=cf1:speed, timestamp=2023-06-16T00:07:43.398, value=@Y\x00\x00\x00\x00\x00\x00
2 row(s)
Took 0.0394 seconds
hbase:006:0> scan 't_track_info'
ROW                              COLUMN+CELL
 \xE8\xB1\xABA99999_1686647525   column=cf1:actionTime, timestamp=2023-06-16T00:06:36.355, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABA99999_1686647525   column=cf1:areaId, timestamp=2023-06-16T00:06:36.355, value=20
 \xE8\xB1\xABA99999_1686647525   column=cf1:cameraId, timestamp=2023-06-16T00:06:36.355, value=1
 \xE8\xB1\xABA99999_1686647525   column=cf1:car, timestamp=2023-06-16T00:06:36.355, value=\xE8\xB1\xABA99999
 \xE8\xB1\xABA99999_1686647525   column=cf1:monitorId, timestamp=2023-06-16T00:06:36.355, value=0001
 \xE8\xB1\xABA99999_1686647525   column=cf1:roadId, timestamp=2023-06-16T00:06:36.355, value=01
 \xE8\xB1\xABA99999_1686647525   column=cf1:speed, timestamp=2023-06-16T00:06:36.355, value=@Y\x00\x00\x00\x00\x00\x00
 \xE8\xB1\xABDF09991_1686647525  column=cf1:actionTime, timestamp=2023-06-16T00:07:43.398, value=\x00\x00\x00\x00d\x882\xE5
 \xE8\xB1\xABDF09991_1686647525  column=cf1:areaId, timestamp=2023-06-16T00:07:43.398, value=20
 \xE8\xB1\xABDF09991_1686647525  column=cf1:cameraId, timestamp=2023-06-16T00:07:43.398, value=1
 \xE8\xB1\xABDF09991_1686647525  column=cf1:car, timestamp=2023-06-16T00:07:43.398, value=\xE8\xB1\xABDF09991
 \xE8\xB1\xABDF09991_1686647525  column=cf1:monitorId, timestamp=2023-06-16T00:07:43.398, value=0001
 \xE8\xB1\xABDF09991_1686647525  column=cf1:roadId, timestamp=2023-06-16T00:07:43.398, value=01
 \xE8\xB1\xABDF09991_1686647525  column=cf1:speed, timestamp=2023-06-16T00:07:43.398, value=@Y\x00\x00\x00\x00\x00\x00
2 row(s)
Took 0.0459 seconds

FlinkSQL Web工具Dlink的安装使用

在这里插入图片描述
在这里插入图片描述

本次需求使用了FlinkSQL的网页可视化工具Dlink,在安装Dlink过程中遇到了很多坑,官方文档和官方二进制文件似乎不太对版,Dinky和Dlink的名词使用有模糊歧义,出现了很多异常和错误,尤其是要注意Jar包的正确导入。除此之外,按照Dlink官方网站给的文档进行安装基本是傻瓜式的复制命令敲入linux。

flink-dist_2.11-1.13.6.jar这个jar包是最让人费解的,它需要被正确的放在"/opt/installs/dlink0.7.3/plugins/flink-dist_2.11-1.13.6.jar"Dlink的目录下,但是却不能放在Flink的目录下,Flink的lib目录里需要放flink-dist_2.12-1.13.6.jar版本的。

另外flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jarflink-shaded-hadoop-2-uber-2.7.5-10.0.jar会有冲突,但是我也见到了没有出现冲突的成功实验。我的Flink版本是1.13.6,Hadoop版本是3.4.6。

最后还需要下载HBase的连接依赖,JSON的依赖等。

以下是实例各版本参考,请注意目录:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例需求分析
  • 需求代码
  • 测试数据
  • HBase表中操作
  • FlinkSQL Web工具Dlink的安装使用
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档