前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka的运维利器-AdminClient

Kafka的运维利器-AdminClient

作者头像
王知无-import_bigdata
发布2021-11-18 13:44:16
1.9K0
发布2021-11-18 13:44:16
举报
文章被收录于专栏:大数据成神之路

前言

一般情况下,我们都习惯使用kafka-topics.sh脚本来管理主题,但有些时候我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用API的方式去实现。

Kafka社区于0.11版本正式推出了Java客户端版的AdminClient,并不断地在后续的版本中对它进行完善。

本文主要介绍KafkaAdminClient 的基本使用方式,以及采用这种调用API方式下的创建主题时的合法性验证。

功能

鉴于社区还在不断地完善 AdminClient 的功能,AdminClient 提供的功能有以下几个大类。

  • 主题管理:包括主题的创建、删除和查询。
  • 权限管理:包括具体权限的配置与删除。
  • 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有 Broker、主题、用户、Client-id 等。
  • 副本日志管理:包括副本底层日志路径的变更和详情查询。
  • 分区管理:即创建额外的主题分区。
  • 消息删除:即删除指定位移之前的分区消息。
  • Delegation Token 管理:包括 Delegation Token 的创建、更新、过期和详情查询。
  • 消费者组管理:包括消费者组的查询、位移查询和删除。
  • Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

工作原理

AdminClient 是一个双线程的设计:前端主线程和后端 I/O 线程。

前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中。

而后端 I/O 线程(kafka-admin-client-thread)从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程的获取。

使用

如果你使用的是 Maven,需要增加以下依赖项:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.5</version>
</dependency>

构建AdminClient

代码语言:javascript
复制
/**
 * 创建AdminClient客户端对象
 */
public static AdminClient createAdminClientByProperties() {

  Properties prop = new Properties();

  // 配置Kafka服务的访问地址及端口号
  prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

  // 创建AdminClient实例
  return AdminClient.create(prop);
}

/**
 * 创建AdminClient的第二种方式
 */
public static AdminClient createAdminClientByMap(){

  Map<String, Object> conf = Maps.newHashMap();

  // 配置Kafka服务的访问地址及端口号
  conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

  // 创建AdminClient实例
  return AdminClient.create(conf);
}

创建Topic实例

代码语言:javascript
复制
private static final String TOPIC_NAME = "test_topic";

/**
 * 创建Topic实例
 */
public static void createTopic(){
    AdminClient adminClient = AdminSample.adminClient();
    //副本因子
    Short re = 1;
    NewTopic newTopic = new NewTopic(TOPIC_NAME,1,re);
    CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("CreateTopicsResult : " + createTopicsResult);
    adminClient.close();
}

查询Topic列表

代码语言:javascript
复制
private static final String TOPIC_NAME = "test_topic";

/**
 * 获取topic列表
 */
public static void topicList() throws Exception {
    AdminClient adminClient = adminClient();

    //是否查看Internal选项
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true);

    //ListTopicsResult listTopicsResult = adminClient.listTopics();
    ListTopicsResult listTopicsResult = adminClient.listTopics(options);
    Set<String> names = listTopicsResult.names().get();

    //打印names
    names.stream().forEach(System.out::println);

    Collection<TopicListing> topicListings = listTopicsResult.listings().get();
    //打印TopicListing
    topicListings.stream().forEach((topicList) -> {
        System.out.println(topicList.toString());
    });
    adminClient.close();
}

删除topic

代码语言:javascript
复制
private static final String TOPIC_NAME = "test_topic";

/**
 * 删除topic
 */
public static void delTopic() throws Exception {
    AdminClient adminClient = adminClient();
    DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
    deleteTopicsResult.all().get();
}

描述topic

代码语言:javascript
复制
/**
 * 获取topic的描述信息
 */
public static void describeTopics(List<String> topics) throws Exception {
    // 创建AdminClient客户端对象
    AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();

    // 获取Topic的描述信息
    DescribeTopicsResult result = adminClient.describeTopics(topics);

    // 解析描述信息结果, Map<String, TopicDescription> ==> topicName:topicDescription
    Map<String, TopicDescription> topicDescriptionMap = result.all().get();
    topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s \n", topicName, description));

    // 关闭资源
    adminClient.close();
}

查看 Topic 的配置信息

除了Kafka自身的配置项外,其内部的Topic也会有非常多的配置项,我们可以通过describeConfigs方法来获取某个Topic中的配置项信息。代码示例:

代码语言:javascript
复制
/**
 * 获取topic的配置信息
 */
public static void describeConfigTopics(List<String> topicNames) throws Exception {
    // 创建AdminClient客户端对象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
    topicNames.forEach(topicName -> configResources.add(
            // 指定要获取的源
            new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

    // 获取topic的配置信息
    DescribeConfigsResult result = adminClient.describeConfigs(configResources);

    // 解析topic的配置信息
    Map<ConfigResource, Config> resourceConfigMap = result.all().get();
    resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s \n", configResource, config));

    // 关闭资源
    adminClient.close();
}

修改 Topic 的分区数量

在创建Topic时我们需要设定Partition的数量,但如果觉得初始设置的Partition数量太少了,那么就可以使用createPartitions方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:

代码语言:javascript
复制
/**
 * 修改topic的分区数量
 * 只能增加不能减少
 */
public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception {
    // 创建AdminClient客户端对象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    // 构建修改分区的topic请求参数
    Map<String, NewPartitions> newPartitions = Maps.newHashMap();
    topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum)));

    // 执行修改
    CreatePartitionsResult result = adminClient.createPartitions(newPartitions);

    // get方法是一个阻塞方法,一定要等到createPartitions完成之后才进行下一步操作
    result.all().get();

    // 关闭资源
    adminClient.close();
}

社区于 0.11 版本正式推出了 Java 客户端版的 AdminClient 工具,该工具提供了几十种运维操作,而且它还在不断地演进着。如果可以的话,你最好统一使用 AdminClient 来执行各种 Kafka 集群管理操作,摒弃掉连接 ZooKeeper 的那些工具。另外,建议时刻关注该工具的功能完善情况,毕竟,目前社区对 AdminClient 的变更频率很高。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 功能
  • 工作原理
  • 使用
    • 构建AdminClient
      • 创建Topic实例
        • 查询Topic列表
          • 删除topic
            • 描述topic
              • 查看 Topic 的配置信息
                • 修改 Topic 的分区数量
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档