Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >一文读懂springboot整合kafka

一文读懂springboot整合kafka

原创
作者头像
QGS
发布于 2024-05-03 06:12:35
发布于 2024-05-03 06:12:35
17.2K0
举报
文章被收录于专栏:QGS星球QGS星球

安装kafka

启动Kafka本地环境需Java 8+以上

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据

Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。

Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

解压tar -xzf kafka_2.13-3.7.0.tgz

一、Zookeeper启动Kafka(kafka内置zookeeper)

Kafka依赖Zookeeper

1、启动Zookeeper 2、启动Kafka

使用kafka自带Zookeeper启动

./zookeeper-server-start.sh ../config/zookeeper.properties &

./zookeeper-server-stop.sh ../config/zookeeper.properties

./kafka-server-start.sh ../config/server.properties &

./kafka-server-stop.sh ../config/server.properties

二、Zookeeper服务器启动Kafka

Zookeeper服务器安装

https://zookeeper.apache.org/

https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

tar zxvf apache-zookeeper-3.9.2-bin.tar.gz

配置Zookeeper服务器

cp zoo_sample.cfg zoo.cfg

启动Zookeeper服务器

./zkServer.sh start

修改Zookeeper端口

Zoo.cfg添加内容

admin.serverPort=8099

apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper

Zookeeper服务器启动kafka

/opt/kafka_2.13-3.7.0/bin目录下

./kafka-server-start.sh ../config/server.properties &

Kafka配置文件server.properties

三、使用KRaft启动Kafka

UUID通用唯一识别码(Universally Unique Identifier)

1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid

2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties

3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &

springboot集成kafka

创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)

修改server.properties文件

vim server.properties

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.68.133:9092

springboot加入依赖kafka

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

代码语言:java
AI代码解释
复制
application.yml配置连接kafka
spring:
 kafka:
 bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

代码语言:java
AI代码解释
复制
@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
 kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

代码语言:java
AI代码解释
复制
@Component
public class KafkaConsumer {

 @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
 public void consume(String message){
 System.out.println("接收到消息:"+message);
 }

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

代码语言:java
AI代码解释
复制
@Component
public class KafkaConsumer {

 @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
 public void consume(String message){
 System.out.println("接收到消息:"+message);
 }

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

代码语言:java
AI代码解释
复制
spring:
  kafka:
  bootstrap-servers: 192.168.68.133:9092
  consumer:
  auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

脚本重置消费者组偏移量

代码语言:txt
AI代码解释
复制
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
手搭手Kafka2.13发送和消费消息
Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
QGS
2024/04/23
4520
单机单实例部署Kafka及测试
在我们做和kafka开发相关的工作时,往往希望独立部署一套kafka测试环境。而kafka部署时,不能只是简单安装kafka自身组件,还要安装zookeeper、JDK之类的辅助软件。这让其部署变得不是那么方便。本文将使用kafka和zookeeper的官方编译包来做部署。
方亮
2024/05/24
4520
单机单实例部署Kafka及测试
springboot整合kafka入门
producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。
全栈程序员站长
2022/11/15
8230
当Spring邂逅Kafka,有趣的知识增加了
Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。
翊君
2022/03/08
1.2K0
Spring Boot + Kafka 实现消息队列
在开始编码前,需要先安装并启动 Kafka 服务。Kafka 依赖于 ZooKeeper 进行集群协调,因此需要先启动 ZooKeeper。
技术文章分析
2025/08/08
2960
【应用进阶】Kafka的部署和案例
这两天学习MQ在项目中的使用,就自己搭建了一个测试环境,在笔记本电脑搭建,使用的win10系统。不废话,开撸。
用户5640963
2021/06/29
5480
重磅里程碑:Kafka4.x生产级集群部署、扩容、缩容实践案例总结记录
Apache Kafka 4.0 是一个重要的里程碑,标志着首个完全剔除 Apache ZooKeeper® 即可运行的重大版本发布。通过KRaft 模式运行,Kafka 简化部署和管理工作,消除维护独立 ZooKeeper 集群的复杂性,打破ZooKeeper自身缺陷造成Kafka集群规模的限制。欢迎关注微信公众号:大数据从业者
用户9421738
2025/04/17
1.8K1
重磅里程碑:Kafka4.x生产级集群部署、扩容、缩容实践案例总结记录
SpringBoot3集成Kafka
Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;
知了一笑
2023/09/01
1.1K0
SpringBoot3集成Kafka
kafka也没那么难--kafka的安装与简单使用
前短时间在腾讯云上买了一个linux 服务器,决心把kafka这一模快的知识补充起来啦。所以就搞起来。
程序员爱酸奶
2020/03/13
1.1K0
Spring Boot 整合 Kafka:构建高效消息驱动应用
你可以使用 Spring Initializr 快速创建一个 Spring Boot 项目。
技术文章分析
2025/08/08
8930
kafka教程(一)
kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。
用户3467126
2019/07/03
8250
kafka教程(一)
SpringBoot2 整合Kafka组件,应用案例和流程详解
Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
知了一笑
2020/02/26
7160
SpringBoot整合kafka
(adsbygoogle =window.adsbygoogle ||[]).push({});
猿码优创
2019/07/27
1.2K0
轻松上手 Spring Boot & Kafka 实战!
Kafka需要依赖zookeeper,并且自身集成了zookeeper,zookeeper至少需要3个节点保证集群高可用,下面是在单机linux下创建kafka3个节点伪集群模式。
Java技术栈
2020/04/27
1K0
Kafka入门与实战
今天我们来聊一聊现在MQ中最火爆的Kafka吧。关于Kafka的内容还是比较多的。本篇大概15000左右字,大家根据自己的需求来看吧。本文的大纲如下图所示: 一、消息队列的作用是什么?
爪哇缪斯
2023/05/10
8620
Kafka入门与实战
Kafka消息队列
Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:
晚上没宵夜
2022/05/09
1.1K0
Kafka消息队列
SpringBoot开发案例之整合Kafka实现消息队列
前言 最近在做一款秒杀的案例,涉及到了同步锁、数据库锁、分布式锁、进程内队列以及分布式消息队列,这里对SpringBoot集成Kafka实现消息队列做一个简单的记录。 Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚
小柒2012
2018/06/07
1.4K0
kafka安装及使用---Kafka从入门到精通(二)
前面说了kafka的topic有分区的概念,每个分区又有leader 和 follower,kafka听过ack机制保证消息的可靠性。
keying
2022/07/29
8630
kafka安装及使用---Kafka从入门到精通(二)
SpringBoot系列之集成kafka实现事件发布
事件发布订阅实现,我们经常使用到spring框架提供的ApplicationEventPublisher,基于kafka的特性,我们也可以简单实现类似的效果
SmileNicky
2022/01/04
1.1K0
SpringBoot系列之集成kafka实现事件发布
一文告诉你SparkStreaming如何整合Kafka!
关于SparkStreaming从理论到实战的部分,博主已经在前面的博客中介绍了。本篇博客,为大家带来的是SparkStreaming整合Kafka的教程!
大数据梦想家
2021/01/27
8170
一文告诉你SparkStreaming如何整合Kafka!
相关推荐
手搭手Kafka2.13发送和消费消息
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档