Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring Boot整合kafka

Spring Boot整合kafka

作者头像
闻说社
发布于 2025-04-17 10:05:47
发布于 2025-04-17 10:05:47
12900
代码可运行
举报
运行总次数:0
代码可运行

Spring Boot整合kafka

此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程

搭建kafka环境

这里从用docker方式搭建kafka,kafka需要注册到注册中心上,所以要先安装zookeeper zookeeper的docker-compose.yaml文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
yaml 代码解读复制代码version: '3.1'

services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    volumes:
      - ./config:/conf
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1

接下来是kafka的docker-compose.yaml文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
yaml 代码解读复制代码version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper

引入依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
xml 代码解读复制代码<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties中添加配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Propreties 代码解读复制代码# 生产者配置
spring.kafka.bootstrap-servers=127.0.0.1:14993
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# key的序列化方式为String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value的序列化方式为字节数组
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 消费者配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.group-id=story-has-you
# 自动偏移量设置
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者改成手动提交
spring.kafka.consumer.enable-auto-commit=false
# 手动ack
spring.kafka.advertised.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual
spring.kafka.listener.missing-topics-fatal=false

新增配置类

kafkaConfig.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
java 代码解读复制代码import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

/**
 * The type Kafka config.
 *
 * @author fangxi created by 2021/6/15
 */
@Configuration
public class KafkaConfig {

    /**
     * The Kafka template.
     */
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * 初始化topic
     *
     * @return the new topic
     */
    @Bean
    public NewTopic changChunFawRealFaultInfo() {
        return new NewTopic("story-has-you", 1, (short) 1);
    }


    /**
     * 配置kafka的重试次数
     *
     * @param configurer           the configurer
     * @param kafkaConsumerFactory the kafka consumer factory
     * @return the concurrent kafka listener container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // 最大重试次数3次
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 3)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

生产者发送

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
java 代码解读复制代码@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;

kafkaTemplate.send("story-has-you", "hello");

消费者监听

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
java 代码解读复制代码
/**
    * On message.
    *
    * @param records the records
    */
@KafkaListener(topics = "story-has-you", groupId = "story-has-you", containerFactory = "kafkaListenerContainerFactory")
public void qingDaoMessage(ConsumerRecord<String, byte[]> records, Acknowledgment ack) {
    try {
		String data = new String(records.value());
        if (data == null) {
            return;
        }
		// 打印hello
        log.info("从kafka接收到消息, {}", data)
    } catch (Exception e) {
        log.error("kafka处理消息异常", e);
    } finally {
		// 手动ack, 通知kafka已经消费
        ack.acknowledge();
    }

}

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验