前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka 发送消息过程中拦截器的用途?

Kafka 发送消息过程中拦截器的用途?

原创
作者头像
码农架构
修改于 2021-02-05 06:42:27
修改于 2021-02-05 06:42:27
9331
举报
文章被收录于专栏:码农架构码农架构

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截、序列化器 和 分区器 的一系列作用之后才能被真正地发往 broker。

拦截器是早在 Kafka 0.10.0.0 中就已经引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。这里主要讲述生产者拦截器的相关内容

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3个方法:

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩(Log Compaction)的功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

ProducerInterceptor 接口与 Partitioner 接口一样,它也有一个同样的父接口 Configurable,具体的内容可以参见 Partitioner 接口的相关介绍。

下面通过一个示例来演示生产者拦截器的具体用法,ProducerInterceptorPrefix 中通过 onSend() 方法来为每条消息添加一个前缀“prefix1-”,并且通过 onAcknowledgement() 方法来计算发送消息的成功率。ProducerInterceptorPrefix 类的具体实现如代码

实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数 interceptor.classes 中指定这个拦截器,此参数的默认值为“”。示例如下:

然后使用指定了 ProducerInterceptorPrefix 的生产者连续发送10条内容为“kafka”的消息,在发送完之后客户端打印出如下信息:

如果消费这10条消息,会发现消费了的消息都变成了“prefix1-kafka”,而不是原来的“kafka”。

KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。下面我们再添加一个自定义拦截器 ProducerInterceptorPrefixPlus,它只实现了 Interceptor 接口中的 onSend() 方法,主要用来为每条消息添加另一个前缀“prefix2-”,具体实现如下:

此时生产者再连续发送10条内容为“kafka”的消息,那么最终消费者消费到的是10条内容为“prefix2-prefix1-kafka”的消息。如果将 interceptor.classes 配置中的两个拦截器的位置互换:

那么最终消费者消费到的消息为“prefix1-prefix2-kafka”。

如果拦截链中的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
这篇文章对你有帮助吗?作为一名程序工程师,在评论区留下你的困惑或你的见解,大家一起来交流吧!
这篇文章对你有帮助吗?作为一名程序工程师,在评论区留下你的困惑或你的见解,大家一起来交流吧!
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
揭秘Kafka拦截器的神奇操作
在消息传递的舞台上,拦截器就像是一群守护神,负责保卫信息的流转。这些守门者在系统中扮演着至关重要的角色,为数据的安全和处理创造奇迹。本文将带你走进这个神奇的领域,探寻拦截器的神奇之处。
一只牛博
2025/05/30
390
揭秘Kafka拦截器的神奇操作
Kafka教程_图解kafka
推荐【Kafka教程】https://bigbird.blog.csdn.net/article/details/108770504 推荐【rabbitmq教程】https://bigbird.blog.csdn.net/article/details/81436980 推荐【Flink教程】https://blog.csdn.net/hellozpc/article/details/109413465 推荐【SpringBoot教程】https://blog.csdn.net/hellozpc/article/details/107095951 推荐【SpringCloud教程】https://blog.csdn.net/hellozpc/article/details/83692496 推荐【Mybatis教程】https://blog.csdn.net/hellozpc/article/details/80878563 推荐【SnowFlake教程】https://blog.csdn.net/hellozpc/article/details/108248227 推荐【并发限流教程】https://blog.csdn.net/hellozpc/article/details/107582771 推荐【JVM面试与调优教程】https://bigbird.blog.csdn.net/article/details/113888604
全栈程序员站长
2022/11/03
2.3K1
Kafka教程_图解kafka
kafka拦截器实现队列插队效果
突然出现一个任务需要对kafka处理的数据进行插队操作(内心小崩溃。。。),研究了一下,还是可以使用拦截器进行实现这样的效果的。
conanma
2022/04/11
3650
Kafka Producer拦截器(Interceptor)
本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。
JavaEdge
2021/10/18
3260
kafka系列之Producer 拦截器
Producer 拦截器 拦截器(interceptor)是个相当新的功能,它是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
程序狗
2021/12/20
3970
Kafka Producer拦截器(Interceptor)
本篇主要讲述Kafka Producer端拦截器,对消息进行拦截或修改,也可用于Producer的Callback回调之前进行预处理。
JavaEdge
2021/12/07
6300
Kafka Producer拦截器(Interceptor)
多图详解kafka生产者消息发送过程
KafkaProducer通过解析producer.propeties文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
石臻臻的杂货铺[同名公众号]
2022/09/26
6620
多图详解kafka生产者消息发送过程
KafkaProducer通过解析producer.propeties文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
石臻臻的杂货铺[同名公众号]
2022/04/30
1.9K0
多图详解kafka生产者消息发送过程
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
全栈程序员站长
2022/08/24
17.2K4
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
Kafka体系架构详细分解
我的个人博客排版更舒服: https://www.luozhiyun.com/archives/260
luozhiyun
2020/03/19
8640
数据源管理 | Kafka集群环境搭建,消息存储机制详解
Kafka集群中有一个broker会被选举为Controller,Controller依赖Zookeeper环境,管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
知了一笑
2020/06/16
6320
Kafka基础(二):生产者相关知识汇总
本文章部分内容摘自 朱忠华老师的《深入理解Kafka:核心设计与实践原理》,也特别推荐广大读者购买阅读。
create17
2019/06/24
9940
「kafka」kafka-clients,java编写生产者客户端及原理剖析
构建的消息对象ProducerRecord并不是单纯意义上的消息,它包含了多个属性,原本需要发送的业务相关的消息体只是其中的一个value属性,比如“hello world”,ProducerRecord的源码如下:
源码之路
2020/09/04
1.7K0
「kafka」kafka-clients,java编写生产者客户端及原理剖析
kafka生产者Producer、消费者Consumer的拦截器interceptor
1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:
别先生
2021/01/13
1.7K0
Kafka快速入门系列(11) | Kafka中如何自定义Interceptor及其原理
  Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。   对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
不温卜火
2020/10/28
6730
Kafka快速入门系列(11) | Kafka中如何自定义Interceptor及其原理
源码分析 Kafka 消息发送流程(文末附流程图)
从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下:
丁威
2019/11/12
1.4K0
源码分析 Kafka 消息发送流程(文末附流程图)
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
在介绍Producer端原理之前,大家先对其整体架构有一个大致的了解,图示如下所示:
爪哇缪斯
2023/09/05
1980
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
Kafka扩展内容
Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
matt
2022/10/25
3530
Kafka消息分区&producer拦截器&无消息丢失(八)
上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。
用户9919783
2022/12/14
4150
消息队列之Kafka-生产者
KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。
conanma
2022/04/08
5190
相关推荐
揭秘Kafka拦截器的神奇操作
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档