前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用tunnel同步PG数据到kafka

使用tunnel同步PG数据到kafka

作者头像
保持热爱奔赴山海
发布2019-10-26 16:23:21
1.7K2
发布2019-10-26 16:23:21
举报
文章被收录于专栏:数据库相关

tunnel同步PG数据到kafka

来自哈罗单车开源的组件。支持同步PG数据到kafka或者ES。

https://github.com/hellobike/tunnel

tunnel整体的部署比较简单的

需要事先部署好zk和kafka(我下面演示的是单节点的zk和kafka)

节点部署关系:

192.168.2.4   部署zk、kafka、pg10运行在1921端口

192.168.2.189 部署tunnel

确保已开启PG的逻辑复制

代码语言:javascript
复制
wal_level = 'logical';

max_replication_slots = 20

注意这个设置要重启PG进程的

然后,创建测试库表和同步用的账号

代码语言:javascript
复制
CREATE DATABASE test_database;

\c test_database

create table test_1 (id int primary key , name char(40));

create table test_2 (id int primary key , name char(40));

 

CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;

GRANT CONNECT ON DATABASE test_database to test_rep;

vim pg_hba.conf 增加2行配置:

代码语言:javascript
复制
host    all                   test_rep        192.168.2.0/24         md5

host    replication     test_rep        192.168.2.0/24         md5

然后 reload 下PG

到192.168.2.189 机器上去 编译tunnel:

注意: tunnel的启动需要事先安装好oracle jdk 1.8

代码语言:javascript
复制
git clone https://github.com/hellobike/tunnel

cd tunnel

mvn clean package -Dmaven.test.skip=true

cd target

unzip AppTunnelService.zip

cd AppTunnelService

vim conf/test.yml 内容如下:

代码语言:javascript
复制
tunnel_subscribe_config:

  pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'

  subscribes:

  - slotName: slot_for_test

    pgConnConf:

      host: 192.168.2.4

      port: 1921

      database: test_database

      user: test_rep

      password: xxxx

    rules:

    - {table: test_1, pks: ['id'], topic: test_1_logs}

    - {table: test_2, pks: ['id'], topic: test_2_logs}

    kafkaConf:

      addrs:

      - 192.168.2.4:9092

tunnel_zookeeper_address: 192.168.2.4:2181

前台启动:

代码语言:javascript
复制
java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788     # 暴露prometheus metric在7788端口(配置监控不是这里的重点,也很简单,暂时先跳过)

然后,我们再在PG10上面的test_database的2张表随便造些数据,然后可以看到kafka里面已经有数据了(下图是通过kafkamanager和 kafka-eagle的结果)。

格式化下,数据就是这样的:

UPDATE的记录的样子:

代码语言:javascript
复制
{

         "dataList": [{

                   "dataType": "integer",

                   "name": "id",

                   "value": "1111"

         }, {

                   "dataType": "character",

                   "name": "name",

                   "value": "大狗蛋 "

         }],

         "eventType": "UPDATE",

         "lsn": 10503246616,

         "schema": "public",

         "table": "test_1"

}

DELETE的记录的样子:

代码语言:javascript
复制
{

         "dataList": [{

                   "dataType": "integer",

                   "name": "id",

                   "value": "3"

         }],

         "eventType": "DELETE",

         "lsn": 10503247064,

         "schema": "public",

         "table": "test_1"

}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/10/24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • tunnel同步PG数据到kafka
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档