Kafka 起初是 由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Storm、Spark、Flink 等都支持与 Kafka 集成。
Kafka 之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的:
基本概念
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。
整个 Kafka 体系结构中引入了以下3个术语:
在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
客户端开发
一个正常的生产逻辑需要具备以下几个步骤:
其中构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个 value 属性,比如“Hello, Kafka!”只是 ProducerRecord 对象中的一个属性。ProducerRecord 类的定义如下(只截取成员变量)
其中 topic 和 partition 字段分别代表消息要发往的主题和分区号。headers 字段是消息的头部,Kafka 0.11.x 版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key 可以让消息再进行二次归类,同一个 key 的消息会被划分到同一个分区中。
必要的参数设置
在创建真正的生产者实例前需要配置相应的参数,比如需要连接的 Kafka 集群地址。参考在上面客户端代码中的 initConfig()方法,在 Kafka 生产者客户端 KafkaProducer 中有3个参数是必填的。
在上面的客户端开发中代码中 initConfig() 方法里还设置了一个参数 client.id,这个参数用来设定 KafkaProducer 对应的客户端id,默认值为“”。如果客户端不设置,则 KafkaProducer 会自动生成一个非空字符串,内容形式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。
KafkaProducer 中的参数众多,远非示例 initConfig()方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。
在实际使用过程中,如“key.serializer”、“max.request.size”、“interceptor.classes”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称,以代码清单3-1中的 initConfig()方法为例,引入 ProducerConfig 后的修改结果如下:
注意到上面的代码中 key.serializer 和 value.serializer 参数对应类的全限定名比较长,也比较容易写错,这里通过 Java 中的技巧来做进一步的改进,相关代码如下:
如此代码便简洁了许多,同时进一步降低了人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,示例如下:
KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。
KafkaProducer 中有多个构造方法,比如在创建 KafkaProducer 实例时并没有设定 key.serializer 和 value.serializer 这两个配置参数,那么就需要在构造方法中添加对应的序列化器,示例如下:
- END -
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有