auto.offset.reset //earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
练习 :在kafka集群中创建18BD-40主题 副本为2个,分区为3个 生产者设置: 消息确认机制 为all 重试次数 为1 批量处理消息字节数 为16384 设置缓冲区大小 为 33554432 设置每条数据生产延迟1ms 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer 数据分发策略为轮询方式发送到每个分区中
消费者设置: 消费者组id为test 设置自动提交偏移量 设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer 消费指定分区0和分区2中的数据
模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99 模拟消费者,请写出代码把18BD-40主题中的0和2号分区的数据消费掉 ,打印输出到控制台
public static void main(String[] args) {
pro();
cons();
}
public static void pro() {
//1、配置kafka集群
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "hadoopt1:9092,hadoopt2:9092,hadoopt3:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 1);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
//kafka数据中key value的序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、获取producer实例
KafkaProducer kafkaProducer = new KafkaProducer(props);
//3、通过实例发送数据到kafka集群
for (int i = 0; i < 100; i++) {
ProducerRecord producerRecord = new ProducerRecord("18BD-40","test"+i);
kafkaProducer.send(producerRecord);
}
kafkaProducer.close();
}
public static void cons() {
//1
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "hadoopt3:9092,hadoopt3:9092,hadoopt3:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者手动提交offset值
props.put("enable.auto.commit", "true");
//自动提交的周期
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
//设置key value的序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
//3、设置读取的topic
TopicPartition topicPartition2 = new TopicPartition("18BD-40", 2);
TopicPartition topicPartition0 = new TopicPartition("18BD-40", 0);
kafkaConsumer.assign(Arrays.asList(topicPartition2,topicPartition0));
while (true) {
//4、拉取数据,并输出
//获取所有数据
ConsumerRecords consumerRecords = kafkaConsumer.poll(1000);
//遍历所有数据
for (ConsumerRecord consumerRecord : consumerRecords) {
//获取一条数据内的信息
System.out.println("数据是 "+consumerRecord.value()+" 分区是:"+consumerRecord.partition());
}
}
}