1,课程回顾 2,本章重点 springboot整合kafka springcloud整合kafka 3,具体内容 3.1 springboot整合kafka 3.1.1 pom.xml添加jar org.springframework.kafka spring-kafka 2.8.1
com.alibaba fastjson 1.2.79 注意:此处使用的springboot版本为2.4.1 kafka是编写课件时最新版本2.6.6,不是任意版本都兼容 3.1.2 配置文件application.properties #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts
#生产者配置
#spring整合kafka配置 #连接集群配置 spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
spring.kafka.producer.retries=3
spring.kafka.producer.acks=-1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=10
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#自定义topic名称 topicName=topic-deptinfo
#消费者配置 #springboot整合 kafka #消费者配置 #连接集群配置 spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto.commit.interval.ms=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.missing-topics-fatal=false
#自定义topic名称 topicName=topic-deptinfo
3.1.3 生成者代码 package com.aaa.sbm.task; import com.aaa.sbm.entity.Dept; import com.aaa.sbm.service.DeptService; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /**
3.1.4 消费者代码 package com.aaa.sbm.util; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /**
3.1.5 测试 消费消息 启动消费者项目,观察控制台 生产信息 直接启动生成者,观察控制台
3.2 springcloud整合kafka(以121讲课项目为例子) 3.2.1 添加jar 父项目: org.springframework.kafka spring-kafka 2.2.14.RELEASE
<!-- fastjson的jar包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常 微服务: org.projectlombok lombok org.springframework.kafka spring-kafka com.alibaba fastjson 3.2.2 生成者配置application.properties 图片: https://shimo.im/fake.png
#springboot 整合kafka #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#生产者配置
spring.kafka.producer.retries=0
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3.2.3 消费者配置application.properties
#springboot 整合kafka #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#消费者配置
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto.commit.interval.ms=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.missing-topics-fatal=false
3.2.4 生产者代码 图片: https://shimo.im/fake.png @Autowired private KafkaTemplate<String, String> kafkaTemplate; //发送消息方法 @GetMapping(“productOrder”) public String send() { Order order =new Order(); order.setId(100); order.setMemberUsername(“测试生产者”); order.setShopId(1001); //log.info(“+++++++++++++++++++++ message = {}”, JSON.toJSONString(dept)); //topic-dept为主题 kafkaTemplate.send(“topic-order”, JSON.toJSONString(order)); return “suc”; } 3.2.5 消费者代码 图片: https://shimo.im/fake.png package com.aaa.ss.util;
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;
import java.util.Optional;
/**
3.2.6 测试
1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果)
http://localhost:2221/order/productOrder
2,观察消费者项目图片: https://shimo.im/fake.png 3,还可以使用命令查看后台topic 图片: https://shimo.im/fake.png
4,知识点总结 5,本章面试题