前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用ogg实现oracle到kafka的增量数据实时同步

使用ogg实现oracle到kafka的增量数据实时同步

作者头像
栗筝i
发布2022-12-01 19:47:42
1.4K0
发布2022-12-01 19:47:42
举报
文章被收录于专栏:迁移内容

Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步。

0、本篇中源端和目标端的一些配置信息:

-

版本

OGG版本

id地址

源端

Oracle11gR2

Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64

Carlota3

目标端

kafka_2.12-2.5.0

Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1

Carlota2

源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle!

PS:源端是安装好了Oracle的机器,目标端是安装好了Kafka的机器,二者环境变量之前都配置好了。

1、源端OGG安装

先建立ogg目录

代码语言:javascript
复制
mkdir -p /opt/ogg

解压zip文件

代码语言:javascript
复制
unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip

解压后得到一个tar包,再解压这个tar

代码语言:javascript
复制
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg

使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

代码语言:javascript
复制
chown -R oracle:oinstall /data/ogg 

配置OGG环境变量

代码语言:javascript
复制
vim /etc/profile

export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=

ORACLE_HOME/lib:/usr/lib export PATH=

OGG_HOME:$PATH

代码语言:javascript
复制
source /etc/profile

2、目标端OGG安装

先建立ogg目录

代码语言:javascript
复制
mkdir -p /data/ogg

解压zip文件

代码语言:javascript
复制
unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip

解压后得到一个tar包,再解压这个tar

代码语言:javascript
复制
tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar

使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

代码语言:javascript
复制
chown -R oracle:oinstall /data/ogg 

配置OGG环境变量

代码语言:javascript
复制
vim /etc/profile

export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=

JAVA_HOME/jre/lib/amd64:

JAVA_HOME/jre/lib/amd64/server:

JAVA_HOME/jre/lib/amd64/libjsig.so:

JAVA_HOME/jre/lib/amd64/server/libjvm.so:

OGG_HOME/lib export PATH=

OGG_HOME:$PATH

代码语言:javascript
复制
source /etc/profile
代码语言:javascript
复制
ggsci
代码语言:javascript
复制
create subdirs

3、源端Oracle归档模式设置

登陆Oracle用户

代码语言:javascript
复制
su - oracle

登陆Oracle

代码语言:javascript
复制
sqlplus / as sysdba

查看当前是否为归档模式(若为Disabled,则需手动打开)

代码语言:javascript
复制
archive log list 

Database log mode No Archive Mode Automatic archival Disabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Current log sequence 14

立即关闭数据库

代码语言:javascript
复制
shutdown immediate 

启动实例并加载数据库,但不打开

代码语言:javascript
复制
startup mount 

更改数据库为归档模式

代码语言:javascript
复制
alter database archivelog; 

打开数据库

代码语言:javascript
复制
alter database open;

启用自动归档

代码语言:javascript
复制
alter system archive log start; 

再次查看当前是否为归档模式(看到为Enabled,则成功打开归档模式。)

代码语言:javascript
复制
archive log list 

Database log mode Archive Mode Automatic archival Enabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Next log sequence to archive 14 Current log sequence 14

查看辅助日志状态(若为NO,则需要通过命令修改)

代码语言:javascript
复制
select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG


NO NO

代码语言:javascript
复制
alter database force logging;
代码语言:javascript
复制
alter database add supplemental log data;

再次查看辅助日志状态(为YES即可)

代码语言:javascript
复制
select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG


YES YES

4、源端oracle创建复制用户

root用户建立相关文件夹,并赋予权限

代码语言:javascript
复制
mkdir -p /data/oracle/oggdata/orcl
代码语言:javascript
复制
chown -R oracle:oinstall /data/oracle/oggdata/orcl

执行下面sql

代码语言:javascript
复制
SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

Tablespace created.

SQL>  create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;

Grant succeeded.

Oracle创建测试表

代码语言:javascript
复制
create user test_ogg  identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

