Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka - TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation

Kafka - TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation

作者头像
小小工匠
发布于 2023-10-27 08:45:32
发布于 2023-10-27 08:45:32
1.6K00
代码可运行
举报
文章被收录于专栏:小工匠聊架构小工匠聊架构
运行总次数:0
代码可运行

问题描述

报错如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
....
....
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation

原因分析

这种情况,肯定要先看网络问题嘛

  • 首先查看本机防火墙的配置

结果都是关闭的 (建议开放特定端口)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@localhost bin]# systemctl status firewalld.service
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)
[root@localhost bin]#

不关闭防火墙,但是建议本机防火墙开放特定端口,可以使用如下命令 (使用root账户)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

比如

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@localhost bin]# systemctl status  firewalld
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: active (running) since Thu 2023-10-26 10:13:10 CST; 2min 36s ago
     Docs: man:firewalld(1)
 Main PID: 40311 (firewalld)
    Tasks: 2
   Memory: 28.1M
   CGroup: /system.slice/firewalld.service
           └─40311 /usr/bin/python2 -Es /usr/sbin/firewalld --nofork --nopid

Oct 26 10:13:10 localhost.localdomain systemd[1]: Starting firewalld - dynamic firewall daemon...
Oct 26 10:13:10 localhost.localdomain systemd[1]: Started firewalld - dynamic firewall daemon.
Oct 26 10:13:10 localhost.localdomain firewalld[40311]: WARNING: AllowZoneDrifting is enabled. This is considered an insecure configurati...it now.
Hint: Some lines were ellipsized, use -l to show in full.
[root@localhost bin]#
[root@localhost bin]#
[root@localhost bin]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
success
[root@localhost bin]# firewall-cmd --zone=public --add-port=9092/tcp --permanent
success
[root@localhost bin]#
[root@localhost bin]# firewall-cmd --list-ports

[root@localhost bin]#
[root@localhost bin]# firewall-cmd --reload
success
[root@localhost bin]# firewall-cmd --list-ports
2181/tcp 9092/tcp
[root@localhost bin]#

  • 接着看看kafka中间件的配置, 问题就在这里

我并没有大改配置,具体的配置可参考 Kafak - 单机/集群快速安装指北(3.x版本)

如下的配置并没有修改

要解决这个问题,修改如上配置即可


Code

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
        	// 同步阻塞 
            RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
            System.out.println(art.topic());
            System.out.println("over - " +i);
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}

可以看消费者的控制台程序,输出正常

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka - 3.x Producer 生产者最佳实践
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。 所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置
小小工匠
2023/10/28
4500
Kafka - 3.x Producer 生产者最佳实践
玩转Kafka的生产者——分区器与多线程
上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的常用API。其中包括生产者和消费者,
Janti
2018/08/20
1.8K0
玩转Kafka的生产者——分区器与多线程
5分钟带你体验一把 Kafka
新建一个名为 zk-single-kafka-single.yml 的文件,文件内容如下:
Guide哥
2020/05/07
9420
5分钟带你体验一把 Kafka
Linux系列(三):firewalld管理防火墙常用命令
1. 查看防火墙的状态 [root@localhost HMK]# firewall-cmd --state 查看防火墙的运行状态 not running [root@localhost HMK]# systemctl status firewalld.service 查看防火墙服务是否开启 ● firewalld.service - firewalld - dynamic firewall daemon Loaded: loaded (/usr/lib/systemd/system/firew
冰霜
2022/03/15
9920
Kafka - 异步/同步发送API
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
小小工匠
2023/10/27
4930
Kafka - 异步/同步发送API
进击消息中间件系列(五):Kafka 生产者 Producer
在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
民工哥
2023/08/22
4030
进击消息中间件系列(五):Kafka 生产者 Producer
Linux端口的开启的两种方法需要掌握
显示 success 表示成功 –zone=public 表示作用域为公共的 –add-port=443/tcp 添加 tcp 协议的端口端口号为 443 –permanent 永久生效,如果没有此参数,则只能维持当前 服 务生命周期内,重新启动后失效;
入门笔记
2022/11/14
1.6K0
Linux端口的开启的两种方法需要掌握
Kafka及Spring Cloud Stream
下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
HUC思梦
2020/09/03
1.3K0
Kafka 2.3 Producer (0.9以后版本适用)
kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。
大数据流动
2019/09/11
4510
Kafka - 3.x Kafka 生产者分区技巧全面指北
消息在通过 send()方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
小小工匠
2023/10/28
4780
Kafka - 3.x Kafka 生产者分区技巧全面指北
Centos7 安装 Keepalived+Nginx 双机热备HA 的正确姿势 并开机自启 实践笔记
我使用centos7X64最小化安装 CentOS-7-x86_64-Minimal-1708
cookily
2020/09/11
2.1K0
SpringBoot连接kafka——JavaDemo
Spring Boot是一个用于快速构建基于Spring框架的Java应用程序的框架。Kafka是一种分布式流处理平台,用于实时传输和处理大规模数据。通过Spring Boot与Kafka的连接,可以轻松地在Spring应用程序中使用Kafka进行数据流处理。
小明爱吃火锅
2023/10/06
8970
Kafka学习(四)-------- Kafka核心之Producer
通过https://www.cnblogs.com/tree1123/p/11243668.html 已经对consumer有了一定的了解。producer比consumer要简单一些。
大数据流动
2019/08/08
4470
搞它!!!Linux--深入介绍firewalld防火墙管理工具
firewall-cmd是firewalld防火墙自带的字符管理工具,可以用来设置firewalld防火墙的各种规则
不吃小白菜
2020/09/03
2.7K0
搞它!!!Linux--深入介绍firewalld防火墙管理工具
Centos7 安装 Nginx 的正确姿势 并设置开机自启 实践笔记
我使用centos7X64最小化安装 CentOS-7-x86_64-Minimal-1708
cookily
2020/09/11
1.4K0
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
鱼找水需要时间
2023/02/16
2.8K0
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
CentOS 6和CentOS 7防火墙的关闭
CentOS6.5查看防火墙的状态: [linuxidc@localhost ~]$service iptable status   显示结果: [linuxidc@localhost ~]$service iptable status Redirecting to /bin/systemctl status  iptable.service ● iptable.service    Loaded: not-found (Reason: No such file or directory)    Activ
庞小明
2018/07/06
7.5K0
Centos7 安装 Tomcat8 伪集群 的正确姿势 并设置开机自启 实践笔记
我使用centos7X64最小化安装 CentOS-7-x86_64-Minimal-1708
cookily
2020/09/11
4690
【kafka系列】kafka之生产者发送消息实践
进入实战之前先熟悉一下topic的相关命令,使用终端命令查询创建一个新topic,用于后期实战; 特别注意:以下命令全部依据kafka文件目录中操作; 如果尚未安装kafka,请移步《centos7系统安装kafka》
沁溪源
2022/05/06
1.1K0
【kafka系列】kafka之生产者发送消息实践
CentOS 防火墙 firewall-cmd 命令
I Teach You 我教你
2023/07/18
6670
推荐阅读
相关推荐
Kafka - 3.x Producer 生产者最佳实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档