环境: 源端:Oracle12.2 ogg for Oracle 12.3 目标端:Kafka ogg for bigdata 12.3 将Oracle中的数据通过OGG同步到Kafka
源端配置: 1、为要同步的表添加附加日志 dblogin USERID ogg@orclpdb, PASSWORD ogg add trandata scott.tab1 add trandata scott.tab2
2、 添加抽取进程 GGSCI>add extract EXT_KAF1,integrated tranlog, begin now GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200
编辑抽取进程参数: GGSCI> edit params EXT_KAF1
extract EXT_KAF1 userid c##ggadmin,PASSWORD ggadmin LOGALLSUPCOLS UPDATERECORDFORMAT COMPACT exttrail ./dirdat/k1,FORMAT RELEASE 12.3 SOURCECATALOG orclpdb --(指定pdb) table scott.tab1; table scott.tab2;
注册进程 GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin GGSCI> register extract EXT_KAF1 database container (orclpdb)
3、添加投递进程: GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1 GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200
编辑投递进程参数: GGSCI>edit param PMP_KAF1
EXTRACT PMP_KAF1 USERID c##ggadmin,PASSWORD ggadmin PASSTHRU RMTHOST 10.1.1.247, MGRPORT 9178 RMTTRAIL ./dirdat/f1,format release 12.3 SOURCECATALOG orclpdb TABLE scott.tab1; table scott.tab2;
4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化 GGSCI> add extract ek_01, sourceistable
编辑参数: GGSCI> EDIT PARAMS ek_01
EXTRACT ek_01 USERID c##ggadmin,PASSWORD ggadmin RMTHOST 10.1.1.247, MGRPORT 9178 RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3 SOURCECATALOG orclpdb table scott.tab1;
GGSCI> add extract ek_02, sourceistable
EDIT PARAMS ek_02 EXTRACT ek_02 USERID c##ggadmin,PASSWORD ggadmin RMTHOST 10.1.1.247, MGRPORT 9178 RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3 SOURCECATALOG orclpdb table scott.tab2;
5、生成def文件: GGSCI> edit param defgen1
USERID c##ggadmin,PASSWORD ggadmin defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3 SOURCECATALOG orclpdb table scott.tab1; table scott.tab2;
在OGG_HOME下执行如下命令生成def文件 defgen paramfile dirprm/defgen1.prm
将生成的def文件传到目标端$OGG_HOME/dirdef下
目标端配置: 1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下 cd $OGG_HOME/AdapterExamples/big-data/kafka cp * $OGG_HOME/dirprm
2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下 cd $ORACLE_HOME/AdapterExamples/trail cp tr000000000 $OGG_HOME/dirdat
3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化) GGSCI> ADD replicat rp_01, specialrun
GGSCI> EDIT PARAMS rp_01 SPECIALRUN end runtime setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") targetdb libfile libggjava.so set property=./dirprm/kafka1.props SOURCEDEFS ./dirdef/defgen1.def EXTFILE ./dirdat/ka reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab1, TARGET scott.tab1;
GGSCI> ADD replicat rp_02, specialrun GGSCI> EDIT PARAMS rp_02
SPECIALRUN end runtime setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") targetdb libfile libggjava.so set property=./dirprm/kafka2.props SOURCEDEFS ./dirdef/defgen1.def EXTFILE ./dirdat/kb reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab2, TARGET scott.tab2;
4、添加恢复进程: GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1 GGSCI>edit params r_kaf1
REPLICAT r_kaf1 setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") HANDLECOLLISIONS targetdb libfile libggjava.so set property=./dirprm/kafka1.props SOURCEDEFS ./dirdef/defgen1.def reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab1, TARGET scott.tab1;
GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2 GGSCI> edit params r_kaf2
REPLICAT r_kaf2 setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") HANDLECOLLISIONS targetdb libfile libggjava.so set property=./dirprm/kafka2.props SOURCEDEFS ./dirdef/defgen1.def reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab2, TARGET scott.tab2;
5、参数配置: custom_kafka_producer.properties文件内容如下:
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号 acks=1 reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=16384 linger.ms=10000
kafka1.props文件内容如下: gg.handlerlist = kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties #The following resolves the topic name using the short table name gg.handler.kafkahandler.topicMappingTemplate= topic1 #gg.handler.kafkahandler.format=avro_op gg.handler.kafkahandler.format =json --这里做了改动,指定格式为json格式 gg.handler.kafkahandler.format.insertOpKey=I gg.handler.kafkahandler.format.updateOpKey=U gg.handler.kafkahandler.format.deleteOpKey=D gg.handler.kafkahandler.format.truncateOpKey=T gg.handler.kafkahandler.format.prettyPrint=false gg.handler.kafkahandler.format.jsonDelimiter=CDATA[] gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键 gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字 gg.handler.kafkahandler.BlockingSend =false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE gg.log=log4j gg.log.level=INFO gg.report.time=30sec #Sample gg.classpath for Apache Kafka gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。 #Sample gg.classpath for HDP #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/ javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
启动进程进程恢复: 1、启动源端抓取进程 GGSCI> start EXT_KAF1 2、启动源端投递进程 GGSCI> start PMP_KAF1 3、启动源端初始化进程 GGSCI> start ek_01 4、启动目标端初始化进程 在$OGG_HOME下执行如下命令: ./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD 5、启动目标端恢复进程 GGSCI> start R_KAF1
遇到的错误: 1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)
原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的) 解决:重启MGR进程
2、ERROR OG-15051 Java or JNI exception
原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。 解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。