Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >如何开发一个完善的Kafka生产者客户端?

如何开发一个完善的Kafka生产者客户端?

作者头像
码农架构
发布于 2021-02-12 10:08:06
发布于 2021-02-12 10:08:06
1.6K0
举报
文章被收录于专栏:码农架构码农架构

Kafka 起初是 由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、StormSpark、Flink 等都支持与 Kafka 集成。

Kafka 之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的:

  • 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
  • 存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
  • 流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

01

基本概念

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

整个 Kafka 体系结构中引入了以下3个术语:

  1. Producer: 生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到 Kafka 中。
  2. Consumer: 消费者,也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
  3. Broker: 服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点。

在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

02

客户端开发

一个正常的生产逻辑需要具备以下几个步骤:

  1. 配置生产者客户端参数及创建相应的生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。

其中构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个 value 属性,比如“Hello, Kafka!”只是 ProducerRecord 对象中的一个属性。ProducerRecord 类的定义如下(只截取成员变量)

其中 topic 和 partition 字段分别代表消息要发往的主题和分区号。headers 字段是消息的头部,Kafka 0.11.x 版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key 可以让消息再进行二次归类,同一个 key 的消息会被划分到同一个分区中。

03

必要的参数设置

在创建真正的生产者实例前需要配置相应的参数,比如需要连接的 Kafka 集群地址。参考在上面客户端代码中的 initConfig()方法,在 Kafka 生产者客户端 KafkaProducer 中有3个参数是必填的。

  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
  • key.serializervalue.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单3-1中生产者使用的 KafkaProducer<String, String>和 ProducerRecord<String, String> 中的泛型 <String, String> 对应的就是消息中 key 和 value 的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。

在上面的客户端开发中代码中 initConfig() 方法里还设置了一个参数 client.id,这个参数用来设定 KafkaProducer 对应的客户端id,默认值为“”。如果客户端不设置,则 KafkaProducer 会自动生成一个非空字符串,内容形式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。

KafkaProducer 中的参数众多,远非示例 initConfig()方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。

在实际使用过程中,如“key.serializer”、“max.request.size”、“interceptor.classes”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称,以代码清单3-1中的 initConfig()方法为例,引入 ProducerConfig 后的修改结果如下:

注意到上面的代码中 key.serializer 和 value.serializer 参数对应类的全限定名比较长,也比较容易写错,这里通过 Java 中的技巧来做进一步的改进,相关代码如下:

如此代码便简洁了许多,同时进一步降低了人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,示例如下:

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。

KafkaProducer 中有多个构造方法,比如在创建 KafkaProducer 实例时并没有设定 key.serializer 和 value.serializer 这两个配置参数,那么就需要在构造方法中添加对应的序列化器,示例如下:

- END -

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
「kafka」kafka-clients,java编写生产者客户端及原理剖析
构建的消息对象ProducerRecord并不是单纯意义上的消息,它包含了多个属性,原本需要发送的业务相关的消息体只是其中的一个value属性,比如“hello world”,ProducerRecord的源码如下:
源码之路
2020/09/04
1.6K0
「kafka」kafka-clients,java编写生产者客户端及原理剖析
开发 Kafka 消费者客户端需要注意哪些事项?
在了解了消费者与消费组之间的概念之后,我们就可以着手进行消费者客户端的开发了。在 Kafka 的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,我们可以称之为旧消费者客户端或 Scala 消费者客户端;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 编写的客户端,我们可以称之为新消费者客户端或 Java 消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。
码农架构
2021/02/05
1.2K0
开发 Kafka 消费者客户端需要注意哪些事项?
Kafka - 3.x Producer 生产者最佳实践
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。 所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置
小小工匠
2023/10/28
4300
Kafka - 3.x Producer 生产者最佳实践
Kafka基础(二):生产者相关知识汇总
本文章部分内容摘自 朱忠华老师的《深入理解Kafka:核心设计与实践原理》,也特别推荐广大读者购买阅读。
create17
2019/06/24
9580
Kafka 生产者与可靠性保证ACK(2)
消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。
兜兜毛毛
2021/03/02
6940
Kafka 生产者与可靠性保证ACK(2)
消息队列之Kafka-生产者
KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。
conanma
2022/04/08
5040
Kafka 详解(三)------Producer生产者
  在第一篇博客我们了解到一个kafka系统,通常是生产者Producer 将消息发送到 Broker,然后消费者 Consumer 去 Broker 获取,那么本篇博客我们来介绍什么是生产者Producer。
