前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程:进阶1-维表关联

Flink 实践教程:进阶1-维表关联

原创
作者头像
吴云涛
修改2021-12-08 15:59:04
1K0
修改2021-12-08 15:59:04
举报
文章被收录于专栏:腾讯云流计算 Oceanus

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何提取 MySQL 数据与 HBase 数据进行维表关联(流维 join),经过简单聚合分析后存入 Elasticsearch 中。

Flink 实践教程:进阶1-维表关联

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 MySQL 实例

进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。

数据准备:

进入实例数据库,创建 oceanus_advanced1_student_grade 表,并手动插入数据。

代码语言:sql
复制
-- 建表语句
CREATE TABLE `oceanus_advanced1_student_grade` (
  `name`    varchar(50) NOT NULL DEFAULT '',
  `grade`   int(3)               DEFAULT NULL,
  PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
-- 数据插入
INSERT INTO `oceanus_advanced1_student_grade` (`name`, `grade`) VALUES ('Oceanus-1', 85);
INSERT INTO `oceanus_advanced1_student_grade` (`name`, `grade`) VALUES ('Oceanus-2', 95)

创建 EMR 集群

登录 弹性 MapReduce 控制台 [5],选择【集群列表】>【新建集群】,开始新建集群,具体可参考 创建 EMR 集群 [6]。新建集群时,需选择安装 HBase 组件。

数据准备:

登录 EMR集群节点,通过 HBase Shell 命令进入 HBase 实例数据库,并新建表,手动插入数据。

代码语言:shell
复制
# 进入 HBase 命令
root@yourhostname~# hbase shell
代码语言:sql
复制
-- 建表语句
create 'oceanus_advanced1_student_info','StuInfo'
-- 数据插入
put 'oceanus_advanced1_student_info','Oceanus-1','StuInfo:Class','01'
put 'oceanus_advanced1_student_info','Oceanus-1','StuInfo:Age','17'
put 'oceanus_advanced1_student_info','Oceanus-2','StuInfo:Class','01'
put 'oceanus_advanced1_student_info','Oceanus-2','StuInfo:Age','20'
put 'oceanus_advanced1_student_info','Oceanus-3','StuInfo:Class','01'
put 'oceanus_advanced1_student_info','Oceanus-3','StuInfo:Age','18'

创建 Elasticsearch 集群

进入 Elasticsearch 控制台 [7],点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问 创建 Elasticsearch 集群 [8]

流计算 Oceanus 作业

1. 创建 Source
代码语言:sql
复制
CREATE TABLE `mysql_cdc_source_table` (
  `name`        STRING,
  `grade`       STRING,
  `proc_time`   AS PROCTIME(),      -- 这里 proc_time 字段配合下面流维 join 时使用。
  PRIMARY KEY (`name`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'mysql-cdc',        -- 固定值 'mysql-cdc'
  'hostname' = '10.0.0.158',        -- 数据库的 IP
  'port' = '3306',                  -- 数据库的访问端口
  'username' = 'root',              -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
  'password' = 'Tencent123$',       -- 数据库访问的密码
  'database-name' = 'testdb',       -- 需要同步的数据库
  'table-name' = 'oceanus_advanced1_student_grade'   -- 需要同步的数据表名
);
2. 创建 HBase 维表
代码语言:sql
复制
CREATE TABLE hbase_table (
  rowkey      STRING,
  StuInfo     ROW <Class STRING,Age STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',                         -- Flink 1.13 支持 hbase-2.2
  'table-name' = 'oceanus_advanced1_student_info',   -- HBase 表名
  'zookeeper.quorum' = '10.0.0.118:2181,10.0.0.119:2181,10.0.0.3:2181'   -- HBase 的 zookeeper 地址
);
3. 创建 Sink
代码语言:sql
复制
CREATE TABLE elasticsearch6_sink_table (
    `class`       STRING,
    `amount`      BIGINT,
    PRIMARY KEY(`class`) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',      -- 输出到 Elasticsearch 6
    'username' = 'elastic',               -- 选填 用户名
    'password' = 'Tencent123$',           -- 选填 密码
    'hosts' = 'http://10.0.0.97:9200',    -- Elasticsearch 的连接地址
    'index' = 'oceanus_advanced1',        -- Elasticsearch 的 Index 名
    'document-type' = '_doc',             -- Elasticsearch 的 Document 类型
    'sink.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
    'format' = 'json'                     -- 输出数据格式,目前只支持 'json'
);
4. 编写业务 SQL
代码语言:sql
复制
INSERT INTO elasticsearch6_sink_table
SELECT
b.StuInfo.Class            AS class,
COUNT(a.name)              AS amount
FROM mysql_cdc_source_table AS a
JOIN hbase_table FOR SYSTEM_TIME AS OF a.proc_time AS b
-- 这里一定要加入 for SYSTEM_TIME as of 语句,否则虽然仍然可以执行 JOIN,但是只会全量读取一次数据库,结果可能不符合预期。
ON a.name = b.rowkey
WHERE CAST(a.grade AS INT) >= 90 AND CAST(b.StuInfo.Age AS INT) >= 18
GROUP BY b.StuInfo.Class

总结

流计算 Oceanus 平台当前内置 Connector 支持了 MySQL、PostgreSQL、Hive、HBase、Redis 和 Oracle 维表,无需用户自己开发即可使用。具体如何使用维表可参考 内置维表参考列表 [9] 本例统计的是各个班级年龄大于等于18岁,并且成绩大于等于90分的人数,无实际业务用途。

在创建 Sink 表到 ES 时,Flink 会将上述 CREATE TABLE 语句定义的主键 class 字段当成 _id 生成主键,并据此更新之前的文档(Upsert 流)。如无主键的定义,Flink 会随机生成字符串当成 _id 生成主键,此时为 Append 流写入。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] MySQL 控制台:https://console.cloud.tencent.com/cdb

[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433

[5] 弹性 MapReduce 控制台:https://console.cloud.tencent.com/emr

[6] 创建 EMR 集群:https://cloud.tencent.com/document/product/589/10981

[7] Elasticsearch 控制台:https://console.cloud.tencent.com/es

[8] 创建 Elasticsearch 集群:https://cloud.tencent.com/document/product/845/19536

[9] 内置维表参考列表: https://cloud.tencent.com/document/product/849/48264

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流计算 Oceanus 简介
  • 前置准备
    • 创建流计算 Oceanus 集群
      • 创建 MySQL 实例
        • 数据准备:
      • 创建 EMR 集群
        • 数据准备:
      • 创建 Elasticsearch 集群
        • 1. 创建 Source
        • 2. 创建 HBase 维表
        • 3. 创建 Sink
        • 4. 编写业务 SQL
    • 流计算 Oceanus 作业
    • 总结
    • 参考链接
    相关产品与服务
    流计算 Oceanus
    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档