Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >debezium采集MySQL CDC指南

debezium采集MySQL CDC指南

作者头像
从大数据到人工智能
发布于 2023-10-18 06:48:39
发布于 2023-10-18 06:48:39
90500
代码可运行
举报
文章被收录于专栏:大数据-BigData大数据-BigData
运行总次数:0
代码可运行

Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(Change Data Capture,CDC)。它支持多种数据库,包括 MySQL。下面我们详细说一下如何进行配置。

MySQL配置

创建用户

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE USER 'debezium_user'@'localhost' IDENTIFIED BY 'Pass-123-debezium_user';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium_user' IDENTIFIED BY 'Pass-123-debezium_user';

flush privileges;

开启binlog

检查binlog是否开启

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// for MySql 5.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

在执行上述命令时如果出现如下报错:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ERROR 3167 (HY000): The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled; see the documentation for 'show_compatibility_56'

请先修改数据库配置,将show_compatibility_56设置为ON

设置完上述配置后,再次执行检查binlog是否开启的SQL,如果为 OFF,请使用以下属性配置 MySQL 服务器配置文件,如下表所述:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
server-id         = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

重启MySQL之后,通过再次检查 binlog 状态来确认您的更改:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// for MySql 5.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

得到:

开启GTIDs

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。 虽然 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制,并使您能够更轻松地确认主服务器和副本服务器是否一致。

基本步骤:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
set GLOBAL gtid_mode=OFF_PERMISSIVE;
set GLOBAL gtid_mode=ON_PERMISSIVE;
set GLOBAL gtid_mode=ON;

set GLOBAL enforce_gtid_consistency=ON;

查看修改:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
show global variables like '%GTID%';

得到:

设置Session超时时间

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
set interactive_timeout=60;
set wait_timeout=60;

开启query log events

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
set binlog_rows_query_log_events=ON;

查看当前变量值:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
show global variables where variable_name = 'binlog_row_value_options';

开始部署

在开始部署之前,确定你已经安装了kafka,并且配置了Debezium MySQL connector的kafka connect已经启动。

kafka安装可参考:

下面说一下kafka connect配置问题。

首先下载kafka二进制包,例如下属例子中,将其下载到/data/app目录下。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
cd /data/app && wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar zxvf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka

下载MySQL connector plug-in.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 新建plugin目录
cd kafka && mkdir plugins
cd plugins && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
tar zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

修改配置,设置kafka plugin目录

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
vim /data/app/kafka/config/connect-distributed.properties

# 设置

plugin.path=/data/app/kafka/plugins

接下来便可以启动kafka connect

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/connect-distributed.sh config/connect-distributed.properties 

kafka connect默认启动的端口为8083

创建MySQL同步任务

在mysql中新建products 表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
create database if not exists inventory;
CREATE TABLE IF NOT EXISTS inventory.products (
 id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
 name VARCHAR(255) NOT NULL,
 description VARCHAR(512),
 weight FLOAT
);

插入数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert inventory.products values(1, 'tom', 'tall', 1.8);

创建同步任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "192.168.74.129", 
        "database.port": "3306", 
        "database.user": "debezium_user", 
        "database.password": "Pass-123-debezium_user", 
        "database.server.id": "223344", 
        "database.server.name": "fullfillment", 
        "database.include.list": "inventory", 
        "database.history.kafka.bootstrap.servers": "kafka:30092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}

可以看到kafka connect控制台输出:

kafka中查看数据

相关DDL

0 0 投票数

文章评分

