在Spring Boot Kafka中为同一个消费者工厂bean设置不同的消费者组id,可以通过配置不同的消费者工厂bean来实现。
首先,需要在Spring Boot的配置文件中配置Kafka的相关属性,包括Kafka的地址、端口、消费者组id等。可以使用spring.kafka.consumer.bootstrap-servers
配置Kafka的地址和端口,使用spring.kafka.consumer.group-id
配置消费者组id。
接下来,在代码中创建多个消费者工厂bean,并为每个消费者工厂bean设置不同的消费者组id。可以使用@Bean
注解将消费者工厂bean注入到Spring容器中。
示例代码如下:
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory1() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConsumerFactory<String, String> consumerFactory2() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-2");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory1());
return factory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
return factory;
}
}
在上述代码中,创建了两个消费者工厂bean,分别为consumerFactory1
和consumerFactory2
,并为每个消费者工厂bean设置了不同的消费者组id。
接下来,在消费者类中使用@KafkaListener
注解指定使用哪个消费者工厂bean,并指定要监听的主题。
示例代码如下:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
public void consumeMessage1(String message) {
// 处理消息
}
@KafkaListener(topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
public void consumeMessage2(String message) {
// 处理消息
}
}
在上述代码中,consumeMessage1
方法使用kafkaListenerContainerFactory1
作为消费者工厂bean,监听名为topic1
的主题;consumeMessage2
方法使用kafkaListenerContainerFactory2
作为消费者工厂bean,监听名为topic2
的主题。
通过以上配置,就可以为同一个消费者工厂bean设置不同的消费者组id。
领取专属 10元无门槛券
手把手带您无忧上云