Kafka)消费方式
pull模式不足之处是如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
(1)coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择=groupid的hashcode值%50(_consumer_offsets的分区数量)
例如:groupid的hashcode值=1,1%50=1,那么_consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
消费者重要参数说明
bootstrap.servers
#向 Kafka 集群建立初始连接用到的 host/port 列表
key.deserializer 和value.deserializer
#指定接收消息的 key 和 value 的反序列化类型。一定要写全类名
group.id
#标记消费者所属的消费者组。
enable.auto.commit
#默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms
#如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset
#当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?earliest:自动重置偏移量到最早的偏移量。latest:默认,自动重置偏移量为最新的偏移量。none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。anything:向消费者抛异常。
offsets.topic.num.partitions
#__consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.ms
#Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms
#Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
#消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes
#默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms
#默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes
#默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records
#一次 poll 拉取数据返回消息的最大条数,默认是 500 条。
创建一个独立的消费者,消费first主题中的数据
注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。
package org.zhm.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @ClassName CustomConsumer
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/14 9:08
* @Version 1.0
*/
public class CustomConsumer {
public static void main(String[] args) {
//1、创建消费者的配置对象
Properties properties=new Properties();
//2、给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//配置反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消费者组(组内任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
// //修改分区分配策略
// properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
//测试:修改分区分配策略
ArrayList<String> startegys=new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
//3、创建消费者对象
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
//注册要消费的主题(可以消费多个主题)
ArrayList<String> topics=new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
//拉取数据打印
while (true){
//设置1s中消费一批数据
ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
//打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
(1)在IDEA中执行消费者程序
(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成的数据。
(3)在IDEA控制台观察收到的数据
独立消费者案例(订阅分区)
(1)代码编写
package org.zhm.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import javax.lang.model.type.ArrayType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @ClassName CustomConsumerPartition
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/14 9:18
* @Version 1.0
*/
public class CustomConsumerPartition {
public static void main(String[] args) {
Properties properties=new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消费者组(必须)
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
//消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions=new ArrayList<>();
topicPartitions.add(new TopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
while (true){
ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
(1)在 IDEA 中执行消费者程序。
(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。
(3)在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
消费者组案例
1、一个consumer group中有多个consumer组成,一个topic有多个partition组成,现在的问题是到底是由哪个consumer来消费哪个分区的数据。
2、kafka有四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+CooperativeSticky。kafka可以同时使用多个分区分配策略。
heartbeat.interval.ms
#Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms
#Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
#消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy
#消费者分区分配策略,默认策略是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeStick
Range是对每个topic而言的。首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
通过partition数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区。
2、Range分区分配策略案例
(1)修改主题first为7个分区
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7
注意:分区数可以增加,但是不能减少
(2)复制 CustomConsumer 类,创建 CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。
(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
_consumer_offsets
主题里面采用key和value的方式储存数据。key是group.id+topic+分区号
,value值就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是每个group.id+topic+分区号就保留最新数据。
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。 自动提交offset的相关参数:
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
auto.offset.reset = earliest | latest | none
默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。
参考文章:https://blog.csdn.net/qq_44804713/article /details/131219996
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有