前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你的消息太大,kafka受不了

你的消息太大,kafka受不了

作者头像
小四的技术之旅
发布2023-09-05 19:18:30
2630
发布2023-09-05 19:18:30
举报
文章被收录于专栏:小四的技术文章

前言

上周在进行自测的时候,kafka抛出一个RecordTooLargeException异常,从名字我们可以直接看出是消息太大了,导致发不出去而抛出异常,那么怎么应该怎么解决这个问题呢,其实很简单,要么将消息拆分得小一点,要么调节kafka层面的参数,依然它抛出这个异常,那么就证明超过了某个参数的阈值,由此我们可以有两种方式来处理这个问题,但是一切还要从我们的业务背景和数据结构去看这个问题。

业务背景

我们这边会将数据写入文件,通过FTP的方式,没产生数据,就往FTP里面追加,而这些数据都是需要保证不丢失的,由于业务的发展,我这边需要专门去处理这些文件,然后通过kafka投递给下游系统,所以自然需要解析文件,还得一条一条的解析后发送。

问题出现

一开始我看到文件都比较小,所以处理方式是只有这个文件的数据全部解析完成并成功投递kafka,那么我这边才记录这个文件处理成功,但是处理了很多个大文件过后,发现数据条数对不上,看日志是RecordTooLargeException异常,因为上面的处理方式是文件处理完成并全部投递到kafka才记录文件解析完成,所以这是有问题的,一个大文件可能有即使上百万条数据,难免会遇到很大的数据,所以只要一条没解析成功,那么后面的数据就不去解析了,这个文件就不算解析成功,所以应该要设计容错,并对数据进行监控和补偿。

处理问题

在得知是某些数据过大的问题,我就DEBUG去看源码,在kafka生产端的KafkaProducer类中,发现问题出在下面这方法中。

ensureValidRecordSize方法就是对消息的大小进行判断的,参数size就是我们所发送的消息的字节数,maxRequestSize就是允许消息的最大字节,因为没有进行设置,所以这个值使用的是默认值,默认为1M,所以就应该将maxRequestSize这个参数进行重新设置。

因为我们使用的是SpringBoot开发,于是通过yml方式配置,但是发现spring-kafka没提示这个属性,于是只有写一个Kafka的配置类,然后再读取yml文件内容进行配置

「配置类」

「yml文件」

通过上面的配置后,我们看到我将max.request.size参数的值设置为10M,这需要根据实际情况来,因为我在处理的过程中发现像比较大的数据行也只有个别。

如果在实际使用过程中数据比较大,那么可能需要拆分数据,不过如果数据不能拆分,那么我们应该考虑消息压缩方式,将数据压缩后再发送,然后在消费者进行解压,不过这种压缩是我们自己实现的,并不是kafka层面的压缩,kafka本身也提供了压缩功能,有兴趣可以了解一下。

扩展

上面设置了max.request.size参数,我们在上面的截图代码中看到第二个判断中有一个参数totalMemorySize,这个值是缓冲区大小,我们发送的消息并不会马上发送kafka服务端,而是会先放在内存缓冲区,然后kafka通过一个线程去取,然后发送,可通过buffer.memory设置,这个值的默认值为32M,所以我们在设置max.request.size的时候也要考虑一下这个值。

总结

有必要对kafka进行比较深一点的学习,这样在出现问题的时候能够快速定位,并且合理解决,当然,在业务处理的时候要充分考虑可能出现的问题,做好容错和相应的补偿方案。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-08-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 刘牌 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 业务背景
  • 问题出现
  • 处理问题
  • 扩展
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档