我们上一章介绍了中间件:Zookeeper,本章将介绍另外一个中间件:Kafka。目前这2个中间件都是基于JAVA语言的。
对于运维来说,一个正常的Kafka的集群,一般情况下我们是不需要去操作ZooKeeper来维护它的集群状态的。我们对Kafka的操作,大部分都通过Kafka自带的脚本来操作它,下面我们就来介绍下使用较多的命令。(部分命令在前面的操作里面已经有演示)。
#下面都是默认自带的脚本
#根据环境和需求不一样,大部分我们都用不到,我们最常用就那么几个
[root@localhost ~]# ll /root/kafka_2.13-2.8.2/bin/
total 156
-rwxr-xr-x 1 root root 1423 Sep 2 2022 connect-distributed.sh
-rwxr-xr-x 1 root root 1396 Sep 2 2022 connect-mirror-maker.sh
-rwxr-xr-x 1 root root 1420 Sep 2 2022 connect-standalone.sh
-rwxr-xr-x 1 root root 861 Sep 2 2022 kafka-acls.sh
-rwxr-xr-x 1 root root 873 Sep 2 2022 kafka-broker-api-versions.sh
-rwxr-xr-x 1 root root 860 Sep 2 2022 kafka-cluster.sh
-rwxr-xr-x 1 root root 864 Sep 2 2022 kafka-configs.sh
-rwxr-xr-x 1 root root 945 Sep 2 2022 kafka-console-consumer.sh
-rwxr-xr-x 1 root root 944 Sep 2 2022 kafka-console-producer.sh
-rwxr-xr-x 1 root root 871 Sep 2 2022 kafka-consumer-groups.sh
-rwxr-xr-x 1 root root 948 Sep 2 2022 kafka-consumer-perf-test.sh
-rwxr-xr-x 1 root root 871 Sep 2 2022 kafka-delegation-tokens.sh
-rwxr-xr-x 1 root root 869 Sep 2 2022 kafka-delete-records.sh
-rwxr-xr-x 1 root root 866 Sep 2 2022 kafka-dump-log.sh
-rwxr-xr-x 1 root root 863 Sep 2 2022 kafka-features.sh
-rwxr-xr-x 1 root root 870 Sep 2 2022 kafka-leader-election.sh
-rwxr-xr-x 1 root root 863 Sep 2 2022 kafka-log-dirs.sh
-rwxr-xr-x 1 root root 873 Sep 2 2022 kafka-metadata-shell.sh
-rwxr-xr-x 1 root root 862 Sep 2 2022 kafka-mirror-maker.sh
-rwxr-xr-x 1 root root 886 Sep 2 2022 kafka-preferred-replica-election.sh
-rwxr-xr-x 1 root root 959 Sep 2 2022 kafka-producer-perf-test.sh
-rwxr-xr-x 1 root root 874 Sep 2 2022 kafka-reassign-partitions.sh
-rwxr-xr-x 1 root root 874 Sep 2 2022 kafka-replica-verification.sh
-rwxr-xr-x 1 root root 10329 Sep 2 2022 kafka-run-class.sh
-rwxr-xr-x 1 root root 1376 Sep 2 2022 kafka-server-start.sh
-rwxr-xr-x 1 root root 1361 Sep 2 2022 kafka-server-stop.sh
-rwxr-xr-x 1 root root 860 Sep 2 2022 kafka-storage.sh
-rwxr-xr-x 1 root root 945 Sep 2 2022 kafka-streams-application-reset.sh
-rwxr-xr-x 1 root root 863 Sep 2 2022 kafka-topics.sh
-rwxr-xr-x 1 root root 958 Sep 2 2022 kafka-verifiable-consumer.sh
-rwxr-xr-x 1 root root 958 Sep 2 2022 kafka-verifiable-producer.sh
-rwxr-xr-x 1 root root 1714 Sep 2 2022 trogdor.sh
drwxr-xr-x 2 root root 4096 Sep 2 2022 windows
-rwxr-xr-x 1 root root 867 Sep 2 2022 zookeeper-security-migration.sh
-rwxr-xr-x 1 root root 1393 Sep 2 2022 zookeeper-server-start.sh
-rwxr-xr-x 1 root root 1366 Sep 2 2022 zookeeper-server-stop.sh
-rwxr-xr-x 1 root root 1019 Sep 2 2022 zookeeper-shell.sh
kafka-topics.sh(主题管理)# 创建 Topic(指定分区数、副本因子)
kafka-topics.sh --bootstrap-server <broker:port> --create --topic <topic_name> --partitions <N> --replication-factor <N>
# 列出所有 Topic
kafka-topics.sh --bootstrap-server <broker:port> --list
# 查看 Topic 详细信息(分区、副本分布、ISR 等)
kafka-topics.sh --bootstrap-server <broker:port> --describe --topic <topic_name>
# 删除 Topic
kafka-topics.sh --bootstrap-server <broker:port> --delete --topic <topic_name>2. kafka-console-producer.sh(控制台生产者)
# 发送普通文本消息
kafka-console-producer.sh --bootstrap-server <broker:port> --topic <topic_name>
# 指定消息 Key(配合 parse.key 使用)
kafka-console-producer.sh --bootstrap-server <broker:port> --topic <topic_name> --property "parse.key=true" --property "key.separator=:"kafka-console-consumer.sh(控制台消费者)# 消费最新消息(从头开始加 --from-beginning)
kafka-console-consumer.sh --bootstrap-server <broker:port> --topic <topic_name>
# 显示消息 Key 和 Value
kafka-console-consumer.sh --bootstrap-server <broker:port> --topic <topic_name> --property print.key=true
# 指定消费者组(避免重复消费)
kafka-console-consumer.sh --bootstrap-server <broker:port> --topic <topic_name> --group <group_id>kafka-consumer-groups.sh(消费者组管理)# 列出所有消费者组
kafka-consumer-groups.sh --bootstrap-server <broker:port> --list
# 查看消费者组详情(Lag、Offset 等)
kafka-consumer-groups.sh --bootstrap-server <broker:port> --describe --group <group_id>
# 重置 Offset 到指定位置(如 earliest、latest 或特定值)
kafka-consumer-groups.sh --bootstrap-server <broker:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --executekafka-configs.sh(动态配置管理)# 查看 Topic 的配置
kafka-configs.sh --bootstrap-server <broker:port> --entity-type topics --entity-name <topic_name> --describe
# 修改 Topic 的消息保留时间
kafka-configs.sh --bootstrap-server <broker:port> --entity-type topics --entity-name <topic_name> --alter --add-config retention.ms=864000006. kafka-acls.sh(权限管理)
# 授予生产者权限
kafka-acls.sh --bootstrap-server <broker:port> --add --allow-principal User:<user> --producer --topic <topic_name>
# 授予消费者权限
kafka-acls.sh --bootstrap-server <broker:port> --add --allow-principal User:<user> --consumer --topic <topic_name> --group <group_id>kafka-server-start.sh(启动 Broker)# 指定配置文件启动
kafka-server-start.sh /path/to/server.propertieskafka-reassign-partitions.sh(分区重分配)# 生成分区重分配计划(JSON 格式)
kafka-reassign-partitions.sh --bootstrap-server <broker:port> --generate --topics-to-move-json-file <plan.json> --broker-list "0,1,2"
# 执行重分配
kafka-reassign-partitions.sh --bootstrap-server <broker:port> --execute --reassignment-json-file <plan.json>三、调试与监控
kafka-dump-log.sh(日志分析)# 查看某个分区日志的头部和尾部消息
kafka-dump-log.sh --files /path/to/partition-log-00000 --print-data-logkafka-producer-perf-test.sh(生产者压测)kafka-producer-perf-test.sh --topic <topic_name> --num-records 100000 --record-size 1000 --throughput 1000 --producer-props bootstrap.servers=<broker:port>kafka-topics.sh, kafka-console-producer.sh, kafka-console-consumer.sh, kafka-consumer-groups.sh, kafka-configs.sh, kafka-server-start.sh.--bootstrap-server(新版替代 --zookeeper)、--describe、--alter、--reset-offsets