Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步。
- | 版本 | OGG版本 | id地址 |
---|---|---|---|
源端 | Oracle11gR2 | Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64 | Carlota3 |
目标端 | kafka_2.12-2.5.0 | Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1 | Carlota2 |
源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle!
PS:源端是安装好了Oracle的机器,目标端是安装好了Kafka的机器,二者环境变量之前都配置好了。
先建立ogg目录
mkdir -p /opt/ogg
解压zip文件
unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
解压后得到一个tar包,再解压这个tar
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功
chown -R oracle:oinstall /data/ogg
配置OGG环境变量
vim /etc/profile
export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=
OGG_HOME:$PATH
source /etc/profile
先建立ogg目录
mkdir -p /data/ogg
解压zip文件
unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
解压后得到一个tar包,再解压这个tar
tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar
使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功
chown -R oracle:oinstall /data/ogg
配置OGG环境变量
vim /etc/profile
export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=
JAVA_HOME/jre/lib/amd64/server:
JAVA_HOME/jre/lib/amd64/server/libjvm.so:
OGG_HOME:$PATH
source /etc/profile
ggsci
create subdirs
登陆Oracle用户
su - oracle
登陆Oracle
sqlplus / as sysdba
查看当前是否为归档模式(若为Disabled,则需手动打开)
archive log list
Database log mode No Archive Mode Automatic archival Disabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Current log sequence 14
立即关闭数据库
shutdown immediate
启动实例并加载数据库,但不打开
startup mount
更改数据库为归档模式
alter database archivelog;
打开数据库
alter database open;
启用自动归档
alter system archive log start;
再次查看当前是否为归档模式(看到为Enabled,则成功打开归档模式。)
archive log list
Database log mode Archive Mode Automatic archival Enabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Next log sequence to archive 14 Current log sequence 14
查看辅助日志状态(若为NO,则需要通过命令修改)
select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
NO NO
alter database force logging;
alter database add supplemental log data;
再次查看辅助日志状态(为YES即可)
select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
YES YES
root用户建立相关文件夹,并赋予权限
mkdir -p /data/oracle/oggdata/orcl
chown -R oracle:oinstall /data/oracle/oggdata/orcl
执行下面sql
SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;
Tablespace created.
SQL> create user ogg identified by ogg default tablespace oggtbs;
User created.
SQL> grant dba to ogg;
Grant succeeded.
Oracle创建测试表
create user test_ogg identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));
ggsci
create subdirs
dblogin userid ogg password ogg
edit param ./globals
oggschema ogg
配置管理器mgr
edit param mgr
PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3
添加复制表
add trandata test_ogg.test_ogg
info trandata test_ogg.test_ogg
配置extract进程(ORACLE_SID与Orcale中的相同)
edit param extkafka
extract extkafka dynamicresolution SETENV (ORACLE_SID = “orcl11g”) SETENV (NLS_LANG = “american_america.AL32UTF8”) userid ogg,password ogg exttrail /da ta/ogg/dirdat/to table test_ogg.test_ogg;
add extract extkafka,tranlog,begin now
若报错
ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).
执行下面的命令再重新添加即可。
create subdirs
add exttrail /data/ogg/dirdat/to,extract extkafka
配置pump进程
edit param pukafka
extract pukafka passthru dynamicresolution userid ogg,password ogg rmthost Carlota2 mgrport 7809 rmttrail /data/ogg/dirdat/to table test_ogg.test_ogg;
add extract pukafka,exttrailsource /data/ogg/dirdat/to
add rmttrail /data/ogg/dirdat/to,extract pukafka
配置define文件(Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,)
edit param test_ogg
defsfile /data/ogg/dirdef/test_ogg.test_ogg userid ogg,password ogg table test_ogg.test_ogg;
返回终端执行
./defgen paramfile dirprm/test_ogg.prm
将生成的/data/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:
scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/
开启kafka服务
zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
ggsic
配置管理器mgr
edit param mgr
PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3
配置checkpoint
edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
配置replicate进程
edit param rekafka
REPLICAT rekafka sourcedefs /data/ogg/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
配置kafka.props(去掉注释)
cd /opt/ogg/dirprm/
vim kafka.props
gg.handlerlist=kafkahandler //handler类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建 gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等 gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/
vim custom_kafka_producer.properties
bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址 acks=1 compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000
添加trail文件到replicate进程
add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。 启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。 全部需要在ogg目录下执行ggsci目录进入ogg命令行。 源端依次是
start mgr
start extkafka
start pukafka
目标端
start mgr
start rekafka
GGSCI (Carlota2) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING REKAFKA 00:00:00 00:00:08
GGSCI (Carlota3) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00 EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10
现在源端执行sql语句
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;
查看源端trail文件状态
ls -l /data/ogg/dirdat/to*
查看目标端trail文件状态
ls -l /data/ogg/dirdat/to*
查看kafka是否自动建立对应的主题
kafka-topics.sh --list --zookeeper localhost:2181
在列表中显示有test_ogg则表示没问题 通过消费者看是否有同步消息
kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning
{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}