本文为从大数据人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2345732

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-07-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
编辑精选文章
换一批
Debezium使用指南
实时数仓的第一步便是变更数据捕获(CDC),Debezium就是一款功能非常强大的CDC工具。Debezium是构建于Kafka之上的,将捕获的数据实时的采集到Kafka上
姜同学
2022/10/27
3.8K2
Debezium使用指南
Debezium的基本使用(以MySQL为例)
简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。
GreatSQL社区
2023/02/23
3.6K0
关于 MySQL GTID 复制
MySQL5.7以后都基本用GTID方式复制了,相对于binlog和position号方式,在failover时候减少很多人工切换操作
星哥玩云
2022/08/18
4620
MGR搭建以及性能测试
昨天的文章中简单介绍了MySQL的组复制的概念,今天搭建了一套单机多实例单主节点的MGR环境来进行性能测试。
AsiaYe
2019/11/06
1.1K0
MySQL 8 复制(四)——GTID与复制
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/91047395
用户1148526
2019/07/02
4.2K0
Mysql5.7主从复制配置全过程
mysql主从复制是最常见的高可用方式,通过主-从的方式,实现系统的高可用。在生产环境种,通常采用一主多从的方式,通过主库写数据,从库读数据,来提升系统的性能。 现在就演示一下,mysql5.7之下,如何配置主从复制。
冬天里的懒猫
2021/09/26
4.2K0
Flink + Debezium CDC 实现原理及代码实战
Debezium 是一个分布式平台,它将现有的数据库转换为事件流,应用程序消费事件流,就可以知道数据库中的每一个行级更改,并立即做出响应。
kk大数据
2020/12/29
8.4K0
Flink + Debezium CDC 实现原理及代码实战
MySQL主从复制之增强半同步(无损复制)、延迟复制和并行复制
MySQL有四种同步方式: 1、异步复制(Async Replication) 2、同步复制(sync Replication) 3、半同步复制(Async Replication) 4、增强半同步复制(lossless Semi-Sync Replication)、无损复制
AiDBA宝典
2023/04/26
7.8K0
MySQL主从复制之增强半同步(无损复制)、延迟复制和并行复制
MySQL传统点位复制在线转为GTID模式复制
MySQL传统点位复制在5.7版本前是主要的主从复制模式,而随着MySQL5.6版本引入GTID,并且MySQL5.7进行各方面的优化以后,在mySQL5.7(尤其是MySQL5.7.6)版本后GTID模式的主从复制方式成为一个新的选择方式。要使用GTID模式,首先也需知其优缺点,其主要的优缺点如下:
俊才
2019/11/18
2K0
MySQL传统点位复制在线转为GTID模式复制
mysql-MGR集群搭建
MGR是MySQL数据库未来发展的一个重要方向。 MGR基础结构要求: 引擎必须为innodb,因为需事务支持在commit时对各节点进行冲突检查 每个表必须有主键,在进行事务冲突检测时需要利用主键值对比 必须开启binlog且为row格式 开启GTID,且主从状态信息存于表中(--master-info-repository=TABLE 、--relay-log-info-repository=TABLE),--log-slave-updates打开 一致性检测设置--transaction-write-set-extraction=XXHASH64 MGR使用限制: RP和普通复制binlog校验不能共存,需设置--binlog-checksum=none 不支持gap lock(间隙锁),隔离级别需设置为read_committed 不支持对表进行锁操作(lock /unlock table),不会发送到其他节点执行 ,影响需要对表进行加锁操作的情况,列入mysqldump全表备份恢复操作 不支持serializable(序列化)隔离级别 DDL语句不支持原子性,不能检测冲突,执行后需自行校验是否一致 不支持外键:多主不支持,单主模式不存在此问题 最多支持9个节点:超过9台server无法加入组
章工运维
2023/08/24
4590
mysql-MGR集群搭建
MySQL基于GTID主从复制的杂谈
先来回顾一下MySQL的二进制知识点。基于Row格式的日志可以避免MySQL主从复制中出现的主从不一致问题。在一个sql语句修改了1000条数据的情况下,基于段的日志格式只会记录这个sql语句。而基于row的日志格式会有1000条记录来记录每一行的数据修改。
用户2032165
2018/12/10
1.6K0
MySQL基于GTID主从复制的杂谈
MySQL MGR集群单主模式的自动搭建和自动化故障修复
随着MySQL MGR的版本的升级以及技术成熟,在把MHA拉下神坛之后, MGR越来越成为MySQL高可用的首选方案。 MGR的搭建并不算很复杂,但是有一系列手工操作步骤,为了简便MGR的搭建和故障诊断,这里完成了一个自动化的脚本,来实现MGR的自动化搭建,自动化故障诊断以及修复。
星哥玩云
2022/08/18
8730
MySQL MGR集群单主模式的自动搭建和自动化故障修复
Debezium结合kafka connect实时捕获mysql变更事件写入elasticsearch实现搜索流程
本文将会实现一套完整的Debezium结合Kafka Connect实时捕获MySQL变更事件写入Elasticsearch并实现查询的流程.
XING辋
2019/03/26
7.5K4
Debezium结合kafka connect实时捕获mysql变更事件写入elasticsearch实现搜索流程
MySQL高可用方案MGR+consul组合测试
今天来简单理一下MGR和consul的组合方案,前期的准备和步骤还是比较多的,晚上完成了基础的调试,来来回回切换了好多次,还算有点意思。
jeanron100
2018/10/24
2.2K0
Mysql实时数据变更事件捕获kafka confluent之debezium
如果你的后端应用数据存储使用的MySQL,项目中如果有这样的业务场景你会怎么做呢?
XING辋
2019/03/26
3.6K0
Mysql实时数据变更事件捕获kafka confluent之debezium
关于mysql集群主从服务器搭建
在高并发流量下,数据库往往是服务端的瓶颈,由于数据库数据需要确保落地,同时保证数据同步,数据即时性,有效性的问题,导致数据库不能像平常后端程序一样负载均衡.  
仙士可
2020/04/16
1.4K0
MySQL基于GTID主从复制之半同步复制
1.从库的IO线程向主库的主进程发送请求,主库验证从库,交给主库IO线程负责数据传输; 2.主库IO线程对比从库发送过来的master.info里的信息,将binlog文件信息,偏移量和binlog文件名等发送给从库 3.从库接收到信息后,将binlog信息保存到relay-bin中,同时更新master.info的偏移量和binlog文件名 4.从库的SQL线程不断的读取relay-bin的信息,同时将读到的偏移量和文件名写道relay-log.info文件,binlog信息写进自己的数据库,一次同步操作完成。 5.完成上次同步后,从库IO线程不断的向主库IO线程要binlog信息 6.从库如果也要做主库,也要打开log_bin 和log-slave-update参数
星哥玩云
2022/08/16
8570
MySQL基于GTID主从复制之半同步复制
mysql主从搭建
# 1.原理 master的I/O线程将数据写入binlog中; slave的I/O线程从master的binlog中读取数据,写入自己的Relay_Log_File日志中; slave的SQL线程从Relay_Log_File日志中解析sql,完成数据的复制。 # 2.应用场景 从服务器作为主服务器的实时数据备份 主从服务器实现读写分离(主写从读),从服务器实现负载均衡 把多个从服务器根据业务重要性进行拆分访问(从服务器根据业务进行拆分) # 3.master主库配置 修改my.cnf [root@loc
summerking
2022/10/27
3.2K0
kafka 连接器实现 Mysql 数据同步 Elasticsearch
Mysql 作为传统的关系型数据库,主要面向 OLTP,性能优异,支持事务,但是在一些全文检索,复杂查询上面并不快。Elasticsearch 底层基于 Lucense 实现,天然分布式,采用倒排索引存储数据,全文检索效率很高,使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。
Se7en258
2021/05/18
2.8K0
kafka 连接器实现 Mysql 数据同步 Elasticsearch
MySQL中涉及安全性的SQL语句
您的MySQL安全吗?数据是最有价值的资产,数据安全已经成为重中之重。本篇将介绍如何使用SQL语句确保MySQL的安全性。
MySQLSE
2024/07/18
1872
MySQL中涉及安全性的SQL语句
相关推荐
Debezium使用指南
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验