Loading [MathJax]/jax/output/CommonHTML/config.js
部署DeepSeek模型,进群交流最in玩法!
立即加群
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka发送消息时提示请求数据过大是怎么回事?

Kafka发送消息时提示请求数据过大是怎么回事?

作者头像
张乘辉
发布于 2019-09-25 06:09:07
发布于 2019-09-25 06:09:07
3.6K00
代码可运行
举报
文章被收录于专栏:后端进阶后端进阶
运行总次数:0
代码可运行
今天有个小伙伴跟我反馈,在 Kafka 客户端他明明设置了 batch.size 参数,以提高 producer 的吞吐量,但他发现报了如下错误:

然后我去服务器查看了下 producer 的配置,发现没有配置 max.request.size,默认值为 1048576,而他发送的消息大小为 1575543,因此报了这个异常。

然后接下来他跟我讲他已经在客户端配置了 batch.size 的值为 512000,按照这个值的作用,应该是大于这个值才会进行批量发送消息到 broker:

于是我又得去撸源码,搞清楚 Kafka 发送消息实现细节:

org.apache.kafka.clients.producer.KafkaProducer#doSend:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ...

// 估算消息的字节大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                                                                   compressionType, serializedKey, serializedValue, headers);

// 确保消息大小不超过发送请求最大值 max.request.size,或者发送缓冲池发小 buffer.memory
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
  transactionManager.maybeAddPartitionToTransaction(tp);

// 向 RecordAccumulator 追加消息
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                                                 serializedValue, headers, interceptCallback, remainingWaitMs);

// 如果 batch 已经满了,那么就会唤醒sender线程发送批量消息
if (result.batchIsFull || result.newBatchCreated) {
  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  this.sender.wakeup();
}
return result.future;
// ...

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void ensureValidRecordSize(int size) {
  // 如果发送消息的大小比maxRequestSize大,就会抛异常
  if (size > this.maxRequestSize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the maximum request size you have configured with the " +
                                      ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                                      " configuration.");
  if (size > this.totalMemorySize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                                      ProducerConfig.BUFFER_MEMORY_CONFIG +
                                      " configuration.");
}

从以上源码得出结论,Kafka 会首先判断本次消息大小是否大于 maxRequestSize,如果本次消息大小 maxRequestSize,则直接抛出异常,不会继续执行追加消息到 batch。

batch.size 是 Kafka producer 非常重要的参数,它的值对 Producer 的吞吐量有着非常大的影响,因为我们知道,收集到一批消息再发送到 broker,比每条消息都请求一次 broker,性能会有显著的提高,但 batch.size 设置得非常大又会给机器内存带来极大的压力,因此需要在项目中合理地增减 batch.size 值,才能提高 producer 的吞吐量。

这里来个扩展性的问题:

