首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >kafka第三次课!

kafka第三次课!

作者头像
张哥编程
发布2024-12-13 13:59:57
发布2024-12-13 13:59:57
2770
举报
文章被收录于专栏:云计算linux云计算linux

1,课程回顾 2,本章重点 springboot整合kafka springcloud整合kafka 3,具体内容 3.1 springboot整合kafka 3.1.1 pom.xml添加jar org.springframework.kafka spring-kafka 2.8.1

com.alibaba fastjson 1.2.79 注意:此处使用的springboot版本为2.4.1 kafka是编写课件时最新版本2.6.6,不是任意版本都兼容 3.1.2 配置文件application.properties #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts

#生产者配置

#spring整合kafka配置 #连接集群配置 spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

重试次数

spring.kafka.producer.retries=3

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=-1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=10

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

#自定义topic名称 topicName=topic-deptinfo

#消费者配置 #springboot整合 kafka #消费者配置 #连接集群配置 spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

#自定义topic名称 topicName=topic-deptinfo

3.1.3 生成者代码 package com.aaa.sbm.task; import com.aaa.sbm.entity.Dept; import com.aaa.sbm.service.DeptService; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /**

  • @ fileName:TimedSendDeptInfoTask
  • @ description:
  • @ author:zhz
  • @ createTime:2022/1/13 9:37
  • @ version:1.0.0 / @Component //不在3层之内,交给IOC处理 @EnableScheduling //开启定时任务 @EnableAsync //开启异步处理 可以在任务方法上使用@Async 该方法多线程处理时,可以异步处理,提高执行效率 @Slf4j public class TimedSendDeptInfoTask { //spring用到了哪些涉及模式 模板模式(封装出来一个通用的工具模板,供你完成什么功能,简化整个操作流程) @Resource private KafkaTemplate kafkaTemplate; //juc包下线程安全的类,可以实现多线程同步自增 private AtomicInteger atomicInteger =new AtomicInteger(); //注入topic名称 @Value(“${topicName}”) private String topicN; /@Resource private DeptService deptService;/ /@Resource private RestTemplate restTemplate;/ //HttpClient /@Resource //模板模式 private RedisTemplate redisTemplate;*/ /**
  • 定制执行任务
  • 每隔3秒 使用多线程发送5条部门信息到kafka中 / // cron=“秒 分 时 日 月 周” / 每 - 范围(3-10 每分钟的3秒到10秒每秒执行一次) , 选项 (3,10,15 每分钟第3秒第10秒第15秒执行) @Scheduled(cron = "/3 * * * * ?") public void timedExecute(){ //实例化固定线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); //lambda表达式 简化代码写法 ()->sendDeptInfo() 左边是参数 右边是执行业务 /* executorService.execute(new Runnable() { @Override public void run() { sendDeptInfo(); System.out.println(“1”); System.out.println(“2”); } });/ /executorService.execute(()-> { sendDeptInfo(); System.out.println(“1”); System.out.println(“2”); });*/ //启动5个线程执行 executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); //关闭线程池 executorService.shutdown(); } /**
  • 发送部门信息 */ // @Async //异步处理,提高效率 public void sendDeptInfo(){ log.info(“线程信息为:”+Thread.currentThread().getName()+“,正在执行。。。。。。。。。。。。。。”); int getAndIncrement = atomicInteger.getAndIncrement(); Dept dept =new Dept(getAndIncrement,“dev”+getAndIncrement,“zz”+getAndIncrement); log.info(“要发送的部门信息为:”+dept); //发送部门信息到kafka中 一定要是字符串格式 kafkaTemplate.send(topicN, JSON.toJSONString(dept)); } }

