
In production Kafka clusters with large data volumes, disk usage often becomes uneven across the disks of a single machine.
This happens because earlier versions of Kafka only guaranteed that the *number* of partitions was evenly distributed across disks; since they could not account for each partition's actual size, a few oversized partitions would frequently skew disk utilization.
Before Kafka 1.1, users had no graceful way to deal with this: even manually moving log files and offset information required a broker restart to take effect, which was extremely risky. Prior to 1.1, Kafka only supported reassignment of partition data across brokers, with no way to rebalance it across different disks within the same broker. Starting with version 1.1, Kafka officially supports moving replicas between log directories; the implementation details are described in the official wiki under KIP-113.
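To see why balancing by partition count alone leads to skew, here is a small Python sketch. This is not Kafka's actual code, only an illustration of the idea (Kafka's LogManager places a new log in the directory holding the fewest partitions); the partition sizes are made up:

```python
# Sketch: assign partitions to log dirs by partition *count*, the way
# pre-KIP-113 Kafka balances them, then compare the resulting byte totals.

def assign_by_count(partition_sizes, log_dirs):
    """Place each partition in the dir currently holding the fewest partitions."""
    counts = {d: 0 for d in log_dirs}
    usage = {d: 0 for d in log_dirs}
    for size in partition_sizes:
        target = min(log_dirs, key=lambda d: counts[d])
        counts[target] += 1
        usage[target] += size
    return counts, usage

dirs = ["/data1", "/data2", "/data3"]
# Nine partitions, one far larger than the rest (hypothetical sizes in GB).
sizes = [1, 1, 1, 1, 1, 1, 1, 1, 50]
counts, usage = assign_by_count(sizes, dirs)
print(counts)  # every dir ends up with 3 partitions
print(usage)   # yet byte usage is badly skewed
```

Each directory ends up with exactly three partitions, but the directory that happens to receive the 50 GB partition carries far more data than the others, which is exactly the imbalance described above.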
Suppose I have configured multiple log storage paths in server.properties (meaning log data is stored on multiple disks), as follows:
# A comma separated list of directories under which to store log files
log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs
First, create a topic with 9 partitions and send 10 million messages to it. Listing the data directories shows that Kafka has distributed the 9 partitions evenly across the three paths:
> ll /data1/kafka-logs/ |grep test-topic
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-3
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-4
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-5
> ll /data2/kafka-logs/ |grep test-topic
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-0
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-2
> ll /data3/kafka-logs/ |grep test-topic
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-6
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-7
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-8
Now suppose that, because of how other topics' data happens to be distributed, the disks have become unbalanced, and we want to move partitions 6, 7, and 8 of test-topic to /data2 and partition 1 to /data1. To do this, we first write a JSON file, migrate-replica.json:
{
"partitions": [
{
"topic": "test-topic",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"/data1/kafka-logs"
]
},
{
"topic": "test-topic",
"partition": 6,
"replicas": [
0
],
"log_dirs": [
"/data2/kafka-logs"
]
},
{
"topic": "test-topic",
"partition": 7,
"replicas": [
0
],
"log_dirs": [
"/data2/kafka-logs"
]
},
{
"topic": "test-topic",
"partition": 8,
"replicas": [
0
],
"log_dirs": [
"/data2/kafka-logs"
]
}
],
"version": 1
}
Here, the 0 in replicas is a broker ID; since this article runs only a single broker with broker.id = 0, 0 is all we need. In practice you can list multiple brokers to move replicas on several brokers at once. The version field is currently fixed at 1.
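When many partitions are involved, it is less error-prone to generate this JSON programmatically. A minimal sketch, where the moves mapping simply mirrors this article's example:

```python
import json

# Target log dir for each partition of test-topic that we want to move
# (these values mirror the example above).
moves = {
    1: "/data1/kafka-logs",
    6: "/data2/kafka-logs",
    7: "/data2/kafka-logs",
    8: "/data2/kafka-logs",
}
broker_id = 0  # single-broker setup with broker.id = 0

plan = {
    "version": 1,
    "partitions": [
        {"topic": "test-topic", "partition": p,
         "replicas": [broker_id], "log_dirs": [d]}
        for p, d in sorted(moves.items())
    ],
}
print(json.dumps(plan, indent=2))  # save this output as migrate-replica.json
```

Note that log_dirs must have the same length as replicas: one target directory per replica, in the same order. Passing the special value "any" leaves that replica's directory up to the broker.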
After saving the JSON, we run the following command to start the replica movement:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --reassignment-json-file ../migrate-replica.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-topic","partition":8,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":5,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":7,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
Once the reassignment completes, we inspect the replica distribution in the storage directories again:
> ll /data1/kafka-logs/ |grep test-topic
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-3
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-4
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-5
> ll /data2/kafka-logs/ |grep test-topic
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-0
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-2
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-6
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-7
drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-8
> ll /data3/kafka-logs/ |grep test-topic
As we can see, partitions 6, 7, and 8 have been successfully moved to /data2, and partition 1 to /data1. Notably, it is not only the log segments and index files that get moved: the checkpoint files at the top level of each log directory are updated as well. For example, inspecting the replication-offset-checkpoint file under /data2 shows that it now contains offset data for partitions 6, 7, and 8:
> cat replication-offset-checkpoint
0
7
test-topic 8 1000000
test-topic 2 1000000
test 0 1285714
test-topic 6 1000000
test-topic 7 1000000
test-topic 0 1000000
test 2 1285714
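The checkpoint file layout visible above is simple: the first line is a format version, the second is the number of entries, and each remaining line is a `topic partition offset` triple. A quick parser sketch (my own helper, not part of Kafka):

```python
def parse_checkpoint(text):
    """Parse a Kafka checkpoint file: a version line, an entry-count line,
    then one 'topic partition offset' triple per line."""
    lines = [line for line in text.strip().splitlines() if line]
    version = int(lines[0])
    count = int(lines[1])
    entries = {}
    for line in lines[2:]:
        # Topic names cannot contain spaces, so splitting from the right
        # on the last two fields is safe.
        topic, partition, offset = line.rsplit(" ", 2)
        entries[(topic, int(partition))] = int(offset)
    assert len(entries) == count, "entry count mismatch"
    return version, entries

sample = """0
7
test-topic 8 1000000
test-topic 2 1000000
test 0 1285714
test-topic 6 1000000
test-topic 7 1000000
test-topic 0 1000000
test 2 1285714
"""
version, entries = parse_checkpoint(sample)
print(entries[("test-topic", 8)])  # 1000000
```

Running this against the /data2 contents above confirms that all three moved partitions (6, 7, 8) are now checkpointed at offset 1000000, matching the 10 million messages spread over 9 partitions plus the unrelated `test` topic entries.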