要获取正在运行的Kafka集群上的所有属性,可以通过Kafka提供的命令行工具和API来实现。
kafka-configs.sh --bootstrap-server <kafka-bootstrap-server> --describe --all
其中,<kafka-bootstrap-server>
是Kafka集群的启动服务器地址。
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class KafkaClusterProperties {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<kafka-bootstrap-servers>");
try (AdminClient adminClient = AdminClient.create(props)) {
// 获取所有topic的描述信息
ListTopicsResult topicsResult = adminClient.listTopics();
Map<String, TopicDescription> topics = topicsResult.namesToListings().get();
// 遍历所有topic,获取每个topic的配置信息
for (Map.Entry<String, TopicDescription> entry : topics.entrySet()) {
String topicName = entry.getKey();
TopicDescription topicDescription = entry.getValue();
System.out.println("Topic: " + topicName);
System.out.println("Partition count: " + topicDescription.partitions().size());
// 获取topic的配置信息
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(configResource));
Map<ConfigResource, org.apache.kafka.clients.admin.Config> topicConfigs = describeConfigsResult.all().get();
// 打印topic的配置信息
for (Map.Entry<ConfigResource, org.apache.kafka.clients.admin.Config> configEntry : topicConfigs.entrySet()) {
ConfigResource resource = configEntry.getKey();
org.apache.kafka.clients.admin.Config config = configEntry.getValue();
System.out.println("Config: " + resource.name());
System.out.println(" " + config.entries());
}
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
e.printStackTrace();
}
}
}
其中,<kafka-bootstrap-servers>
是Kafka集群的启动服务器地址。
请注意,上述代码中使用了Kafka的AdminClient来与Kafka集群进行交互,因此需要添加对应的Kafka依赖。
推荐的腾讯云相关产品:腾讯云消息队列 CKafka。CKafka 是腾讯云自研的分布式消息中间件,提供高可靠、高可用、高吞吐的消息服务,完全兼容开源 Kafka 协议,适用于大数据流式计算、消息同步、日志采集、业务解耦、消息通讯等场景。
产品介绍链接地址:https://cloud.tencent.com/product/ckafka
领取专属 10元无门槛券
手把手带您无忧上云