前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

作者头像
小小工匠
发布2021-08-17 16:42:31
2K0
发布2021-08-17 16:42:31
举报
文章被收录于专栏:小工匠聊架构

在这里插入图片描述
在这里插入图片描述

官方说明

https://kafka.apache.org/documentation/

选择对应的版本,我这里选的是 2.4.X

https://kafka.apache.org/24/documentation.html

选择

在这里插入图片描述
在这里插入图片描述

https://kafka.apache.org/24/documentation.html#consumerconfigs

查找 auto.offset.reset

在这里插入图片描述
在这里插入图片描述

让我们来品一品官方的解读


参数解读

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)

no initial offset || current offset does not exist

no initial offset

举个例子 在消费组ConsumerGroupA里有个消费者A1, 已经消费到了100条数据, 这个时候你又新起了一个消费者, 但是呢这个新起的消费者的消费组和消费组A的名称不同,我们暂且称之为消费组ConsumerGroupB. 根据kafka的机制, 这个新起的消费组中的消费者再消费分区数据的时候,auto.offset.reset参数就起作用了

current offset does not exist

我们知道kafka提供了API可以按照消费offset记录继续消费,如果指定的offset不存在,那么 这个参数也会生效

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

啥意思?。。。。。。。

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费

  • latest(默认) :只消费自己启动之后发送到主题的消息
  • earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

Code

在这里插入图片描述
在这里插入图片描述

POM依赖

代码语言:javascript
复制
	<dependencies>
		<dependency>
			<groupId>org.springframework.bootgroupId>
			<artifactId>spring-boot-starter-webartifactId>
		dependency>

		
		<dependency>
			<groupId>org.springframework.kafkagroupId>
			<artifactId>spring-kafkaartifactId>
		dependency>

		<dependency>
			<groupId>org.springframework.bootgroupId>
			<artifactId>spring-boot-starter-testartifactId>
			<scope>testscope>
		dependency>
		<dependency>
			<groupId>junitgroupId>
			<artifactId>junitartifactId>
			<scope>testscope>
		dependency>
	dependencies>

配置文件

在这里插入图片描述
在这里插入图片描述

生产者

代码语言:javascript
复制
 package com.artisan.springkafka.producer;

import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */

@Component
public class ArtisanProducerMock {


    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate ;


    /**
     * 同步发送
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public SendResult sendMsgSync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"artisanTestMessage-" + id);
        // 同步等待
       return  kafkaTemplate.send(TOPIC.TOPIC, messageMock).get();
    }



    public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);
        // 异步发送消息
        ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);
        return result ;

    }

}

消费者

代码语言:javascript
复制
 package com.artisan.springkafka.consumer;

import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */

@Component
public class ArtisanCosumerMock {


    private Logger logger = LoggerFactory.getLogger(getClass());
    private static final String CONSUMER_GROUP_PREFIX = "ConsumerGroup-" ;

    /**
     * 消费者的groupId ,每次启动都通过SPEL 随机一个出来,确保每次都是一个新的消费组 用于测试 auto.offset.reset 参数 设置为 earliest的情况
     * @param messageMock
     */
    @KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC +  "-" + "#{T(java.util.UUID).randomUUID()})")
    public void onMessage(MessageMock messageMock){
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }

}

看注释


单元测试

代码语言:javascript
复制
 package com.artisan.springkafka.produceTest;

import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {

    private Logger logger = LoggerFactory.getLogger(getClass());


    @Autowired
    private ArtisanProducerMock artisanProducerMock;

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        SendResult sendResult = artisanProducerMock.sendMsgSync();

        logger.info("testSyncSend Result =  topic:[{}] , partition:[{}], offset:[{}]",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().partition(),
                sendResult.getRecordMetadata().offset());

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }


    @Test
    public void testAsynSend() throws ExecutionException, InterruptedException {
            artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    logger.info(" 发送异常{}]]", throwable);

                }
                @Override
                public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                    logger.info("回调结果 Result =  topic:[{}] , partition:[{}], offset:[{}]",
                            objectObjectSendResult.getRecordMetadata().topic(),
                            objectObjectSendResult.getRecordMetadata().partition(),
                            objectObjectSendResult.getRecordMetadata().offset());
                }
            });

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();

    }

}

测试

看结果之前,先看看当前topic中的数据

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

earliest

earliest 按照预期,新起了一个ConsumerGroup , 肯定会从头消费 ,让我们来验证下

代码语言:javascript
复制
2021-02-23 16:54:38.014  INFO 5684 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[51]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage9'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=68, name='artisanTestMessage-68'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=48, name='artisanTestMessage-48'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=81, name='artisanTestMessage-81'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='artisanTestMessage-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=23, name='artisanTestMessage-23'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=37, name='messageSendByAsync-37'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=16, name='messageSendByAsync-16'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='messageSendByAsync-79'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=19, name='messageSendByAsync-19'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=42, name='messageSendByAsync-42'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=70, name='messageSendByAsync-70'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=84, name='messageSendByAsync-84'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=56, name='messageSendByAsync-56'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=82, name='messageSendByAsync-82'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='messageSendByAsync-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=94, name='messageSendByAsync-94'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=20, name='messageSendByAsync-20'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=52, name='messageSendByAsync-52'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=32, name='messageSendByAsync-32'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=45, name='messageSendByAsync-45'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=18, name='messageSendByAsync-18'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=53, name='messageSendByAsync-53'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=46, name='messageSendByAsync-46'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=33, name='messageSendByAsync-33'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=83, name='artisanTestMessage-83'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=61, name='artisanTestMessage-61'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.075  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=43, name='artisanTestMessage-43'}]

latest(默认)

代码语言:javascript
复制
2021-02-23 16:55:21.541  INFO 21472 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[52]
2021-02-23 16:55:21.587  INFO 21472 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]

none

代码语言:javascript
复制
2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)-1, groupId=ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [MOCK_TOPIC-0]

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) [kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) [spring-kafka-2.6.4.jar:2.6.4]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : No offset and no reset policy

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) ~[kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) ~[spring-kafka-2.6.4.jar:2.6.4]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container
2021-02-23 16:55:44.730  INFO 19956 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-23 16:55:44.749  INFO 19956 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[53]

exception

在这里插入图片描述
在这里插入图片描述

后两种设置,看看就行

懂了么,老兄 ~


源码地址

https://github.com/yangshangwei/boot2/tree/master/springkafka_auto.offset.reset

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/02/23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 官方说明
  • 参数解读
  • Code
    • POM依赖
      • 配置文件
        • 生产者
          • 消费者
            • 单元测试
              • 测试
                • earliest
                • latest(默认)
                • none
                • exception
            • 源码地址
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档