在上篇文章中我们介绍了MirrorMaker-V1(MM1),本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。
并且在MM1多年的使用过程中发现了以下局限性:
针对这些无法无法满足企业需要的点,多个企业提供了自己的解决方案,例如:
而kafka开源社区也终于在kafka2.4带来了自己的企业级解决方案MirrorMaker-V2(MM2)。MM2修复了MM1所存在的局限性。
MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,不熟悉kafka connect概念的同学可以这样认为,source connector就是MM1中的消费者,它负责读取远端数据中心的数据。sink connetor是生产者,它负责将拉回来的数据写入本地的数据中心。
MM2共含有4种类型的connector:
与MM1不同的是,MM2的source和sink两个connector包含了,源数据的消费者,目标数据的生产者,和一对AdminClient用来同步topic配置信息。
它的部署方式跟MM1相同,都是部署在目标集群方。
官方提供了4中部署方式:
本来cosmozhu准备使用第三中方式运行MM2集群。因为使用connect cluster运行后可以使用kafka connect restful api 来管理task。但是在实际操作过程中发现这部分还没有开发完成。
这里cosmozhu推荐使用第一种方式,第二种只能用于测试,第四种方式是为了兼容MM1的老用户。
MM2的启动脚本是connect-mirror-maker.sh
,从名称上来看connect开头,很明显这块是纳入到了kafka-connect框架。它的启动配置文件为config/connect-mirror-maker.properties
以最新版本kafka2.5为例。
#定义集群别名
clusters = A, B
A.bootstrap.servers = A\_host1:9092, A\_host2:9092, A\_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B\_host1:9092, B\_host2:9092, B\_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .\* # 允许同步topic的正则
B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .\* # 允许同步topic的正则
#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置
bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
参考资料:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
https://blog.cloudera.com/kafka-replication-the-case-for-mirrormaker-2-0/
https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/
作者:cosmozhu --90后的老父亲,专注于保护地球的程序员
欢迎转载,转载时请注明出处。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。