Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >消息队列-Kafka(1)

消息队列-Kafka(1)

作者头像
lpe234
发布于 2021-04-13 08:22:32
发布于 2021-04-13 08:22:32
1.2K00
代码可运行
举报
文章被收录于专栏:若是烟花若是烟花
运行总次数:0
代码可运行

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

1 概述

1.1 基本概念
1.1.1 Broker 代理

已发布的消息保存在一组服务器中,称为Kafka集群。集群中的每个服务器都是一个Broker。

1.1.2 Topic 主题

通过Topic机制对消息进行分类,可以认为每个Topic就是一个队列。

1.1.3 Partition 分区

每个Topic可以有多个分区,主要为了提高并发而设计。相同Topic下不同Partition可以并发接收消息,同时也能供消费者并发拉取消息。有多少Partition就有多少并发量。

在Kafka服务器上,分区是以文件目录的形式存在的。每个分区目录中,Kafka会按配置大小及配置周期将分区拆分成多个段文件(LogSegment),每个段由三部分组成:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
- 日志文件:*.log
- 位移索引文件:*.index
- 时间索引文件:*.timeindex

其中*.log用于存储消息本身的数据内容,*.index存储消息在文件中的位置(包括消息的逻辑offset和物理存储offset),*.timeindex存储消息创建时间和对应逻辑地址的映射关系。

将分区拆分成多个段是为了控制存储文件大小。可以很方便的通过操作系统mmap机制映射到内存中,提高写入和读取效率。同时还有一个好处就是,当系统要清除过期数据时,可以直接将过期的段文件删除。

如果每个消息都要在index中保存位置信息,index文件自身大小也很容易变的很大。所以Kafka将index设计为稀疏索引来减小index文件的大小。

1.1.4 Replication 副本

消息冗余数量。不能超过集群中Broker的数量。

1.2 基本操作
1.2.1 Topic相关
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 创建Topic 
# --topic 主题名称 避免使用[_][.]号
# --replication-factor 副本数量(不能超过broker节点数)
# --partitions 分区数量(并发)
./bin/kafka-topics.sh --create \
--topic UserDataQueue \
--replication-factor 3 \
--partitions 5 \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 查看Topic
./bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 修改Topic
# 删除Topic
1.2.2 Message相关
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 发送消息
# --topic 指定目标Topic
./bin/kafka-console-producer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 拉取消息
# --from-beginning 从头开始(获取现有的全量数据)
./bin/kafka-console-consumer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--from-beginning

2 集群配置

Kafka集群依赖于Zookeeper。

2.1 Zookeeper配置及启动
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 需要修改的参数

# the directory where the snapshot is stored.
dataDir=/kafka/zkdata
# the port at which the clients will connect
clientPort=2182
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 启动
./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties
2.2 Kafka配置及启动
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 需修改参数

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1  # 同一集群内ID必须唯一

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092  # 同一主机的话,端口号不能相同

# A comma separated list of directories under which to store log files
log.dirs=/kafka/data01  # 日志存储目录,需做隔离

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2182  # Zookeeper连接地址,参见2.1 zk配置
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Kafka启动

# broker-1
./bin/kafka-server-start.sh -daemon /kafka/server01.properties

# broker-2
./bin/kafka-server-start.sh -daemon /kafka/server02.properties

# broker-3
./bin/kafka-server-start.sh -daemon /kafka/server03.properties
2.3 Zookeeper可视化

PrettyZoo 是一个基于 Apache Curator 和 JavaFX 实现的 Zookeeper 图形化管理客户端。

由下图可以看到,集群3个Broker均正常启动。

2.4 Kafka可视化及监控
2.4.1 AKHQ

管理Topic,Topic消息,消费组等的Kafka可视化系统,相关文档:https://akhq.io/

2.4.2 Kafka Eagle

一个简单且高效的监控系统。相关文档:http://www.kafka-eagle.org/index.html

Kafka Eagle 自带监控大屏。

3 与Spring Boot集成

Spring Boot版本:2.4.4。

官方示例:https://github.com/spring-projects/spring-kafka/tree/main/samples

3.1 Spring Boot
3.1.1 添加依赖
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
implementation 'org.springframework.kafka:spring-kafka'
3.1.2 配置文件
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    producer:
      client-id: kfk-demo
      retries: 3
3.1.3 消息发送
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RestController
public class IndexController {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;

    @GetMapping
    public String index() {
        int rdm = new Random().nextInt(1000);
        kafkaTemplate.send("UserDataQueue", new UserData("", rdm));
        return "hello world";
    }

    @GetMapping("index2")
    public String index2() {
		// 发送字符串方式
        kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23)));
        return "ok";
    }
}
3.1.4 消息接收
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
@KafkaListener(
        id = "kfk-demo-userdata",
        topics = {"UserDataQueue"},
        groupId = "kfk-demo-group",
        clientIdPrefix = "kfk-demo-client")
public class KfkListener {

    @KafkaHandler
    public void process(@Payload UserData ud,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(String.format("topic: %s, partition: %d, userData: %s", topic, partition, ud));
    }

    @KafkaHandler(isDefault = true)
    public void process(Object obj) {
        System.out.println(obj);
    }
}

// 接收字符串方式
@Slf4j
@Component
@KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"})
public class KfkUserDataTopicListener {

    @KafkaHandler
    public void process(String userDataStr) {
        UserData userData = new Gson().fromJson(userDataStr, UserData.class);
        log.info("username: {}, age: {}", userData.getUsername(), userData.getAge());
    }
}
3.1.5 Topic自动创建
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic userDataTopic() {
        return new NewTopic("UserDataTopic", 3, (short) 1);
    }
}

