Kafka的备份的单元是partition,也就是每个partition都都会有leader partiton和follow partiton。其中leader partition是用来进行和producer进行写交互,follow从leader副本进行拉数据进行同步,从而保证数据的冗余,防止数据丢失的目的。如图:
Kafka的数据备份包括两种类型:全量备份和增量备份。
下面分别介绍两种备份方式:
# 指定备份的主题
BACKUP_TOPIC=test
# 指定备份的数据目录
BACKUP_DIR=/tmp/backup
# 创建备份目录
mkdir -p $BACKUP_DIR
# 备份主题数据
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic $BACKUP_TOPIC \
--from-beginning \
> $BACKUP_DIR/$BACKUP_TOPIC.txt
上述代码使用 kafka-console-consumer.sh 工具将主题 BACKUP_TOPIC 的数据备份到 BACKUP_DIR 目录下的
增量备份
增量备份需要借助第三方工具,例如 Kafka 的 MirrorMaker 等实现 。
下面是 MirrorMaker 的用法示例:
# 指定源和目的地址
SOURCE_HOST=localhost:9092
DESTINATION_HOST=backup-host:9092
# 创建 MirrorMaker 配置文件
cat > /tmp/mirror-maker.properties <<EOF
consumer.bootstrap.servers=$SOURCE_HOST
producer.bootstrap.servers=$DESTINATION_HOST
EOF
# 运行 MirrorMaker
kafka-run-class.sh kafka.tools.MirrorMaker \
--consumer.config /tmp/mirror-maker.properties \
--producer.config /tmp/mirror-maker.properties \
--whitelist $BACKUP_TOPIC
上述代码中创建一个 MirrorMaker 配置文件将源端的数据同步到目标端--whitelist
参数指定备份的主题。
# 指定恢复的主题
RESTORE_TOPIC=test
# 指定备份文件路径
BACKUP_FILE=/tmp/backup/$RESTORE_TOPIC.txt
# 恢复主题数据
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic $RESTORE_TOPIC \
--new-producer \
< $BACKUP_FILE
上述代码将BACKUP_FILE 文件中的数据恢复到 RESTORE_TOPIC 主题中。
注意:该脚本也是同步操作,恢复时间较长时建议使用异步操作。
增量恢复需要使用 MirrorMaker 来实现,下面是 MirrorMaker 的用法示例:
# 创建MirrorMaker 配置文件
cat > /tmp/mirror-maker.properties <<EOF
consumer.bootstrap.servers=backup-host:9092
producer.bootstrap.servers=localhost:9092
EOF
# 运行MirrorMaker
kafka-run-class.sh kafka.tools.MirrorMaker \
--consumer.config /tmp/mirror-maker.properties \
--producer.config /tmp/mirror-maker.properties \
--whitelist $RESTORE_TOPIC
上述代码中创建一个 MirrorMaker 配置文件将备份端的数据同步到目标端 $RESTORE_TOPIC
主题中。
脚本编写(备份和恢复)
下面是一个简单的脚本,用于备份和恢复 Kafka 数据:
#!/bin/bash
function backup_topic() {
local topic=$1
local backup_dir=$2
echo "Starting backup for topic: $topic"
mkdir -p $backup_dir
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic $topic \
--from-beginning \
> $backup_dir/$topic.txt
echo "Backup completed for topic: $topic"
}
function restore_topic() {
local topic=$1
local backup_file=$2
echo "Starting restore for topic: $topic"
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic $topic \
--new-producer \
< $backup_file
echo "Restore completed for topic: $topic"
}
backup_topic example-topic /tmp/backup
restore_topic example-topic /tmp/backup/example-topic.txt
上述代码中定义了两个函数 backup_topic
和 restore_topic
,分别用于备份和恢复 Kafka 主题数据。在这个脚本中备份的主题是 example-topic
,备份数据存储的目录是 /tmp/backup
。
Kafka 跨集群备份
MirrorMaker 工具 : 实现消息或数据从一个集群到另一个集群的拷贝,
MirrorMaker : 消费者 + 生产者的程序,
consumer.config
: 指定 MirrorMaker 中消费者的配置文件地址 (bootstrap.servers) : 该 MirrorMaker 从哪个 Kafka 集群读取消息。因为 MirrorMaker 有可能在内部创建多个消费者实例并使用消费者组机制,设置 group.id 。配置 auto.offset.reset=earliest
。producer.config
: 指定 MirrorMaker 内部生产者组件的配置文件地址。bootstrap.servers
: 配置拷贝的消息要发送到的目标集群。num.streams
: 创建多少个 KafkaConsumer 实例。whitelist
: 接收一个正则表达式。所有匹配该正则表达式的主题都会被自动地执行镜像。.*
: 同步源集群上的所有主题。bin/kafka-mirror-maker.sh \
--consumer.config ./config/consumer.properties \
--producer.config ./config/producer.properties \
--num.streams 8 --whitelist ".*"
consumer.properties:
partition.assignment.strategy
bootstrap.servers=localhost:9092
group.id=mirrormaker
auto.offset.reset=earliest
producer.properties :
bootstrap.servers=localhost:9093
启动 MirrorMaker
bin/kafka-mirror-maker.sh \
--producer.config ../producer.config \
--consumer.config ../consumer.config \
--num.streams 4 \
--whitelist ".*"
两位移值的差值 = 该分区当前的消息数
获取最早的位移
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9093 \
--topic test --time -2
获取最新的位移
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9093 \
--topic test --time -1
参考文章:https://blog.csdn.net/qq_44226094/article/ details/124163789 https://baixinan.blog.csdn.net/ article/details/130474601