这里的命令以kafka2.2之后版本进行说明,社区推荐命令指定 --bootstrap-server参数,受kafka安全认证体系的约束,如果使用 --zookeeper 会绕过 Kafka 的安全体系。
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
以 max.message.bytes为例
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
eg:bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testInfoTopic --alter --add-config max.message.bytes=128000
查看topic修改情况
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic testInfoTopic
Topic: testInfoTopic TopicId: KzPy24fVSsCR03ZOYRzq8g PartitionCount: 3 ReplicationFactor: 1 Configs: max.message.bytes=128000,unclean.leader.election.enable=false
Topic: testInfoTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: testInfoTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: testInfoTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
zookeeper 查看修改后内容
./zookeeper-shell.sh localhost:2181
> get /config/topics/testInfoTopic
{"version":1,"config":{"max.message.bytes":"128000"}}
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config max.message.bytes
限制某个主题副本在执行副本同步机制时,带宽消耗不要过多(不得占用超过 100MBps)
--entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。
for i in {0..2}
do
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name $i
done
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
broker_host:port ==> localhost:9092
Topic ==> testInfoTopic
Consumer Group ==> G1
./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic
举例:
# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic --request-required-acks -1 --producer-property compression.type=lz4
向topic 发送10w条消息,每条消息1KB,在producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等
bin/kafka-producer-perf-test.sh --topic testInfoTopic --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092 acks=-1 linger.ms=2000 compression.type=lz4
100000 records sent, 24764.735017 records/sec (24.18 MB/sec), 93.07 ms avg latency, 672.00 ms max latency, 56 ms 50th, 301 ms 95th, 325 ms 99th, 335 ms 99.9th.
生产吞吐量,消息发送延迟都可以看到
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic
举例:
# 注意,这里消费最好指定一个消费组G1,如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。时间长久后,就会产生大量的以 console-consumer的消费者组
# --from-beginning 等同于Consumer 端参数 auto.offset.reset 设置成 earliest;如果不指定,会默认从最新位移消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic --group G1 --from-beginning --consumer-property enable.auto.commit=false
bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic testInfoTopic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-11-05 10:35:46:535, 2022-11-05 10:35:48:699, 97.5835, 45.0940, 100013, 46216.7283, 665, 1499, 65.0990, 66719.8132
消费吞吐量的指标
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
./bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --group G1 --describe
1 https://zhuanlan.zhihu.com/p/578421962 Kafka消费者组消费进度实现窥探
该主题保存了消费者组的位移数据,默认有50个分区
如果该主题的副本值已经是 1 了,我们如何增加该主题的副本到3
第一步:创建一个 json 文件,显式提供 50 个分区对应的副本数,注意要将replicas 中的 3 台 Broker 排列顺序不同,使 Leader 副本均匀地分散在 Broker上
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`
第二步:执行kafka-reassign-partitions.sh
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
该主题为了支持事务引入的,默认有50个分区,操作方法参考__consumer_offsets
解决办法:重启broker后,会自动恢复
解决办法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下待删除topic的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。(会导致大量的leader重选举)
可以用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。
重启对应的broker节点
#### kafka推荐配置
auto.create.topics.enable=false # 是否允许自动创建Topic
unclean.leader.election.enable=false # 是否允许 Unclean Leader 选举
auto.leader.rebalance.enable=false # 是否允许定期进行 Leader 选举。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。