5、OGG源端配置

代码语言:javascript
复制
ggsci
代码语言:javascript
复制
create subdirs
代码语言:javascript
复制
dblogin userid ogg password ogg
代码语言:javascript
复制
edit param ./globals

oggschema ogg

配置管理器mgr

代码语言:javascript
复制
edit param mgr

PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

添加复制表

代码语言:javascript
复制
add trandata test_ogg.test_ogg

info trandata test_ogg.test_ogg

配置extract进程(ORACLE_SID与Orcale中的相同)

代码语言:javascript
复制
edit param extkafka

extract extkafka dynamicresolution SETENV (ORACLE_SID = “orcl11g”) SETENV (NLS_LANG = “american_america.AL32UTF8”) userid ogg,password ogg exttrail /da ta/ogg/dirdat/to table test_ogg.test_ogg;

代码语言:javascript
复制
add extract extkafka,tranlog,begin now

若报错

ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

执行下面的命令再重新添加即可。

代码语言:javascript
复制
create subdirs
代码语言:javascript
复制
add exttrail /data/ogg/dirdat/to,extract extkafka

配置pump进程

代码语言:javascript
复制
edit param pukafka

extract pukafka passthru dynamicresolution userid ogg,password ogg rmthost Carlota2 mgrport 7809 rmttrail /data/ogg/dirdat/to table test_ogg.test_ogg;

代码语言:javascript
复制
add extract pukafka,exttrailsource /data/ogg/dirdat/to
代码语言:javascript
复制
add rmttrail /data/ogg/dirdat/to,extract pukafka

配置define文件(Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,)

代码语言:javascript
复制
edit param test_ogg

defsfile /data/ogg/dirdef/test_ogg.test_ogg userid ogg,password ogg table test_ogg.test_ogg;

返回终端执行

代码语言:javascript
复制
./defgen paramfile dirprm/test_ogg.prm

将生成的/data/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

代码语言:javascript
复制
scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/ 

6、OGG目标端配置

开启kafka服务

代码语言:javascript
复制
zkServer.sh start

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
代码语言:javascript
复制
ggsic

配置管理器mgr

代码语言:javascript
复制
edit param mgr

PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

配置checkpoint

代码语言:javascript
复制
edit  param  ./GLOBALS

CHECKPOINTTABLE test_ogg.checkpoint

配置replicate进程

代码语言:javascript
复制
edit param rekafka

REPLICAT rekafka sourcedefs /data/ogg/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

配置kafka.props(去掉注释)

代码语言:javascript
复制
cd /opt/ogg/dirprm/
vim kafka.props

gg.handlerlist=kafkahandler //handler类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建 gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等 gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

代码语言:javascript
复制
vim custom_kafka_producer.properties

bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址 acks=1 compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000

添加trail文件到replicate进程

代码语言:javascript
复制
add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

7、测试

在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。 启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。 全部需要在ogg目录下执行ggsci目录进入ogg命令行。 源端依次是

代码语言:javascript
复制
start mgr
start extkafka
start pukafka

目标端

代码语言:javascript
复制
start mgr
start rekafka

GGSCI (Carlota2) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00 EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

现在源端执行sql语句

代码语言:javascript
复制
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件状态

代码语言:javascript
复制
ls -l /data/ogg/dirdat/to*

查看目标端trail文件状态

代码语言:javascript
复制
ls -l /data/ogg/dirdat/to*

查看kafka是否自动建立对应的主题

代码语言:javascript
复制
kafka-topics.sh --list --zookeeper localhost:2181

在列表中显示有test_ogg则表示没问题 通过消费者看是否有同步消息

代码语言:javascript
复制
kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-07-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0、本篇中源端和目标端的一些配置信息:
  • 1、源端OGG安装
  • 2、目标端OGG安装
  • 3、源端Oracle归档模式设置
  • 4、源端oracle创建复制用户
  • 5、OGG源端配置
  • 6、OGG目标端配置
  • 7、测试
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档