设为首页 收藏本站
查看: 930|回复: 0

[经验分享] 使用Goldengate同步异构数据库Kafka中间件之一

[复制链接]

尚未签到

发表于 2019-1-31 09:43:43 | 显示全部楼层 |阅读模式
  收到业务部门需求,要求将Oracle数据库某表同步至Mysql数据库中,异构环境我们用kafka来实现,下面是具体的一些配置;
  由于业务需要,现申请使用架构组数据同步服务同步以下数据到管家MySQL数据库
  代理商用户数据:
a. 数据源:SSP库 AAA.system_user
b. 数据目标:MySQL DLS库 DLS_SYSTEM_USER
c. 同步逻辑: 无
d. 同步数据及对应关系:参见附件
e. 是否涉及敏感信息:否
  准备工作;由于目标库Mysql库该表已经存在,我们将该表备份并且获取建表语句;
--获取建表语句
mysql> show create table dls_system_user;
  --导出单个数据表结构和数据
mysqldump  -uroot -p  dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql
  --重命名表
ALTER TABLE DLS_SYSTEM_USERRENAME DLS_SYSTEM_USER_BAK0622;
  --新建空表
CREATE TABLE dls_system_user (
ID varchar(100) NOT NULL,
ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT '0',
ACCOUNT_LOCKED int(1) NOT NULL DEFAULT '0',
ENABLED int(1) NOT NULL DEFAULT '0',
ORG_NO varchar(255) NOT NULL DEFAULT '',
USER_CODE varchar(100) NOT NULL DEFAULT '',
REMARK_NAME varchar(255) NOT NULL DEFAULT '',
IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT '',
STATUS int(10) NOT NULL DEFAULT '0',
PRIMARY KEY (ID),
KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  Oracle源端GoldenGate配置:
1、为要同步的表添加附加日志
dblogin USERID ggs@ntestdb, password ggs
add trandata AAA.system_user
  2、 添加抽取进程
add extract ext_kafb, tranlog, begin now
add EXTTRAIL ./dirdat/a2, extract ext_kafb,MEGABYTES 200
  edit params EXT_KAFB
  extract EXT_KAFB                       
USERID ggs@ntestdb, password ggs
LOGALLSUPCOLS            
exttrail ./dirdat/a2,FORMAT RELEASE 11.2
table AAA.system_user;
  3、添加投递进程:
add extract pmp_kafb, exttrailsource ./dirdat/a2
add rmttrail ./dirdat/b2,EXTRACT pmp_kafb,MEGABYTES 200
  eidt params pmp_kafb
  EXTRACT pmp_kafb                     
USERID ggs@ntestdb, password ggs
PASSTHRU                                
RMTHOST 172.16.xxx.5, MGRPORT 9178  --kafka服务器地址
RMTTRAIL ./dirdat/b2,format release 11.2
table AAA.system_user;
  ----初始化文件存放在 /ggs/ggs12/dirprm/
  4.添加初始化进程
ADD EXTRACT ek_20, sourceistable    ---源端添加
  edit params ek_20
  EXTRACT ek_20
USERID ggs@ntestdb, password ggs
RMTHOST 172.16.154.5, MGRPORT 9178
RMTFILE ./dirdat/lb,maxfiles 999, megabytes 500
table AAA.system_user;
  5.生成def文件:
GGSCI> edit param defgen_n9
  USERID ggs@ntestdb, password ggs
defsfile /goldengate/ggskafka/dirdef/defgen_n9.def,format release 11.2
table AAA.system_user;
  在OGG_HOME下执行如下命令生成def文件
defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm
  将生成的def文件传到kafka服务器$OGG_HOME/dirdef下
  ---目标端mysql 数据库地址172.16.xxx.148,需要新建kafka用户
  grant select,insert,update,delete,create,drop on DLS.* to 'kafka'@'%' identified by 'jiubugaosuni';
  --kafka服务器GoldenGate操作
1、添加初始化进程:---dirprm
GGSCI> ADD replicat rn_3,specialrun
  EDIT PARAMS rn_3
  SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
reportcount every 1 minutes, rate
grouptransops 10000
MAP  AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
  2、添加复制进程:
GGSCI>add replicat RN_KF3,exttrail ./dirdat/b2
GGSCI>edit params RN_KF3
  REPLICAT RN_KF3
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP  AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
  3、参数配置:
cd /home/app/ogg/ggs12/dirprm
  custom_kafka_producer.properties 文件内容如下:
  [app@test-datamanager dirprm]$ more custom_kafka_producer.properties
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000
  value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
100KB per partition
  batch.size=16384
linger.ms=0
  ---vi添加对应文件 kafkat_n3.props
kafka.props文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= DLS.DLS_MERCHANT_STATUS
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json               --指定文件类型
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.SchemaTopicName= DLS.DLS_MERCHANT_STATUS  --指定topic名称
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/  --patch路径
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
  至此我们配置算是基本完成,现在我们来开启进程,初始化数据;
1、启动源端抓取进程
GGSCI> start EXT_KAFB
2、启动源端投递进程
GGSCI> start pmp_kafb
3、启动源端初始化进程
GGSCI> start ek_20
4、启动目标端初始化进程
GGSCI> start rn_3
在$OGG_HOME下执行如下命令:
  ./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD
5、启动目标端恢复进程
GGSCI> start RN_KF3




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669900-1-1.html 上篇帖子: 关于kafka更改消费者对应分组下的offset值 下篇帖子: 关于CDH5.11.0自带kafka 0.10 bootstrap
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表