package com.bonc.rdpe.kafka110.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @Title Subscribe.java
* @Description 订阅多个主题的全部分区
* @Author YangYunhe
* @Date 2018-06-28 09:53:41
*/
public class Subscribe {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("group.id", "dev3-yangyunhe-group001");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String[] topics = new String[]{"dev3-yangyunhe-topic001", "dev3-yangyunhe-topic002"};
// 订阅指定主题的全部分区
consumer.subscribe(Arrays.asList(topics));
try {
while (true) {
/*
* poll() 方法返回一个记录列表。
* 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
* 我们一般会遍历这个列表,逐条处理这些记录。
* 传给poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
* 如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
* 而在经过了指定的时间后,即使还是没有获取到数据,poll()也会返回结果。
*/
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition = " + record.partition());
}
}
} finally {
/*
* 在退出应用程序之前使用 close() 方法关闭消费者。
* 网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,
* 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。
*/
consumer.close();
}
}
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅所有以"dev3"开头的主题的全部分区
Pattern pattern = Pattern.compile("dev3.*");
consumer.subscribe(pattern, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> arg0) {
// TODO nothing:再均衡监听器会在之后的文章中进行讨论
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> arg0) {
// TODO nothing:再均衡监听器会在之后的文章中进行讨论
}
});
...... 省略部分重复代码
try {
# 消费数据
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition[] topicPartitions = new TopicPartition[]{
new TopicPartition("dev3-yangyunhe-topic001", 0),
new TopicPartition("dev3-yangyunhe-topic002", 1)
};
// 订阅"dev3-yangyunhe-topic001"的分区0和"dev3-yangyunhe-topic002"的分区1
consumer.assign(Arrays.asList(topicPartitions));
...... 省略部分重复代码
try {
# 消费数据
}
receive.buffer.bytes
send.buffer.bytes
说明:这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有