参考文档

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(三)Kafka系列:与Kafka的第一次亲密接触
便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:
爪哇缪斯
2023/09/05
2110
(三)Kafka系列:与Kafka的第一次亲密接触
Kafka教程_图解kafka
推荐【Kafka教程】https://bigbird.blog.csdn.net/article/details/108770504 推荐【rabbitmq教程】https://bigbird.blog.csdn.net/article/details/81436980 推荐【Flink教程】https://blog.csdn.net/hellozpc/article/details/109413465 推荐【SpringBoot教程】https://blog.csdn.net/hellozpc/article/details/107095951 推荐【SpringCloud教程】https://blog.csdn.net/hellozpc/article/details/83692496 推荐【Mybatis教程】https://blog.csdn.net/hellozpc/article/details/80878563 推荐【SnowFlake教程】https://blog.csdn.net/hellozpc/article/details/108248227 推荐【并发限流教程】https://blog.csdn.net/hellozpc/article/details/107582771 推荐【JVM面试与调优教程】https://bigbird.blog.csdn.net/article/details/113888604
全栈程序员站长
2022/11/03
2.1K1
Kafka教程_图解kafka
SpringBoot3集成Kafka
Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;
知了一笑
2023/09/01
9340
SpringBoot3集成Kafka
SpringBoot开发案例之整合Kafka实现消息队列
前言 最近在做一款秒杀的案例,涉及到了同步锁、数据库锁、分布式锁、进程内队列以及分布式消息队列,这里对SpringBoot集成Kafka实现消息队列做一个简单的记录。 Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚
小柒2012
2018/06/07
1.3K0
超详细的Kafka教程-从部署到开发到原理都有讲解
在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。
Lvshen
2022/05/05
10.2K0
超详细的Kafka教程-从部署到开发到原理都有讲解
一文读懂springboot整合kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
QGS
2024/05/03
11.8K0
kafka 集群配置_kafka集群原理
kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。
全栈程序员站长
2022/09/27
1.3K0
分布式专题|想进入大厂,你得会点kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目
AI码师
2020/12/11
6360
分布式专题|想进入大厂,你得会点kafka
kafka也没那么难--kafka的安装与简单使用
前短时间在腾讯云上买了一个linux 服务器,决心把kafka这一模快的知识补充起来啦。所以就搞起来。
程序员爱酸奶
2020/03/13
9490
Kafka安装启动入门教程
本文讲如何安装启动kafka,并进行测试,其中zookeepr是kafka自带的,本文基本按照官网文档进行安装启动的,并提出可能会出现的问题。官方文档:http://kafka.apache.org/quickstart 本文虚拟机系统:centos7,不过其他版本的Linux系统是一样的~
小明互联网技术分享社区
2022/10/31
1K2
Kafka安装启动入门教程
kafka教程(一)
kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。
用户3467126
2019/07/03
7800
kafka教程(一)
Apache Kafka 消息队列
依赖Zookeeper(帮助Kafka 集群存储信息,帮助消费者存储消费的位置信息)
收心
2022/01/19
7280
Apache Kafka 消息队列
Kafka,ZK集群开发或部署环境搭建及实验
本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net 或者我的CSDN http://blog.csdn.net/freeape
别打名名
2019/12/23
1.3K0
Kafka,ZK集群开发或部署环境搭建及实验
【消息队列 MQ 专栏】消息队列之 Kafka
Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:
芋道源码
2018/07/31
4.1K0
【消息队列 MQ 专栏】消息队列之 Kafka
Kafka消息队列
Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:
晚上没宵夜
2022/05/09
8840
Kafka消息队列
Kafka 快速起步
主要内容: 1. kafka 安装、启动 2. 消息的 生产、消费 3. 配置启动集群 4. 集群下的容错测试 5. 从文件中导入数据,并导出到文件 单机示例 安装 tar -xzf kafka_2.10-0.10.1.1.tgz cd kafka_2.10-0.10.1.1 启动 > bin/zookeeper-server-start.sh \ config/zookeeper.properties > bin/kafka-server-start.sh \ config/server.properti
dys
2018/04/04
9800
Kafka 常用运维脚本
集群管理 (1)启动 broker $ bin/kafka-server-start.sh daemon <path>/server.properties (2)关闭 broker $ bin/kafka-server-stop.sh topic 管理 kafka-topics.sh 脚本 # 创建主题 $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 64 --replication-factor 3 --topi
张乘辉
2020/08/27
1.4K0
Kafka单节点至集群的安装部署及注意事项
kafka简介 kafka的重要作用: 发布和订阅 像消息传递系统一样读写数据流。 处理 编写实时响应事件的可伸缩流处理应用程序 存储系统 将数据流安全地存储在分布式的,副本的,容错存储系统。kafk
Spark学习技巧
2018/01/31
1.8K0
Kafka单节点至集群的安装部署及注意事项
Kafka快速上手(2017.9官方翻译)
  为了帮助国人更好了解、上手kafka,特意翻译、修改了个文档。官方Wiki : http://kafka.apache.org/quickstart
sunsky
2020/08/20
8410
Kafka快速上手(2017.9官方翻译)
光速入门消息队列Kafka
传统单体应用逐渐被SOA架构、微服务体系架构所替代,如此一来系统数目爆炸级增长,原来在一个系统之间的数据交互演变成跨系统、跨区域。
青山师
2023/05/05
4750
光速入门消息队列Kafka
相关推荐
(三)Kafka系列:与Kafka的第一次亲密接触
更多 >
LV.0
这个人很懒,什么都没有留下~
作者相关精选
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验