Maxwell是一个读取MySQL binlog并将行更新作为JSON写入Kafka,Kinesis或其他流平台的应用程序。
tar
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz \
| tar zxvf -
cd maxwell-1.20.0
Docker
docker pull zendesk/maxwell
macOS homebrew
brew install maxwell
vim /etc/mysql/my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> CREATE DATABASE maxwell;
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
配置完成之后,重启MySQL
maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
创建测试数据库和表进行测试
create database test;
use test;
create table maxwell(id int,daemon varchar(100));
insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
控制台输出
{
"database": "test",
"table": "maxwell",
"type": "insert",
"ts": 1551862585,
"xid": 381,
"commit": true,
"data": {
"id": 1,
"daemon": "Stanislaw Lem"
}
}
{
"database": "test",
"table": "maxwell",
"type": "update",
"ts": 1551862800,
"xid": 450,
"commit": true,
"data": {
"id": 1,
"daemon": "firebus! firebus!"
},
"old": {
"daemon": "Stanislaw Lem"
}
}
启动Zookeeper
zkServer.sh start
启动kafka server
kafka-server-start.sh /opt/kafka/config/server-1.properties
启动Maxwell
maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
启动kafka-console-consumer消费消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell
创建测试数据进行测试
insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
update test.maxwell set daemon = 'firebus! firebus!!' where id = 1;
参考文献:
Maxwell官网quickstart:http://maxwells-daemon.io/quickstart/
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。