
在 Kafka 中,消费者在订阅主题时,可以选择 subscribe 或 assign 两种方法。两者的主要区别在于 消费者的分区分配 和 使用场景。下面是它们的详细区别和使用示例:
subscribe 和 assign 的区别subscribe:subscribe 方法订阅一个或多个主题时,Kafka 会将消费者加入指定的消费者组,并由 协调者(Coordinator) 自动管理分区的分配。assign:assign 方法时,它直接指定要消费的分区列表,而不是订阅一个主题。这种方式不会使用消费者组,也不涉及分区的自动分配和重平衡。assign 不会有负载均衡,也不会触发重平衡。img
subscribe 和 assign 的使用场景subscribe 适用场景:assign 适用场景:assign 允许开发者自行设计负载均衡策略。subscribe 和 assign 的示例代码subscribeimport org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题 "my-topic" 和 "another-topic"
consumer.subscribe(Arrays.asList("my-topic", "another-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}在这个例子中,消费者通过 subscribe 订阅了 my-topic 和 another-topic 主题。Kafka 会根据当前消费者组内的消费者数量自动分配分区,并在消费者数量变化时进行重平衡。
assignimport org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Properties;
import java.util.Collections;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 手动指定主题和分区,例如 "my-topic" 的第 0 分区
TopicPartition partition0 = new TopicPartition("my-topic", );
consumer.assign(Collections.singletonList(partition0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}在此例中,消费者通过 assign 方法直接订阅了 my-topic 的第 0 分区。这样不会触发消费者组的重平衡,适合需要精确控制的消费场景。
方法 | 适用场景 | 分配机制 | 是否支持重平衡 |
|---|---|---|---|
subscribe | 自动负载均衡的分布式消费 | 由 Kafka 自动分配分区 | 是 |
assign | 手动控制分区消费 | 消费者指定具体分区 | 否(无消费者组概念) |
在大多数情况下,如果需要自动分区分配和管理,可以选择 subscribe;如果需要精确控制每个分区的消费,可以选择 assign。