ContainerProperties设置了消息监听器、偏移量提交策略、轮询超时等参数,而ConsumerFactory负责创建Kafka原生消费者实例。...Spring Kafka的@KafkaListener支持批量消费模式,通过设置batchListener = true并配置fetch.min.bytes和fetch.max.wait.ms参数,消费者可以一次拉取多条消息进行处理...本节将深入分析消息处理中的常见异常、重试机制、死信队列(DLQ)的实现,并详细讲解 Spring Kafka 提供的错误处理策略,包括 @Retryable 注解和 SeekToCurrentErrorHandler...死信队列(DLQ)的实现 当消息经过多次重试仍无法处理时,应将其转移到死信队列(DLQ),以便后续审计和手动处理。...("重试处理", thrownException); } } } 故障恢复最佳实践 为确保系统可靠性,建议结合以下最佳实践:首先,启用消费者组的偏移量自动提交与手动提交结合,避免重复消费或消息丢失
发送消息时,指定key值,具有相同key的消息会被发送到同一个分区 ---- 如何避免重试导致消息顺序错乱 kafka生产者提供了消息发送的重试机制,也就是说消息发送失败后,kafka生产者会重新发送消息...它还支持一些高级特性,例如: 手动提交偏移量,以确保消息被完全处理后才提交偏移量。 支持批量处理消息,以提高处理效率。 提供了一些错误处理机制,例如重试和错误记录。...data); } } ---- 手动提交和自动提交偏移量 Spring Kafka监听器模式(spring.kafka.listener.type配置属性)有两种: single: 监听器消息参数是一个对象...设置时就提交一次偏移量 COUNT_TIME 超时或超数量 TIME或COUNT ,有一个条件满足时提交偏移量 MANUAL手动提交 手动调用Acknowledgment.acknowledge()进行消费...手动提交消费偏移量 # 禁用自动提交消费offset spring.kafka.consumer.enable-auto-commit: false # offset提交模式为manual_immediate
事务消息 默认情况下,Spring-kafka 自动生成的 KafkaTemplate 实例,是不具有事务消息发送能力的。...模式 手动 ACK 模式,由业务逻辑控制提交偏移量。...比如程序在消费时,有这种语义,特别异常情况下不确认 ack,也就是不提交偏移量,那么你只能使用手动 Ack 模式来做了。...开启手动首先需要关闭自动提交,然后设置下 consumer 的消费模式 spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode...("input value: {}", input); } 消息重试和死信队列的应用 除了上面谈到的通过手动 Ack 模式来控制消息偏移量外,其实 Spring-kafka 内部还封装了可重试消费消息的语义
Ack模式 手动ACK模式,由业务逻辑控制提交偏移量。...比如程序在消费时,有这种语义,特别异常情况下不确认ack,也就是不提交偏移量,那么你只能使用手动Ack模式来做了。...开启手动首先需要关闭自动提交,然后设置下consumer的消费模式 spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode...input) { logger.info("input value: {}", input); } 消息重试和死信队列的应用 除了上面谈到的通过手动Ack模式来控制消息偏移量外...,其实Spring-kafka内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。
模式 手动ACK模式,由业务逻辑控制提交偏移量。...比如程序在消费时,有这种语义,特别异常情况下不确认ack,也就是不提交偏移量,那么你只能使用手动Ack模式来做了。...开启手动首先需要关闭自动提交,然后设置下consumer的消费模式 spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode...=manual 上面的设置好后,在消费时,只需要在@KafkaListener监听方法的入参加入Acknowledgment 即可,执行到ack.acknowledge()代表提交了偏移量 @KafkaListener...) { logger.info("input value: {}", input); } 消息重试和死信队列的应用 除了上面谈到的通过手动Ack模式来控制消息偏移量外,其实Spring-kafka
错误处理和重试:当消费者在处理消息时遇到错误,例如数据库连接失败或者网络故障,你可以使用ConsumerInterceptor来捕获这些错误并采取适当的措施。...你可以在拦截器中实现自定义的错误处理逻辑,例如记录错误日志、发送告警通知或者进行消息重试。 总之,ConsumerInterceptor为开发人员提供了在消费者端对消息进行拦截、处理和定制的能力。...retries: 0 # 失败重试次数,0表示不启用重试机制 batch-size: 16384 # 发送缓冲区大小,按照字节计算 linger-ms: 1 # 发送延时,单位毫秒...根据注释的描述,它可能会根据设定的规则计算消费失败率,并根据判断跳过或继续消费消息。 总体而言,这段代码定义了一个自定义的Kafka消费者拦截器。拦截器可以在消息消费和提交的过程中执行自定义的逻辑。...它使用了Spring Kafka提供的@KafkaListener注解来指定消费者的相关配置。
: kafka: bootstrap-servers: 127.0.0.1:9092 producer: # 发生错误后,消息重发的次数 ,0为不启用重试机制,默认int...true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 自动提交的时间间隔 在Spring...@Override public void onFailure(Throwable ex) { System.out.println("发送消息失败...{ @Autowired ConsumerFactory consumerFactory; /** * 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit...重复消费和漏消费 如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset(手动提交)过程做原子绑定。
默认情况下,错误处理程序跟踪失败的记录,在10次提交尝试后放弃,并记录失败的记录。但是,我们也可以将失败的消息发送到另一个主题。我们称这是一个毫无意义的话题。...此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发给侦听器容器,后者将它们直接发送给错误处理程序。异常包含源数据,因此可以诊断问题。...同样,Spring Boot会自动将消息转换器配置到容器中。下面是应用程序片段中的生产端类型映射。...x或更高版本和支持事务的kafka-clients版本(0.11或更高版本),在@KafkaListener方法中执行的任何KafkaTemplate操作都将参与事务,而侦听器容器将在提交事务之前向事务发送偏移量...它还增加了诸如错误处理、重试和记录筛选等功能——而我们只是触及了表面。
调用nack()时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次poll()时重新传递这些偏移量。...>对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和/或Consumer参数)。...spring.kafka.producer.properties.* # 大于零时,启用失败发送的重试次数 spring.kafka.producer.retries spring.kafka.producer.ssl.key-password...,具有先后顺序,与消费者具有对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并定期的将记录同步到服务端(Broker),这里的同步机制是可以设置的 消息是被持久化的,当组内所有消费者重新订阅主题时...整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener注解接收消息和KafkaTemplate模板发送消息,很是简单。
常见问题 9.1 生产者同步和异步消息 生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次 生产者 ack 确认配置:...ack = 0:不需要同步消息 ack = 1:则 leader 收到消息,并保存到本地 log 之后才响应 ack 信息 ack 默认配置为 2 9.2 消费者自动提交和手动提交 自动提交:消费者...pull 消息之后马上将自身的偏移量提交到 broker 中,这个过程是自动的 手动提交:消费者 pull 消息时或之后,在代码里将偏移量提交到 broker 二者区别:防止消费者 pull 消息之后挂掉...,在消息还没消费但又提交了偏移量 9.3 消息丢失和重复消费 消息丢失 生产者:配置 ack ,以及配置副本和分区数值一致 消费者:设置手动提交 重复消费 设置唯一主键,Mysql 主键唯一则插入失败...分布式锁 9.4 顺序消费方案 生产者:关闭重试,使用同步发送,成功了再发下一条 消费者:消息发送到一个分区中,只有一个消费组的消费者能接收消息
精确一次语义(Exactly-Once) 幂等生产者:通过唯一 PID 和 Sequence Number 避免消息重复发送。... 发送消息: @Component public class KafkaProducer { @Autowired private KafkaTemplate<String,...常见问题 消息重复消费:启用幂等生产者或消费者手动提交偏移量(enable.auto.commit=false)。 分区数据倾斜:优化 Key 选择策略(如哈希均匀分布)。...五、总结 Kafka 作为分布式消息系统的标杆,与 Spring Boot 集成可快速构建高吞吐、高可靠的实时数据管道。开发者需重点关注: 分区与副本设计:保障数据均匀分布和高可用。...通过 Spring Kafka 的 KafkaTemplate 和 @KafkaListener,可简化消息生产与消费逻辑。
application.yml中引入kafka相关配置 kafka服务配置.png spring: kafka: bootstrap-servers: 172.101.203.33...acks: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D...: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit...{ @Override public void onFailure(Throwable throwable) { //发送失败的处理...log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); }
kafka 事务 kafka 的事务是从0.11 版本开始支持的,kafka 的事务是基于 Exactly Once 语义的,它能保证生产或消费消息在跨分区和会话的情况下要么全部成功要么全部失败 生产者事务...生产者事务的场景: 一批消息写入 a、b、c 三个分区,如果 ab写入成功而c失败,那么kafka就会根据事务的状态对消息进行回滚,将ab写入的消息剔除掉并通知 Producer 投递消息失败。...可能会给多个topic发送消息,需要保证消息要么全部发送成功要么全部发送失败(操作的原子性); 消费者 消费一个topic,然后做处理再发到另一个topic,这个消费和转发的动作应该在同一事物中; 如果下游消费者只有等上游消息事务提交以后才能读到...事务消息 Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。...,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式: spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode
但很多时候我们会使用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因此本文就来聊聊如何将spring-kafka进行改造,使之能支持多个kafka...# 生产者重试的次数 retries: ${KAFKA_PRODUCER_RETRIES:0} # 每次批量发送的数据量...:earliest} # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量...# 生产者重试的次数 retries: ${KAFKA_PRODUCER_RETRIES:0} # 每次批量发送的数据量 batch-size...:earliest} # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
手动提交和自动提交是Kafka两种客户端的偏移量提交方式,提交方式的配置选项是enable.auto.commit,默认情况下该选项为ture。 偏移量提交是什么?...每过5秒,消费者客户端就会自动提交最大偏移量。 如果enable.auto.commit为false代表提交方式为手动提交,我们需要让消费者客户端消费程序执行后提交当前的最大偏移量。...(1)手动提交 手动提交需要消费者客户端在消费消息后手动提交消息,手动提交的方式又分为同步提交、异步提交。...Kafka的消息只有在所有分区副本都同步该消息后,才算是已提交的消息 在分区复制的过程中,首领分区会在发送的数据里加入当前高水位。当前高水位就是复制偏移量,记录了当前已提交消息的最大偏移量。...以上的各种acks情况如果失败的话,我们可以让生产者继续重试发送消息,直到Kafka返回成功。
消息有序性 应用场景: 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序 topic分区中消息只能由消费者组中的唯一一个消费者处理...消费者手提提交的三种模式 kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量) 消费者会往一个叫做_consumer_offset的特殊主题发送消息...,消息里包含了每个分区的偏移量。...提交偏移量的方式有两种,分别是自动提交偏移量和手动提交 自动提交偏移量 当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll...()方法接收的最大偏移量提交上去 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式 提交当前偏移量(同步提交) 异步提交 同步和异步组合提交 手动提交
当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的...###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack...=0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size...# 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms...-> { System.out.println("发送消息失败:" + failure.getMessage()); }); } @GetMapping("/kafka/callbackTwo
偏移量管理:从提交到监控 偏移量(Offset)管理是Kafka Consumer可靠性的基石。消费者通过提交偏移量来记录消费进度,支持自动提交和手动提交两种模式。...自动提交由参数enable.auto.commit控制,默认每隔5秒提交一次,简单但可能重复消费或丢失消息;手动提交则通过commitSync或commitAsync方法实现,提供更精确的控制,但需开发者处理提交失败的重试逻辑...Spring Boot通过Spring Kafka项目提供了简洁的开发者体验,支持注解驱动的消费者配置,例如@KafkaListener简化了消息监听逻辑。...建议在处理成功后手动提交偏移量,并结合幂等性设计(如为消息生成唯一ID)确保最终一致性。 资源泄漏:线程池或Consumer实例未正确关闭可能导致资源泄漏。...结合2025年的最佳实践,推荐使用重试机制和死信队列(DLQ)处理消费失败的消息: @Service public class SpringKafkaConsumerService { @KafkaListener
}catch (Exception e){ e.printStackTrace(); } -- 异步发送:调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数 //发送消息...所以,如果你想要顺序的处理 Topic 的所有消息,那就只提供一个分区 提交和偏移量 kafka 不会像其他 JMS 队列那样需要得到消费者的确认,消费者可以使用 kafka 来追踪消息在分区的位置(偏移量...如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡 偏移量 如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理 如果提交的偏移量大于客户端的最后一个消息的偏移量...,那么处于两个偏移量之间的消息将会丢失 偏移量提交方式 -- 自动提交 当 enable.auto.commit 被设置为 true,提交方式就是让消费者自动提交偏移量,每隔 5 秒消费者会自动把从...poll() 方法接收的最大偏移量提交上去 -- 手动提交 当enable.auto.commit被设置为false可以有以下三种提交方式 •提交当前偏移量(同步提交) •异步提交 •同步和异步组合提交
处理流程:开启事务从TopicA拉消息业务处理往TopicB发送结果消息在事务内提交消费offset提交事务->要么TopicB消息+offset同时可见,要么都回滚,下次重试。...;}}3.事务性消费+生产示例说明:从input-topic消费;处理后发到output-topic;整个方法在Kafka事务内执行:发送消息+offset提交要么都成功要么都失败。...出异常时事务回滚:既不提交offset,也不让下游看到半成品消息,从而实现Kafka内的`Exactly-Once`语义。发送消息+offset提交要么都成功要么都失败,是怎么做到的?...Kafka能做到「发送消息+offset提交要么都成功要么都失败」,靠的是它自己的事务协议+幂等生产者+把offset也当成一类数据写入事务日志。...如果因为重试导致同一条消息再次发送,sequence不变,Broker会识别并丢弃重复写。这样解决了「重试导致的重复消息」。