可能有人会问,如果 producer 发送的消息量非常少,少到不足以填满 batch,因此不足以触发 Sender 线程执行发送消息,那这时怎么办,其实这里还有一个参数与 batch.size 配合使用,叫 linger.ms,这个参数的作用是当达到了 linger.ms 时长后,不管 batch 有没有填满,都会立即发送消息。linger.ms 参数默认值为 0,即默认消息无需批量发送,这时就需要看项目需求来权衡了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端进阶 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
一条SQL更新语句是如何执行的?
这是图解MySQL的第2篇文章,这篇文章会通过一条SQL更新语句的执行流程让大家清楚地明白:
蝉沐风
2022/08/22
6080
一条SQL更新语句是如何执行的?
一条SQL是怎么执行的
对于写语句相对于读语句会稍微复杂,首先要知道MySQL的三个重要日志模块redo log(重做日志)、undo log(撤消日志)、binlog(归档日志),
闻说社
2024/12/12
910
一条SQL是怎么执行的
从Mysql架构看一条更新sql的执行过程
上一篇文章我们讲了一条查询sql时如何执行的. 今天我们继续来看下一条更新sql的执行流程.
架构狂人
2023/10/28
3750
从Mysql架构看一条更新sql的执行过程
一条Update语句的执行过程是怎样的?
这个问题大家在面试的时候大家都背过类似的题,而且网上也有很多答案,这里分享一个大致流程介绍,关于下图的介绍来自这里 执行一条 select 语句,期间发生了什么?。
小许code
2023/06/27
7140
一条Update语句的执行过程是怎样的?
一条 sql 的执行过程详解
如果这条sql是写操作(insert、update、delete),那么大致的过程如下,其中引擎层是属于 InnoDB 存储引擎的,因为InnoDB 是默认的存储引擎,也是主流的,所以这里只说明 InnoDB 的引擎层过程。由于写操作较查询操作更为复杂,所以先看一下写操作的执行图。方便后面解析。
BUG弄潮儿
2020/12/17
1.3K0
一条 sql 的执行过程详解
一条更新sql的完整执行流程(超详细)
查询流程,我们是不是再研究下更新流程、插入流程和删除流程? 一条查询sql的完整执行流程(从连接到引擎,穿插涉及到的知识,超详细) 在数据库里面,我们说的update操作其实包括了更新、插入和删除。如果大家有看过MyBatis的源码,应该知道Executor里面也只有doQuery()和doUpdate。的方法, 没有 doDelete()和 dolnsert()。 更新流程和查询流程有什么不同呢? 取到数据前和查询的基本流程也是一致的,也就是说,它也要经过解析器、优化器的处理,最后交给执行器。 区别就在于拿到符合条件的数据之后的操作。 但是,要学习更新的执行流程,我们需要先知道以下几个名词的含义: 贴图镇此博客(
向着百万年薪努力的小赵
2022/12/02
5190
一条更新sql的完整执行流程(超详细)
详解一条查询select语句和更新update语句的执行流程
前面几篇MySQL系列的文章介绍了索引,事务和锁相关知识,那么今天就让我们来看看当我们执行一条select语句和一条update语句的时候,MySQL要经过哪些步骤,才能返回我们想要的数据。
用户2781897
2020/09/10
2.4K0
MySQL架构与执行流程
  MySQL数据库自己用了也有两三年了,基本上只是掌握增删改查的sql语句,从没有思考过MySQL的内部到底是怎么根据sql查询数据的,包括索引的原理,只知道加了索引查的就快,不知道为什么加上索引效率就会提升,包括索引的限制和优化也知之甚少,所以决定开一专题来学习与记录MySQL。
会说话的丶猫
2020/08/06
8540
MySQL架构与执行流程
面试官:你说说一条更新SQL的执行过程?
在上一篇《面试官:你说说一条查询SQL的执行过程?》中描述了Mysql的架构分层,通过解析器、优化器和执行引擎完成一条SQL查询的过程,那这一篇续上继续说明一条更新SQL的执行过程。
艾小仙
2021/08/25
4590
面试官:你说说一条更新SQL的执行过程?
字节三面:详解一条 SQL 的执行过程
天天和数据库打交道,一天能写上几十条 SQL 语句,但你知道我们的系统是如何和数据库交互的吗?MySQL 如何帮我们存储数据、又是如何帮我们管理事务?....是不是感觉真的除了写几个 「select * from dual」外基本脑子一片空白?金三银四读者福利:整理好的MySQL实战笔记,金三银四面试资料集锦。
Java程序猿
2021/02/23
4010
【我在拉勾训练营学技术】Mysql 架构原理
mysql 数据库作为现在互联网企业首选的数据库,我们程序员就应该对它多一些了解,我在拉勾训练营学到第五阶段啦,了解了mysql 整体架构,记录下来。
程序员爱酸奶
2021/03/21
5500
头条二面: 详解一条 SQL 的执行过程|文末送书
天天和数据库打交道,一天能写上几十条 SQL 语句,但你知道我们的系统是如何和数据库交互的吗?MySQL 如何帮我们存储数据、又是如何帮我们管理事务?....是不是感觉真的除了写几个 「select * from dual」外基本脑子一片空白?这篇文章就将带你走进 MySQL 的世界,让你彻底了解系统到底是如何和 MySQL 交互的,MySQL 在接受到我们发送的 SQL 语句时又分别做了哪些事情。
kunge
2021/01/28
8500
头条二面: 详解一条 SQL 的执行过程|文末送书
MySQL数据库:SQL语句的执行过程
我们的系统在和 MySQL 数据库进行通信前,需要先和数据库建立连接,而这个功能就是由MySQL驱动底层帮我们完成的,建立完连接之后,我们只需要发送 SQL 语句就可以执行 CRUD 了。如下图所示:
全栈程序员站长
2022/06/29
3.7K0
MySQL数据库:SQL语句的执行过程
【MySQL 系列】MySQL 架构篇
Server 层:负责建立连接、分析和执行 SQL。MySQL 大多数的核心功能模块都在这实现,主要包括连接池,执行器、优化器、解析器、预处理器、查询缓存等。另外,所有的内置函数(如日期、时间、数学和加密函数等)和所有跨存储引擎的功能(如存储过程、触发器、视图等)都在 Server 层实现;
栗筝i
2024/03/19
2.5K0
【MySQL 系列】MySQL 架构篇
MySQL架构分析
MySQL架构分析 MySQL 的体系结构 MySQL 的模块详解 **Connectors**:用于支持各种语言与 **SQL** 交互; **Management Services & Utili
编程之心
2021/07/14
6780
MySQL架构分析
MySQL中一条更新的SQL如何执行
MySQL 之 -- 一条更新的 SQL 如何执行,一条更新的 SQL 语句如何执行执行流程一条 SQL 的执行流程如图所示:(图片来源于网络) 如图所示:MySQL 数据库主要分为两个层级:服务层和存储引擎层服务层:server 层包括连接器、查询缓存、分析器、优化器、执行器,包括大多数 MySQL 中的核心功能所有跨存储引擎的功能也在这一层实现,包括存储过程、触发器、视图等。 执行流程 一条 SQL 的执行流程如图所示:(图片来源于网络) 如图所示: MySQL 数据库主要分为两个层级:服务层和存
码农编程进阶笔记
2021/11/01
1.3K0
MySQL中一条更新的SQL如何执行
MySQL 存储引擎(2)原
顾名思义,存储引擎就是用于存储我们的数据的。在关系型数据库中我们一般将数据库存放在表中(Table)。
兜兜毛毛
2020/04/23
5660
MySQL事务(三)InnoDB存储引擎下SQL执行的缓存机制
在 MySQL架构(二)SQL 更新语句是如何执行的?中,小鱼介绍了SQL 更新语句的执行流程,文章中考虑初次介绍MySQL 架构,涉及到服务层的流程并没有展开介绍。
鳄鱼儿
2024/05/28
3000
MySQL事务(三)InnoDB存储引擎下SQL执行的缓存机制
SQL语句执行过程详解
  如果这条sql是写操作(insert、update、delete),那么大致的过程如下,其中引擎层是属于 InnoDB 存储引擎的,因为InnoDB 是默认的存储引擎,也是主流的,所以这里只说明 InnoDB 的引擎层过程。由于写操作较查询操作更为复杂,所以先看一下写操作的执行图。方便后面解析。
用户7353950
2022/05/11
2.4K0
SQL语句执行过程详解
MySQL架构(二)SQL 更新语句是如何执行的?
在上一篇文章中,我们从一个查询语句的执行流程知道了 MySQL 架构可分为 Server 层和存储引擎层,以及各个层级的具体部件。
鳄鱼儿
2024/05/21
3050
MySQL架构(二)SQL 更新语句是如何执行的?
推荐阅读
相关推荐
一条SQL更新语句是如何执行的?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验