IT可乐
2019/07/02
1K0
Java 实现 Kafka Producer
在本文章中,我们创建一个简单的 Java 生产者示例。我们会创建一个名为 my-topic Kafka 主题(Topic),然后创建一个使用该主题发送记录的 Kafka 生产者。Kafka 发送记录可以使用同步方式,也可以使用异步方式。
smartsi
2020/10/16
3.8K0
Kafka系列2:深入理解Kafka生产者
上篇聊了Kafka概况,包含了Kafka的基本概念、设计原理,以及设计核心。本篇单独聊聊Kafka的生产者,包括如下内容:
王金龙
2020/02/20
9990
玩转Kafka的生产者——分区器与多线程
上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的常用API。其中包括生产者和消费者,
Janti
2018/08/20
1.8K0
玩转Kafka的生产者——分区器与多线程
初识 Kafka Producer 生产者
根据 KafkaProducer 类上的注释上来看 KafkaProducer 具有如下特征:
丁威
2019/11/06
1K0
初识 Kafka Producer 生产者
我们在学习Kafka的时候,到底在学习什么?
我在之前《Kafka源码阅读的一些小提示》写了一些关于Kafka源码阅读的注意事项。
大数据真好玩
2021/09/18
3270
我们在学习Kafka的时候,到底在学习什么?
Kafka - 3.x Kafka 生产者分区技巧全面指北
消息在通过 send()方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
小小工匠
2023/10/28
4410
Kafka - 3.x Kafka 生产者分区技巧全面指北
Apache Kafka 生产者 API 详解
Apache Kafka 是一个高吞吐量、低延迟的分布式流处理平台,用于构建实时数据管道和流应用。在 Kafka 中,生产者负责将消息发布到 Kafka 集群。本文将详细演示 Kafka 生产者 API 的使用,包括配置、消息发送、错误处理和性能优化等内容。
九转成圣
2024/08/05
1160
03 Confluent_Kafka权威指南 第三章: Kafka 生产者:向kafka写消息
无论你将kafka当作一个队列、消息总线或者数据存储平台,你都需要通过一个生产者向kafka写入数据,通过一个消费者从kafka读取数据。或者开发一个同时具备生产者和消费者功能的程序来使用kafka。 例如,在信用卡交易处理系统中,有一个客户端的应用程序(可能是一个在线商店)在支付事物发生之后将每个事物信息发送到kafka。另外一个应用程序负责根据规则引擎去检查该事物,确定该事物是否被批准还是被拒绝。然后将批准/拒绝的响应写回kafka。之后kafka将这个事物的响应回传。第三个应用程序可以从kafka中读取事物信息和其审批状态,并将他们存储在数据库中,以便分析人员桑后能对决策进行检查并改进审批规则引擎。 apache kafka提供了内置的客户端API,开发者在开发与kafka交互的应用程序时可以使用这些API。 在本章中,我们将学习如何使用kafka的生产者。首先对其设计理念和组件进行概述。我们将说明如何创建kafkaProducer和ProducerRecord对象。如何发送信息到kafka,以及如何处理kafak可能返回的错误。之后,我们将回顾用于控制生产者行为的重要配置选项。最后,我们将深入理解如何使用不同的分区方法和序列化。以及如何编写自己的序列化器和分区器。 在第四章我们将对kafka消费者客户端和消费kafka数据进行阐述。
冬天里的懒猫
2020/08/03
3K0
【kafka系列】kafka之生产者发送消息实践
进入实战之前先熟悉一下topic的相关命令,使用终端命令查询创建一个新topic,用于后期实战; 特别注意:以下命令全部依据kafka文件目录中操作; 如果尚未安装kafka,请移步《centos7系统安装kafka》
沁溪源
2022/05/06
1.1K0
【kafka系列】kafka之生产者发送消息实践
Kafka 新版生产者 API
把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
CoderJed
2018/09/13
2.1K0
Kafka 新版生产者 API
Kafka 开发实战
其中KafkaProducer是⽤于发送消息的类,ProducerRecord类⽤于封装 Kafka 的消息。
用户7353950
2022/06/23
4570
Kafka 开发实战
进击消息中间件系列(五):Kafka 生产者 Producer
在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
民工哥
2023/08/22
3740
进击消息中间件系列(五):Kafka 生产者 Producer
基于kafka_2.11-2.1.0实现的生产者和消费者代码样例
遇到的坑,一开始报的错误莫名其妙,一开始以为使用的jar包版本问题,又是报slf4j的错误,又是报log4j的错误,又是报空指针的异常。最后百度意外遇到了可能是本地没有将ip地址放到hosts文件里面,果然是这个问题。
别先生
2019/06/03
2K0
相关推荐
「kafka」kafka-clients,java编写生产者客户端及原理剖析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档