在使用Spring Boot连接和消费Kafka主题时,可以通过以下步骤在一个消费者类中连续阅读两个Kafka主题:
application.properties
或application.yml
文件中配置Kafka连接信息,包括Kafka服务器地址、端口号等。@KafkaListener
注解标记该类为Kafka消费者,并指定要监听的主题。@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic1")
public void consumeTopic1(String message) {
// 处理topic1的消息
System.out.println("Received message from topic1: " + message);
}
@KafkaListener(topics = "topic2")
public void consumeTopic2(String message) {
// 处理topic2的消息
System.out.println("Received message from topic2: " + message);
}
}
@EnableKafka
注解启用Kafka消费者功能,并配置Kafka消费者工厂。@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@SpringBootApplication
注解标记该类为Spring Boot应用程序的入口,并启动应用程序。@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
通过以上步骤,你可以在一个消费者类中连续阅读两个Kafka主题。当有消息发送到topic1
或topic2
时,对应的消费者方法将被调用,并处理接收到的消息。
领取专属 10元无门槛券
手把手带您无忧上云