Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka发送消息时提示请求数据过大是怎么回事?

Kafka发送消息时提示请求数据过大是怎么回事?

作者头像
张乘辉
发布于 2019-09-25 06:09:07
发布于 2019-09-25 06:09:07
3.6K00
代码可运行
举报
文章被收录于专栏:后端进阶后端进阶
运行总次数:0
代码可运行
今天有个小伙伴跟我反馈,在 Kafka 客户端他明明设置了 batch.size 参数,以提高 producer 的吞吐量,但他发现报了如下错误:

然后我去服务器查看了下 producer 的配置,发现没有配置 max.request.size,默认值为 1048576,而他发送的消息大小为 1575543,因此报了这个异常。

然后接下来他跟我讲他已经在客户端配置了 batch.size 的值为 512000,按照这个值的作用,应该是大于这个值才会进行批量发送消息到 broker:

于是我又得去撸源码,搞清楚 Kafka 发送消息实现细节:

org.apache.kafka.clients.producer.KafkaProducer#doSend:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ...

// 估算消息的字节大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                                                                   compressionType, serializedKey, serializedValue, headers);

// 确保消息大小不超过发送请求最大值 max.request.size,或者发送缓冲池发小 buffer.memory
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
  transactionManager.maybeAddPartitionToTransaction(tp);

// 向 RecordAccumulator 追加消息
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                                                 serializedValue, headers, interceptCallback, remainingWaitMs);

// 如果 batch 已经满了,那么就会唤醒sender线程发送批量消息
if (result.batchIsFull || result.newBatchCreated) {
  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  this.sender.wakeup();
}
return result.future;
// ...

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void ensureValidRecordSize(int size) {
  // 如果发送消息的大小比maxRequestSize大,就会抛异常
  if (size > this.maxRequestSize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the maximum request size you have configured with the " +
                                      ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                                      " configuration.");
  if (size > this.totalMemorySize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                                      ProducerConfig.BUFFER_MEMORY_CONFIG +
                                      " configuration.");
}

从以上源码得出结论,Kafka 会首先判断本次消息大小是否大于 maxRequestSize,如果本次消息大小 maxRequestSize,则直接抛出异常,不会继续执行追加消息到 batch。

batch.size 是 Kafka producer 非常重要的参数,它的值对 Producer 的吞吐量有着非常大的影响,因为我们知道,收集到一批消息再发送到 broker,比每条消息都请求一次 broker,性能会有显著的提高,但 batch.size 设置得非常大又会给机器内存带来极大的压力,因此需要在项目中合理地增减 batch.size 值,才能提高 producer 的吞吐量。

这里来个扩展性的问题:

可能有人会问,如果 producer 发送的消息量非常少,少到不足以填满 batch,因此不足以触发 Sender 线程执行发送消息,那这时怎么办,其实这里还有一个参数与 batch.size 配合使用,叫 linger.ms,这个参数的作用是当达到了 linger.ms 时长后,不管 batch 有没有填满,都会立即发送消息。linger.ms 参数默认值为 0,即默认消息无需批量发送,这时就需要看项目需求来权衡了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端进阶 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
源码分析 Kafka 消息发送流程(文末附流程图)
从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下:
丁威
2019/11/12
1.4K0
源码分析 Kafka 消息发送流程(文末附流程图)
Kafka源码系列之0.10版本的Producer源码解析及性能点讲解
一,基础讲解 本文是基于kafka 0.10讲的,kafkaProducer模型和0.8的客户端模型大致是一样的,区别是0.8版本的会为每个Broker(有给定topic分区leader的Broker)创建一个SyncProducer,而0.10的Producer是用一个NioSelector实现实现了多链接的维护的。也是一个后台线程进行发送。基本步骤,也是定期获取元数据,将消息按照key进行分区后归类,每一类发送到正确的Broker上去。 再写kafka文章的原因是0.10版本后跟spark结合有了大的变
Spark学习技巧
2018/01/30
9540
Kafka生产者的使用和原理
本文将学习Kafka生产者的使用和原理,文中使用的kafka-clients版本号为2.6.0。下面进入正文,先通过一个示例看下如何使用生产者API发送消息。
草捏子
2020/09/14
1.1K0
Kafka生产者的使用和原理
kafka实践(十二):生产者(KafkaProducer)源码详解和调试
在前面已经完成win环境下zk(3.4.12版本)的运行,并对kafka源码编译, 参考:本地kafka源码的编译和调试,在idea的run-->debug-->中新增configuration来创建topic:yzg(3分区1备份),本地启动运行效果:
数据社
2020/12/08
8860
kafka实践(十二):生产者(KafkaProducer)源码详解和调试
kafka0.8生产者异常处理
本文简单解析一下kafka0.8.2.2版本中的java producer的异常处理。
code4it
2018/09/17
6560
kafka0.8生产者异常处理
3、深潜 kafka producer —— 核心架构
kafka 自定义了一套网络协议,我们可以使用任意语言来实现这套协议,实现向 kafka 集群 push message 以及从 kafka 集群 pull message 的效果。在 kafka 2.8.0 版本的源码中的 clients 模块就是官方默认提供的 Java 版本 producer、consumer 实现,我们本课时重点关注其中的 producer 部分实现。
杨四正
2021/05/18
6710
原理剖析| 一文搞懂 Kafka Producer(上)
今天给大家带来的是 Kafka Producer 的全方位解析(基于 Apache Kafka 3.72)。考虑到篇幅限制,本文分为上下两篇,上篇将介绍 Kafka Producer 的使用方法与实现原理,下篇将介绍 Kafka Producer 的实现细节与常见问题。
用户10807116
2024/05/27
8500
快速学习-Kafka API
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
cwl_java
2020/02/24
7520
快速学习-Kafka API
Kafka 生产者与可靠性保证ACK(2)
消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。
兜兜毛毛
2021/03/02
6970
Kafka 生产者与可靠性保证ACK(2)
彻底搞懂 Kafka 消息大小相关参数设置的规则
根据 Kafka 消息大小规则设定,生产端自行将 max.request.size 调整为 4M 大小,Kafka 集群为该主题设置主题级别参数 max.message.bytes 的大小为 4M。
张乘辉
2020/05/26
13.2K0
多图详解kafka生产者消息发送过程
KafkaProducer通过解析producer.propeties文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
石臻臻的杂货铺[同名公众号]
2022/09/26
6630
Kafka Producer整体架构概述及源码分析
它是一个ConcurrentMap,key是TopicPartition类,代表一个topic的一个partition。value是一个包含ProducerBatch的双端队列。等待Sender线程发送给broker。画张图来看下:
857技术社区
2022/05/17
5740
Kafka Producer整体架构概述及源码分析
初识 Kafka Producer 生产者
根据 KafkaProducer 类上的注释上来看 KafkaProducer 具有如下特征:
丁威
2019/11/06
1K0
初识 Kafka Producer 生产者
kafka学习二 -发送消息
从源码中我们发现在Sender的run方法中,并没有涉及到append追加操作。因此可以看到源码中,如果消息收集器中的消息收集结果为空或者新的消息批次已经创建好,进行sender唤醒,执行wakeup操作的,唤醒Sender线程的。因此可以看到核心代码就是append和sender线程唤醒启动,最终将发送的结果进行返回:
路行的亚洲
2020/09/29
2.4K0
kafka学习二 -发送消息
6 个技术点带你理解 Kafka 高性能背后的原理
Kafka 是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。
jinjunzhu
2024/02/22
6410
6 个技术点带你理解 Kafka 高性能背后的原理
Kafka API操作实践
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
zeekling
2022/06/17
5840
Kafka API操作实践
多图详解kafka生产者消息发送过程
KafkaProducer通过解析producer.propeties文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
石臻臻的杂货铺[同名公众号]
2022/04/30
1.9K0
多图详解kafka生产者消息发送过程
Kafka基础篇学习笔记整理
KafkaProducer会将消息先放入缓冲区中,然后由单独的sender线程异步发送到broker服务端,那么既然消息是批量发送的,那么触发批量发送的条件是什么呢?
大忽悠爱学习
2023/04/23
3.8K0
Kafka基础篇学习笔记整理
「kafka」kafka-clients,java编写生产者客户端及原理剖析
构建的消息对象ProducerRecord并不是单纯意义上的消息,它包含了多个属性,原本需要发送的业务相关的消息体只是其中的一个value属性,比如“hello world”,ProducerRecord的源码如下:
源码之路
2020/09/04
1.7K0
「kafka」kafka-clients,java编写生产者客户端及原理剖析
KafkaProducer源码分析
Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求
用户6070864
2019/09/04
6270
KafkaProducer源码分析
推荐阅读
相关推荐
源码分析 Kafka 消息发送流程(文末附流程图)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验