3.1.4 消费者代码 package com.aaa.sbm.util; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /**

  • @ fileName:KafkaConsumer
  • @ description:工具类用来监控topic-deptinof,不停的获取message
  • @ author:zhz
  • @ createTime:2022/1/13 10:26
  • @ version:1.0.0 */ @Component @Slf4j public class KafkaConsumer { /**
  • 消费消息方法 借助 @KafkaListener指定消费的topic 如果该topic有信息都回被拉取pull 到参数中
  • @param record */ @KafkaListener(topics = {“${topicName}”}) //监听注解 监听指定的topic public void pullKafkaMsg(ConsumerRecord<?,?> record){ //jdk8之后封装的专门处理空值一个类,有效防止空指针异常 Optional<?> optional = Optional.ofNullable(record.value()); // isPresent等同于if(record!=null) if(optional.isPresent()){ log.info(“接受到的信息为:”+ record); log.info(“接受到的部门信息为:”+ optional.get()); } } }

3.1.5 测试 消费消息 启动消费者项目,观察控制台 生产信息 直接启动生成者,观察控制台

3.2 springcloud整合kafka(以121讲课项目为例子) 3.2.1 添加jar 父项目: org.springframework.kafka spring-kafka 2.2.14.RELEASE

代码语言:javascript
复制
        <!-- fastjson的jar包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>

注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常 微服务: org.projectlombok lombok org.springframework.kafka spring-kafka com.alibaba fastjson 3.2.2 生成者配置application.properties 图片: https://shimo.im/fake.png

#springboot 整合kafka #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#生产者配置

重试次数

spring.kafka.producer.retries=0

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=0

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3.2.3 消费者配置application.properties

#springboot 整合kafka #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#消费者配置

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

3.2.4 生产者代码 图片: https://shimo.im/fake.png @Autowired private KafkaTemplate<String, String> kafkaTemplate; //发送消息方法 @GetMapping(“productOrder”) public String send() { Order order =new Order(); order.setId(100); order.setMemberUsername(“测试生产者”); order.setShopId(1001); //log.info(“+++++++++++++++++++++ message = {}”, JSON.toJSONString(dept)); //topic-dept为主题 kafkaTemplate.send(“topic-order”, JSON.toJSONString(order)); return “suc”; } 3.2.5 消费者代码 图片: https://shimo.im/fake.png package com.aaa.ss.util;

import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;

import java.util.Optional;

/**

  • @ fileName:KafkaConsumer
  • @ description:
  • @ author:zhz
  • @ createTime:2021/2/20 17:20 */ @Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = {“topic-order”}) public void consumer(ConsumerRecord<?, ?> record){ Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info(“----------------- record =” + record); log.info(“------------------ message =” + message); } } }
代码语言:javascript
复制
    3.2.6  测试
            1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果)
             http://localhost:2221/order/productOrder
            2,观察消费者项目

图片: https://shimo.im/fake.png 3,还可以使用命令查看后台topic 图片: https://shimo.im/fake.png

4,知识点总结 5,本章面试题

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 重试次数
  • 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
  • 批量大小
  • 提交延时
  • 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
  • linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
  • 生产端缓冲区大小
  • Kafka提供的序列化和反序列化类
  • 自定义分区器
  • spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
  • 默认的消费组ID
  • 是否自动提交offset
  • 提交offset延时(接收到消息后多久提交offset)
  • 当kafka中没有初始offset或offset超出范围时将自动重置offset
  • earliest:重置为分区中最小的offset;
  • latest:重置为分区中最新的offset(消费分区中新产生的数据);
  • none:只要有一个分区不存在已提交的offset,就抛出异常;
  • 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
  • 消费请求超时时间
  • Kafka提供的序列化和反序列化类
  • 消费端监听的topic不存在时,项目启动会报错(关掉)
  • 设置批量消费
  • spring.kafka.listener.type=batch
  • 批量消费每次最多消费多少条消息
  • spring.kafka.consumer.max-poll-records=50
  • 重试次数
  • 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
  • 批量大小
  • 提交延时
  • 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
  • linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
  • 生产端缓冲区大小
  • Kafka提供的序列化和反序列化类
  • 自定义分区器
  • spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
  • 默认的消费组ID
  • 是否自动提交offset
  • 提交offset延时(接收到消息后多久提交offset)
  • 当kafka中没有初始offset或offset超出范围时将自动重置offset
  • earliest:重置为分区中最小的offset;
  • latest:重置为分区中最新的offset(消费分区中新产生的数据);
  • none:只要有一个分区不存在已提交的offset,就抛出异常;
  • 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
  • 消费请求超时时间
  • Kafka提供的序列化和反序列化类
  • 消费端监听的topic不存在时,项目启动会报错(关掉)
  • 设置批量消费
  • spring.kafka.listener.type=batch
  • 批量消费每次最多消费多少条消息
  • spring.kafka.consumer.max-poll-records=50
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档