本文介绍了结合 MySQL 数据库、流计算 Oceanus、HBase 以及腾讯云数据仓库 TCHouse-C 来构建实时数仓,并通过流计算 Oceanus 读取 MySQL 数据、关联 HBase 中的维表,最终将数据存入腾讯云数据仓库 TCHouse-C 进行指标分析,实现完整实时数仓的全流程操作指导。
环境搭建
创建 Oceanus 集群
说明
若未使用过 VPC、日志、存储这些组件,需要先进行创建。
Oceanus 集群需要和下面的 MySQL、EMR 集群使用同一个 VPC,否则需要手动打通(例如对等连接)。
创建 VPC 私有网络
私有网络是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建 MySQL、EMR、ClickHouse 集群等服务时选择的网络必须保持一致,网络才能互通,否则需要使用对等连接、VPN 等方式打通网络。
创建云数据库 MySQL 服务
云数据库 MySQL(TencentDB for MySQL)是腾讯云基于开源数据库 MySQL 专业打造的高性能分布式数据存储服务,让用户能够在云中更轻松地设置、操作和扩展关系数据库。
新建 MySQL 服务时,网络需要选择之前创建的。
创建完 MySQL 服务后,需要修改 binlog 参数,如图修改为 FULL(默认值为 MINIMAL)。
修改完参数后,登录 MySQL 创建示例所需要的数据库和数据库表。创建数据库 mysqltestdb
登录 MySQL 创建示例所需要的数据库。
打开 SQL 窗口或者单击可视化页面创建数据库及表。数据准备
-- 创建数据库CREATE DATABASE mysqltestdb;-- 在新建的数据库上新建表 studentCREATE TABLE `student` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',`name` varchar(10) COLLATE utf8mb4_bin DEFAULT '' COMMENT '名字',`age` int(11) DEFAULT NULL COMMENT '年龄',`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=COMPACT COMMENT='学生表'-- student 表中插入数据INSERT INTO mysqltestdb.student(id,name,age) VALUES(1,"xiaomin",20);
创建 EMR 集群
弹性 MapReduce 是云端托管的弹性开源泛 Hadoop 服务,支持 Spark、HBase、Presto、Flink、Druid 等大数据框架,本次示例主要需要使用 HBase 组件。
1. 登录 弹性 MapReduce 控制台,选择集群 > 创建集群,开始新建集群,具体可参见 创建 EMR 集群。新建集群时,需选择安装 HBase 组件。
1. 如果是生产环境,服务器配置可根据实际情况选择。网络需要选择之前创建好的 VPC 网络,始终保持服务组件在同一 VPC 下。
2. 在集群列表中,单击新建的集群 ID/名称,进入集群详情页。选择集群资源 > 资源管理,即进入 HBase 的 Master 节点。
3. 进入 云服务器控制台,搜索 EMR 实例 ID,然后单击登录进入服务器。
4. 创建 HBase 表。
# 进入HBase命令root@172~# hbase shell
进入 hbase shell,并新建表:
-- 建表语句create 'dim_hbase', 'cf'-- 插入数据put 'dim_hbase','1','cf:school_name','MingDeSchool'
创建腾讯云数据仓库 TCHouse-C
新建集群
登录 ClickHouse
在之前新建的 EMR 下选择一台云服务器单击登录,最好选择带有外网 IP 的节点。
安装 ClickHouse 客户端
登录客户端
登录客户端示例如下:
-- 登录客户端示例如下,请将 xx.xx.xx.xx 替换为用户 CK 的 IP 地址clickhouse-client -hxx.xx.xx.xx --port 9000 -user xx -password xx-- 新建数据库:CREATE DATABASE testdb ON cluster default_cluster;-- 新建表:CREATE TABLE testdb.student_school ON cluster default_cluster (`stu_id` Int32,`stu_name` Nullable(String),`school_name` Nullable(String),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/student_school', '{replica}', Sign) ORDER BY stu_id;
数据清洗和运算加工
数据准备
按照上面的操作创建表,并向 MySQL 和 HBase 表中插入数据。
创建 Flink SQL 作业
在流计算 Oceanus 控制台创建 SQL 作业。
Source 端
MySQL-CDC Source:
--学生信息作为cdc源表CREATE TABLE `student` (`id` INT NOT NULL,`name` VARCHAR,`age` INT,proc_time AS PROCTIME(),PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'YoursIp','port' = '3306','username' = '用户名','password' = 'YoursPassword','database-name' = 'mysqltestdb','table-name' = 'student');
HBase 维表:
--示例使用school学校信息作为维表CREATE TABLE dim_hbase (rowkey STRING,cf ROW <school_name STRING>, -- 如果有多个列簇,写法 cf Row<age INT,name String>PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dim_hbase','zookeeper.quorum' = '用户自己的hbase服务器zookeeper地址,多个用逗号隔开');
Sink 端
创建到 ClickHouse 的创建表语句。
--关联后存入clickhouse表CREATE TABLE `student_school` (stu_id INT,stu_name STRING,school_name STRING,PRIMARY KEY (`stu_id`) NOT ENFORCED) WITH ('connector' = 'clickhouse','url' = 'clickhouse://yourIP:8123',-- 如果ClickHouse集群未配置账号密码可以不指定--'username' = 'root',--'password' = 'root','database-name' = 'testdb','table-name' = 'student_school','table.collapsing.field' = 'Sign');
进行逻辑运算
INSERT INTOstudent_schoolSELECTstudent.id AS stu_id,student.name AS stu_name,dim_hbase.cf.school_nameFROMstudentJOIN dim_hbase for SYSTEM_TIME AS OF student.proc_timeON CAST(student.id AS STRING) = dim_hbase.rowkey;
结果验证
在 ClickHouse 数据库中查询数据是否正确。
SELECT * FROM testdb.student_school;