首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何在Spring Kafka中关闭offsets提交,以便在本地存储offsets?

在Spring Kafka中关闭offsets提交,以便在本地存储offsets,可以通过以下步骤实现:

  1. 创建一个自定义的KafkaConsumerFactory,用于配置Kafka消费者的属性。在该工厂中,设置enable.auto.commit属性为false,以禁用自动提交offsets。
代码语言:txt
复制
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自动提交
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
  1. 创建一个KafkaListenerContainerFactory,用于配置Kafka监听容器的属性。在该工厂中,设置AckModeMANUAL_IMMEDIATE,以手动提交offsets。
代码语言:txt
复制
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // 手动提交
    return factory;
}
  1. 创建一个Kafka消息监听器,用于处理接收到的消息。在该监听器中,通过调用Acknowledgment对象的acknowledge()方法手动提交offsets。
代码语言:txt
复制
@KafkaListener(topics = "topic-name", groupId = "group-id")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    // 处理消息
    System.out.println("Received message: " + record.value());
    
    // 手动提交offsets
    acknowledgment.acknowledge();
}

通过以上步骤,我们成功地关闭了offsets的自动提交,并在消息处理完成后手动提交offsets,从而实现了在本地存储offsets的目的。

在腾讯云中,可以使用腾讯云的消息队列CMQ作为Kafka的替代方案。CMQ提供了可靠的消息传递服务,具有高可用性和可伸缩性。您可以使用腾讯云CMQ的SDK来实现类似的功能。具体的腾讯云CMQ产品介绍和使用方法,请参考腾讯云官方文档:腾讯云消息队列 CMQ

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

  • Kafka-15.实现-分发

    Kafka消费者跟踪它在每个分区消费的最大偏移量,并且能够提交偏移量,以便在重新启动的时候可以从这些偏移量中恢复。Kafka提供了在指定broker(针对该组)中将给定消费者组的所有偏移量存储为group coordinator的选项。即,改消费者组中的任何消费者实例应将其偏移量提交和提取发送给该group coordinator。消费者可以通过任何Kafka broker发出FindCoordinatorRequest并读取包含包含协调器详细信息的FindCoordinatorResponse来查找其协调器。然后,消费者可以继续从coordinator broker处理提交或者获取偏移量。在coordinator 移动的情况下,消费者需要重新发现coordinator。偏移调教可以由消费者实例自动或手动完成。

    02

    kafka0.8--0.11各个版本特性预览介绍

    kafka-0.8.2 新特性 producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。producer请求会返回一个应答对象,包括偏移量或者错误信。这种异步方地批量的发送消息到kafka broker节点,因而可以减少server端资源的开销。新的producer和所有的服务器网络通信都是异步地,在ack=-1模式下需要等待所有的replica副本完成复制时,可以大幅减少等待时间。   在0.8.2之前,kafka删除topic的功能存在bug。   在0.8.2之前,comsumer定期提交已经消费的kafka消息的offset位置到zookeeper中保存。对zookeeper而言,每次写操作代价是很昂贵的,而且zookeeper集群是不能扩展写能力的。在0.8.2开始,可以把comsumer提交的offset记录在compacted topic(__comsumer_offsets)中,该topic设置最高级别的持久化保证,即ack=-1。__consumer_offsets由一个三元组< comsumer group, topic, partiotion> 组成的key和offset值组成,在内存也维持一个最新的视图view,所以读取很快。 kafka可以频繁的对offset做检查点checkpoint,即使每消费一条消息提交一次offset。   在0.8.1中,已经实验性的加入这个功能,0.8.2中可以广泛使用。auto rebalancing的功能主要解决broker节点重启后,leader partition在broker节点上分布不均匀,比如会导致部分节点网卡流量过高,负载比其他节点高出很多。auto rebalancing主要配置如下, controlled.shutdown.enable ,是否在在关闭broker时主动迁移leader partition。基本思想是每次kafka接收到关闭broker进程请求时,主动把leader partition迁移到其存活节点上,即follow replica提升为新的leader partition。如果没有开启这个参数,集群等到replica会话超时,controller节点才会重现选择新的leader partition,这些leader partition在这段时间内也不可读写。如果集群非常大或者partition 很多,partition不可用的时间将会比较长。   1)可以关闭unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不会被提升为新的leader partition。unclean.leader.election=false时,kafka集群的持久化力大于可用性,如果ISR中没有其它的replica,会导致这个partition不能读写。   2)设置min.isr(默认值1)和 producer使用ack=-1,提高数据写入的持久性。当producer设置了ack=-1,如果broker发现ISR中的replica个数小于min.isr的值,broker将会拒绝producer的写入请求。max.connections.per.ip限制每个客户端ip发起的连接数,避免broker节点文件句柄被耗光。

    02
    领券