首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka入门与实战

Kafka入门与实战

作者头像
爪哇缪斯
发布2023-05-10 11:06:58
发布2023-05-10 11:06:58
8620
举报
文章被收录于专栏:爪哇缪斯爪哇缪斯

今天我们来聊一聊现在MQ中最火爆的Kafka吧。关于Kafka的内容还是比较多的。本篇大概15000左右字,大家根据自己的需求来看吧。本文的大纲如下图所示: 一、消息队列的作用是什么?

1.1> 消息队列的优点

  • 可以实现系统解耦 假设有A系统,那么它会产生出业务数据,这个时候,有B系统和C系统时需要A系统产生的业务数据的。那么,如果没有引入MQ,就需要在代码中硬编码调用B系统和C系统的接口来传输数据。那么加入由引入了D系统和E系统,并且下线了B系统,那么就需要修改代码,添加调用D系统和E系统的代码,并且要删除掉调用B系统的代码。这种设计方式,其实就是A系统要耦合B、C、D、E系统了。违反了低耦合的设计原则。如果我们引入mq,A系统只需要把产生的业务数据发送到MQ即可,下游哪个系统需要这个数据了,就去订阅MQ中的消息。系统A从此不用关系到底谁消费了它的数据,它也可以对下游完全不可见。那么就实现了系统间的解耦。
  • 实现异步调用 像我们常见的秒杀系统,假设分为如下几个步骤,分别为:验证功能(用户合法性、防止重复用户下单、防止同用户频繁请求,库存是否足够,活动是否有效...),下单功能,库存扣减功能,支付功能,物流功能,通知功能等等。如果假设每个功能都需要处理200ms,那么总的耗时就是这些功能之和200ms*6=12000ms,如果某个系统处理逻辑较多,就会造成整个串行的业务流程耗时更久。那么,其实在秒杀的场景,我们只需要告诉用户是否抢单成功即可,什么支付啊,物流这些,都可以异步去完成,那么如果我们引入MQ,在用户调用验证功能和下单功能之后,就将抢单结果返回给客户,那么就只需要400ms了。响应速度提升了3倍。所以,针对不同的业务场景,我们就可以采用异步调用的方式,提升系统的响应速度和用户体验。
  • 流量削峰 假设我们有3台服务器,里面部署了我们的服务,且我们假设每台服务器可以抗住1000qps,那么,当运营同学做了几个促销方案,在活动期间,招揽了大批的用户,本来平时可以抗住3*1000qps的系统,瞬间来了1万qps,那么我们怎么办呢?我们当然可以选择在活动前期,先准备好10+的机器,这样就可以抗住1万qps,但是,如果其中某台机器出问题了呢?那么就会造成系统的雪崩。而且准备多台服务器,如果高流量没有持续太久,机器也浪费了。所以,针对这种情况,我们可以采取引入mq的方式,如果请求突然增多,消息会积压在mq中,而不会瞬间压垮服务器,那么,在服务器端,就起到了流量削峰的作用。

1.2> 采用MQ的缺点

  • 系统可用性降低 如果我们有A、B、C三个系统,我们引入MQ后,A系统生产出来的业务数据会作为消息发送给MQ,然后B系统和C系统对消息进行消费处理。但是,如果A、B、C这三个系统都没有任何异常,而只是MQ突然挂掉了,那么就会造成整个服务的可不用。针对这个问题,我们可以采用配置MQ集群的方式,提高MQ的高可用性。
  • 提升系统的复杂度 系统架构设计的一个普遍共识就是,系统中引入的东西越多,那么系统的复杂度就越高,问题也会越多。比如,我们引入了MQ,那么需要保证消息不会被重复消费,要处理消息丢失的情况,如果多消息作为一个业务原子性操作,那么如果保证消息的顺序等等。所以,在日常的系统设计中,只有业务场景真的需要异步处理,我们才会去选择MQ,而不是为了使用MQ而去引入MQ。
  • 一致性问题 还是以上面的场景为例,如果A系统发送消息成功后,那么在A系统中,会认为整个流程是成功的。但是,在消费端,如果B系统或C系统有异常,那么整个业务流程其实是失败的。那么A系统的成功和消费端的失败就是不一致了。所以,当引入MQ的时候,我们就需要针对这种情况去做最终一致性的处理。可以根据不同步骤的成功与否,去做补偿或者业务回滚。

二、常用MQ介绍

2.1> Kafka

  • Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的日志提交系统(a distributed commit log),之后成为Apache项目的一部分。Kafka性能高效、可扩展良好并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

2.1.1> 主要特性

  • 快速持久化:可以在O(1)的系统开销下进行消息持久化;
  • 高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
  • 完全的分布式系统:Broker、Producer和Consumer都原生自动支持分布式,自动实现负载均衡;
  • 支持同步和异步复制两种高可用机制;
  • 支持数据批量发送和拉取;
  • 零拷贝技术(zero-copy):减少IO操作步骤,提高系统吞吐量;
  • 数据迁移、扩容对用户透明;
  • 无需停机即可扩展机器;
  • 其他特性:丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制;

2.1.2>优点

  • 客户端语言丰富:支持Java、.Net、PHP、Ruby、Python、Go等多种语言;
  • 高性能:单机写入TPS约在100万条/秒,消息大小10个字节;
  • 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积;
  • 支持批量操作;
  • 消费者采用Pull方式获取消息。消息有序,通过控制能够保证所有消息被消费且仅被消费一次;
  • 有优秀的第三方KafkaWeb管理界面Kafka-Manager;
  • 在日志领域比较成熟,被多家公司和多个开源项目使用。

2.1.3>缺点

  • Kafka单机超过64个队列/分区时,Load时会发生明显的飙高现象。队列越多,负载越高,发送消息响应时间变长;
  • 使用短轮询方式,实时性取决于轮询间隔时间;
  • 消费失败不支持重试;
  • 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
  • 社区更新较慢。

2.2> RocketMQ

  • RocketMQ出自阿里的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里内部被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

2.2.1> 主要特性

  • 基于 队列模型:具有高性能、高可靠、高实时、分布式等特点;
  • Producer、Consumer、队列都支持分布式;
  • Producer向一些队列轮流发送消息,队列集合称为Topic。Consumer如果做广播消费,则一个Consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer 实例平均消费这个Topic对应的队列集合;
  • 能够保证严格的消息顺序;
  • 提供丰富的消息拉取模式;
  • 高效的订阅者水平扩展能力;
  • 实时的消息订阅机制;
  • 亿级消息堆积 能力;
  • 较少的外部依赖。

2.2.2> 优点

  • 单机支持1万以上持久化队列;
  • RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,而访问时,直接从内存读取。
  • 模型简单,接口易用(JMS的接口很多场合并不太实用);
  • 性能非常好,可以允许大量堆积消息在Broker中;
  • 支持多种消费模式,包括集群消费、广播消费等;
  • 各个环节分布式扩展设计,支持主从和高可用;
  • 开发度较活跃,版本更新很快。

2.2.3> 缺点

  • 支持的 客户端语言不多,目前是Java及C++,其中C++还不成熟;
  • RocketMQ社区关注度及成熟度也不及前两者;
  • 没有Web管理界面,提供了一个 CLI (命令行界面) 管理工具带来查询、管理和诊断各种问题;
  • 没有在MQ核心里实现JMS等接口;

2.3> RabbitMQ

  • RabbitMQ于2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

2.3.1> 主要特性

  • 可靠性:提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;
  • 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;
  • 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;
  • 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
  • 支持多种协议:支持多种消息队列协议;
  • 支持多种语言:用Erlang语言编写,支持只要是你能想到的所有编程语言;
  • 管理界面:RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;
  • 跟踪机制:如果消息异常,RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
  • 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

2.3.2> 优点

  • 由于Erlang语言的特性,消息队列性能较好,支持高并发;
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
  • 有消息确认机制和持久化机制,可靠性高;
  • 高度可定制的路由;
  • 管理界面较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

2.3.3> 缺点

  • 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
  • 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,但是使得其运行速度较慢,因为中央节点 增加了延迟,消息封装后也比较大;需要学习比较复杂的接口和协议,学习和维护成本较高。

2.4> ActiveMQ

  • ActiveMQ是由Apache出品,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

2.4.1> 主要特性

  • 服从JMS规范:JMS 规范提供了良好的标准和保证,包括:同步 或 异步 的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。遵从JMS规范的好处在于,不论使用什么JMS实现提供者,这些基础特性都是可用的;
  • 连接灵活性:ActiveMQ提供了广泛的连接协议,支持的协议有:HTTP/S,IP多播,SSL,TCP,UDP等等。对众多协议的支持让ActiveMQ拥有了很好的灵活性;
  • 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP;
  • 持久化插件和安全插件:ActiveMQ提供了多种持久化选择。而且,ActiveMQ的安全性也可以完全依据用户需求进行自定义鉴权和授权;
  • 支持的客户端语言种类多:除了Java之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
  • 代理集群:多个ActiveMQ代理可以组成一个集群来提供服务;
  • 异常简单的管理:ActiveMQ是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以监控ActiveMQ不同层面的数据,包括使用在JConsole或者在ActiveMQ的WebConsole中使用JMX。通过处理JMX的告警消息,通过使用命令行脚本,甚至可以通过监控各种类型的日志。

2.4.2> 优点

  • 跨平台(JAVA编写与平台无关,ActiveMQ几乎可以运行在任何的JVM上);
  • 可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都是开发人员最熟悉的存储介质;
  • 支持JMS规范:支持JMS规范提供的统一接口;
  • 支持自动重连和错误重试机制;
  • 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权;
  • 监控完善:拥有完善的监控,包括WebConsole,JMX,Shell命令行,Jolokia的RESTful API;
  • 界面友善:提供的WebConsole可以满足大部分情况,还有很多第三方的组件可以使用,比如hawtio;

2.4.3> 缺点

  • 社区活跃度不及RabbitMQ高;
  • 根据其他用户反馈,会出莫名其妙的问题,会丢失消息;
  • 目前重心放到activemq6.0产品Apollo,对5.x的维护较少;
  • 不适合用于上千个队列的应用场景;

2.5> ZeroMQ

  • 号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
  • 引用官方的说法:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”
  • 特点是: 高性能,非持久化 跨平台:支持Linux、Windows、OS X等 多语言支持;C、C++、Java、.NET、Python等30多种开发语言 可单独部署或集成到应用中使用 可作为Socket通信库使用
  • 与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,更像一个底层的网络通讯库在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。
  • ZeroMQ高性能设计要点:
  • 无锁的队列模型 对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步事件,在读或者写消息到pipe的时,会自动触发读写事件。
  • 批量处理的算法 对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
  • 多核下的线程绑定,无须CPU切换 区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。

2.6> 对比

  • 总结:消息队列的选型需要根据具体应用需求而定,ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲。

2.7> 适用场景

2.7.1> 从公司基础建设力量角度出发

中小型软件公司,建议选RabbitMQ

  • 首先:erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端也很明显,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。
  • 其次:不考虑Kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。
  • 最后:不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ,中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。

大型软件公司,根据具体使用在RocketMQ和kafka之间二选一。

  • 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。
  • 针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。
  • 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。

2.7.2> 从业务场景角度出发

  • RocketMQ定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
  • Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
  • RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

三、安装

3.1> 安装ZooKeeper

  • 首先进入Zookeeper官网 https://zookeeper.apache.org/
  • 下载最新版本ZooKeeper
  • 解压到本地路径,进入ZooKeeper的conf目录下,复制zoo_sample.cfg配置文件,命名为zoo.cfg
  • zoo.cfg配置文件中各配置项的含义如下所示:
代码语言:javascript
复制
# zookeeper时间配置中的基本单位(毫秒)
tickTime=2000

# 允许follower初始化连接到leader最大时长,它表示tickTime时间的倍数 即:initLimit*tickTime
initLimit=10

# 运行follower与leader数据同步最大时长,它表示tickTime时间倍数 即:syncLimit*tickTime
syncLimit=5

# zookeeper数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中)
dataDir=/tmp/zookeeper

# 对客户端提供的端口号
clientPort=2181

# 单个客户端于zookeeper最大并发连接数
maxClientCnxns=60

# 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3

# 自动出发清除任务时间间隔,以小时为单位。默认为0,表示不自动清除
autopurge.purgeInterval=1

## Metrics Providers
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
  • 启动zookeeper
  • 查看zookeeper的运行状态

3.2> 安装Kafka

  • 首先进入Kafka官网下载kafka http://kafka.apache.org/
  • 解压,然后进入config目录下,编辑server.properties配置文件
  • server.properties配置项解析
  • 我们需要关注如下几个配置内容
  • broker的序号

broker.id=0

  • 当前kafka的监听地址

listeners=PLAINTEXT://localhost:9092

  • 日志的存储路径

log.dirs=/Users/muse/kafka_2.13-3.0.0/kafka-logs

  • zookeeper的服务地址

zookeeper.connect=localhost:2181

  • 修改完响应配置后,启动Kafka,并通过查看kafka的进程来判断是否启动起来了

3.3> 安装EFAK

  • 首先进入EFAK官网下载Eagle:http://download.kafka-eagle.org/
  • 修改EFAK的conf目录下配置文件——system-config.properties
  • 配置环境变量KE_HOME,并调用source .zshrc使其立即生效。(如果不配置的话,启动时会报错)
  • 启动EFAK
  • 启动成功界面
  • 访问EFAK界面 http://192.168.1.3:8048,用户名:admin 密码:123456
  • 管理界面如下所示:

四、kafka基础知识

4.1> kafka常用的术语如下所示

4.2> 相关指令

https://kafka.apache.org/documentation/#quickstart

4.3> 创建和查看主题

  • bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1
  • ./kafka-topics.sh --list --bootstrap-server localhost:9092

4.4> 开启消息发送端

  • ./kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092
  • kafka自带了一个producer的命令客户端——kafka-console-producer.sh,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。
  • 使用kafka-console-producer.sh,指定发送到的topic和kafka服务器的地址
  • 也可以使用./kafka-console-producer.sh --topic muse --broker-list localhost:9092

4.5> 开启消息接收端

  • 【方式一】从最后一条消息的偏移量+1开始消费,即:启动客户端之前的消息是不会被消费的

./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092

  • 【方式二】从头开始消费,即:启动客户端之前的消息会被消费

./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning

  • 消息是会被存储在kafka中的文件里的,并且是顺序存储的,消息有偏移量的概念,所以我们可以指定偏移量去读取某个位置的消息。
  • 如果说消息会被存储到kafka中,那么,发到topic=muse的消息存储到哪里了呢?

答:我们上面介绍kafka配置文件server.properties时,配置了一个属性为log.dirs,那么这个路径就是存储路径,如下所示:

【解释】其中的00000000000000000000.log就是消息存储的文件。

4.6> 单播消息

  • 一个消费组里,只会有一个消费者能够消费到某个topic中的消息。
  • 操作演示
  • 首先,打开两个窗口,分别执行如下语句开启消费端,那么就在“museGroup1”消费组中创建了两个Consumer

./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup1

  • 然后:Producer端发送3条消息,我们发现,只有一个Consumer收到了消息。如下所示:
    • 生产者
  • 消费者1(消费组:museGroup1)
  • 消费者2(消费组:museGroup1)
  • 处理流程如下图所示:

4.7> 多播消息

  • 当业务场景中,需要同一个topic下的消息被多个消费者消费,那么我们可以采用创建多个消费组的方式,那么这种方式就是多播消息。
  • 操作演示
  • 首先,打开两个窗口,在其中一个窗口执行如下指令:

./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup1

  • 然后:在另一个窗口执行如下指令

./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup2

  • 最后,Producer端发送3条消息,我们发现,两个Consumer都收到了消息。如下所示:
    • 生产者
  • 消费者1(消费组:museGroup1)
  • 消费者2(消费组:museGroup2)
  • 处理流程如下图所示:

4.8> 查看消费组信息

  • 查看当前主题下有哪些消费组
  • 查看消费组中的具体信息

【解释】

  • CURENT-OFFSET:当前消费组已消费消息的偏移量。
  • LOG-END-OFFSET:主题对应分区消息的结束偏移量(水位HW)。
  • LAG:当前消费组堆积未消费的消息数量。
  • 操作演示
  • 首先,我们先看一下消费组museGroup1的具体信息
  • 然后,我们关闭museGroup1的所有Consumer,使得这个消费组不具备消费消息的能力
  • 最后,我们向消费组museGroup1中发送3条消息,我们再来看一下消费组信息。

五、kafka集群

5.1> 搭建kafka集群

5.1.1> 解压kafka压缩包

  • 创建kafka-cluster目录,并解压kafka_2.13-3.0.0.tgz为3份kafka

5.1.2> 修改配置文件server.properties

  • 修改kafka1的server.propertis配置文件
代码语言:javascript
复制
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/Users/muse/kafka-cluster/kafka1/kafka-logs
  • 修改kafka2的server.propertis配置文件
代码语言:javascript
复制
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/Users/muse/kafka-cluster/kafka2/kafka-logs
  • 修改kafka3的server.propertis配置文件
代码语言:javascript
复制
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/Users/muse/kafka-cluster/kafka3/kafka-logs

5.1.3> 启动三个节点

  • 启动kafka1,kafka2和kafka3
代码语言:javascript
复制
./kafka-cluster/kafka1/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka1/config/server.properties
./kafka-cluster/kafka2/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka2/config/server.properties
./kafka-cluster/kafka3/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka3/config/server.properties

5.1.4> 验证3个Broker是否启动成功

  • 首先,可以通过ps -ef | grep kafka来查看进程是否启动
  • 其次:在zookeeper中查看/brokers/ids下中是否有相应的brokerId目录生成

5.2> 分区和副本

5.2.1> 分区

  • 一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。并且分区也可以提供消息并发存储的能力。
  • 比如:给一个Topic创建了4个分区,那么Topic中的消息就会分别存放在这4个分区中。

5.2.2> 副本

  • 如果分片挂掉了,数据就丢失了。那么为了提高系统的可用性。我们把分片复制多个,这就是副本了。
  • 但是,副本的产生,也会随之带来数据一致性的问题,即:有的副本写数据成功,但是有的副本写数据失败。
  • Leader
  • kafka的读写操作都发生在leader上,leader负责把数据同步给follower。
  • 当leader挂掉了,那么经过主从选举,从多个follower中选举产生一个新的leader。
  • Follower
  • follower接收leader同步过来的数据,它不提供读写(主要是为了保证多副本数据与消费的一致性)

5.2.3> __consumer_offsets-N

  • kafka默认创建了一个拥有50个分区的主题,名称为:“__consumer_offsets”,
  • consumer会把将消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候, 【key】=consumerGroupID+topic+分区号 【value】=当前offset的值
  • Kafka会定期清理topic里的消息,最后就保留最新的那条数据。
  • 因为__consumer_offsets可能会接收高并发的请求,所以默认分配了50个分区(可以通过offsets.topic.num.partitions进行配置),这样可以通过增加机器的方式应对高并发的业务场景。
  • 可以通过如下公式确定consumer消费的offset要提交到哪个__consumer_offsets

hash(consumerGroupID)% 主题“__consumer_offsets”的分区数量

5.2.4> 操作演示

  • 为名称为muse-rp的Topic创建2个分区(--partitions)3个副本(--replication-factor)

./kafka-topics.sh --create --topic muse-rp --bootstrap-server localhost:9092 --partitions 2 --replication-factor 3

  • 查看分区和副本分布情况

./kafka-topics.sh --describe --topic muse-rp --bootstrap-server localhost:9092

【解释】

  • Isr

“可以同步”和“已同步”的节点都会被存入isr集合中,如果isr中的节点性能较差,那么将会从isr集合中被剔除。

当leader节点挂掉,需要从follower节点列表中选举出主节点时,其实就是从isr集合中选举出来的。

  • 分区和副本的分布如下图所示:
  • 我们随便找一个名为kafka1的Broker,来看一下.log文件和.index文件

【解释】

  • 由于将Topic=“muse-rp”的主题指定了2个分区
  • 所以产生了两个目录muse-rp-0muse-rp-1
  • 里面存储的00000000000000000000.log就是消息存储的文件。
  • 其中00000000000000000000.index存储的是消息的稀疏索引。如下图所示:

5.3> 集群消息的发送与消费

5.3.1> 消息放送

  • ./kafka-console-producer.sh --topic muse-rp --bootstrap-server localhost:9092 localhost:9093 localhost:9094
  • 发送10个消息

5.3.2> 消息消费

  • ./kafka-console-consumer.sh --topic muse-rp --bootstrap-server localhost:9092 localhost:9093 localhost:9094 --consumer-property group.id=museGroup1
  • 在消息组museGroup1中开启第一个消费者
  • 在消息组museGroup1中开启第二个消费者

5.3.3> 总结

  • 一个partition只能被一个消费组中的一个消费者消费,这样设计的目的是保证消息的有序性,但是在多个partition的多个消费者消费的总顺序性是无法得到保证的。
  • partition的数量决定了消费组中Consumer的数量,建议同一个消费组中的Consumer数量不要超过partition的数量,否则多余的Consumer就无法消费到消息了
  • 但是,如果消费者挂掉了,那么就会触发rebalance机制,会由其他消费者来消费该分区。
  • 如下图所示:

六、利用Kafka的Java Client实现消息收发

  • 项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面包含了kafka-clients)

6.1> 生产者端

6.1.1> 初始化配置

  • 创建配置对象Properties
代码语言:javascript
复制
Properties properties = new Properties();
  • 配置kafka的Broker列表
代码语言:javascript
复制
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • 发出消息持久化机制参数
代码语言:javascript
复制
properties.put(ProducerConfig.ACKS_CONFIG, "1");

【解释】

  • acks=0

表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。

  • acks=1

表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。

这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。

  • acks=-1/all

需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。

这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。

  • 配置失败重试机制
代码语言:javascript
复制
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

【解释】

  • 发送失败重试的次数,默认是间隔100ms
  • 重试能保证消息发送的可靠性,但是也可能造成消息重复发送,所以需要在消费者端做好幂等性处理
  • 配置缓存相关信息
代码语言:javascript
复制
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024); // 设置发送消息的本地缓冲区,默认值为32MB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024); //  设置批量发送消息的大小,默认值为16KB
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10); // batch等待时间,默认值为0,表示消息必须立即被发送

【解释】

  • Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。
  • kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。
  • LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。 如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。
  • 配置key和value的序列化实现类
代码语言:javascript
复制
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

6.1.2> 同步消息发送

  • 同步消息发送代码如下所示

6.1.3> 异步消息发送


6.2> 消费者端

6.2.1> 初始化配置

  • 创建配置对象Properties
代码语言:javascript
复制
Properties properties = new Properties();
  • 配置kafka的Broker列表
代码语言:javascript
复制
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");

  • 配置消费组
代码语言:javascript
复制
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");
  • offset的重置策略
代码语言:javascript
复制
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

【解释】

  • offset的重置策略——例如创建一个新的消费组,offset是不存在的,如何对offset赋值消费 latest:默认值,只消费自己启动之后发送到主题的消息。 earliest:第一次从头开始消费,以后按照消费offset记录继续消费。
  • 心跳相关配置
代码语言:javascript
复制
/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
  • poll相关配置
代码语言:javascript
复制
/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
  • 配置key和value的反序列化实现类
代码语言:javascript
复制
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

6.2.2> 自动提交offset

  • 自动提交offset
  • 当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。
  • 自动提交会出现丢失消息的情况

因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。

6.2.3> 手动提交offset

  • 手动提交offset

当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。

七、SpringBoot集成

6.1> 创建Spring项目

  • 创建SpringBoot项目,引入kafka依赖
  • 我们看到,SpringBoot自动引入的kafka版本就是我们所采用的最新版本3.0.0

6.2> 生产者端

  • 初始化生产者配置
代码语言:javascript
复制
spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # kafka集群
      acks: 1 # 应答级别:多少个分区副本备份完成时,向生产者发送ack确认(可选0、1、-1/all)
      retries: 3 # 重试次数
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      batch-size: 16384 #批量大小,默认16kb
      properties:
        # 提交延时,当Producer积累的消息达到batch-size或者接收到消息linger.ms后,生产者就会将消息提交给kafka
        # linger.ms等于0,表示没当接收到一条消息的时候,就提交给kafka,这个时候batch-size上面配置的值就无效了
        linger.ms: 0
  • 同步消息发送
代码语言:javascript
复制
private final static String TOPIC_NAME = "muse-rp";
private final static Integer PARTITION_ONE = 0;
private final static Integer PARTITION_TWO = 1;

@Resource
private KafkaTemplate<String, Message> kafkaTemplate;

/**
 * 同步阻塞——消息发送
 */
public void testBlockingSendMsg() throws Throwable {
    Message message;
    for (int i=0; i< 5; i++) {
        message = new Message(i, "BLOCKING_MSG_SPRINGBOOT_" + i);
        ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
                "" + message.getMegId(), message);
        SendResult<String, Message> sendResult = future.get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        log.info("---BLOCKING_MSG_SPRINGBOOT--- [topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
                recordMetadata.partition(), recordMetadata.offset());
    }
}
  • 异步消息发送
代码语言:javascript
复制
/**
 * 异步回调——消息发送
 */
public void testNoBlockingSendMsg() {
    Message message;
    for (int i=0; i< 5; i++) {
        message = new Message(i, "NO_BLOCKING_MSG_SPRINGBOOT_" + i);
        ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_TWO,
                "" + message.getMegId(), message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息发送失败!", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Message> result) {
                RecordMetadata recordMetadata = result.getRecordMetadata();
                log.info("---NO_BLOCKING_MSG_SPRINGBOOT---[topic]={}, [partition]={}, [offset]={}",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        });
    }
}

6.3> 消费者端

  • 初始化消费者配置
代码语言:javascript
复制
spring:
  kafka:
    consumer:
      group-id: museGroup # 消费组id
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 1000 # 提交offset延时(接收到消息后多久提交offset)
      # 当kafka中没有初始offset或offset超出范围时,将自动重置offset
      # earliest:第一次从头开始消费,以后按照消费offset记录继续消费
      # latest(默认):只消费自己启动之后发送到主题的消息。
      auto-offset-reset: latest
      max-poll-records: 500 # 批量消费每次最多消费多少条记录
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        session.timeout.ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        request.timeout.ms: 180000 # 消费请求超时时间
        spring:
          json:
            trusted:
              packages: com.muse.kafkademo.entity

    listener:
      ack-mode: record
  • 消费消息
代码语言:javascript
复制
private final static String TOPIC_NAME = "muse-rp";
private final static String CONSUMER_GROUP_NAME = "museGroup";

/**
 * 消息消费演示
 */
@KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP_NAME)
// public void listenGroup(ConsumerRecord<String, Message> record, Acknowledgment ack) {
public void listenGroup(ConsumerRecord<String, Message> record) {
    log.info(" [topic]={}, [partition]={}, [offset]={}, [key]={}, [value]={}", record.topic(),
            record.partition(), record.offset(), record.key(), record.value());
    // ack.acknowledge(); 需要enable-auto-commit: false才可以
}
  • 消息消费多配置参数演示
代码语言:javascript
复制
/**
 * 消息消费多配置参数演示
 */
@KafkaListener(
        groupId = "xxGroup",                    // 消费组
        concurrency = "3",                      // 组里创建3个Consumer
        topicPartitions = {
            @TopicPartition(topic = "xxTopic1", // 针对主题"xxTopic1",要消费分区0和分区1
                    partitions = {"0", "1"}),
            @TopicPartition(topic = "xxTopic2", // 针对主题"xxTopic2",要消费分区0和分区1,并且针对分区1,offset被设置为100
                    partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listenGroupPro(ConsumerRecord<String, Message> record) {
    Message message = record.value();
    log.info("msgId={}, content={}", message.megId, message.content);
}

八、Kafka集群概念补充

8.1> Controller

  • Kafka集群中的Broker在ZK中创建临时序号节点,序号最小的节点也就是最先创建的那个节点,将作为集群的Controller,负责管理整个集群中的所有分区和副本的状态。控制器的作用如下:
  • 当某个分区的leader副本出现故障时,由控制器负责为该分区选举除新的leader副本。
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
  • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

8.2> Rebalance机制

  • 当消费者没有指明分区消费时,消费组里的消费者和分区关系发生了变化,那么就会触发rebalance机制。
  • 这个机制会重新调整消费者消费哪个分区。
  • 在触发rebalance机制之前,消费者消费那个分区有3中策略:
  • 1> range

通过公式来计算某个消费者消费那个分区。

  • 2> 轮询

大家轮流对分区进行消费。

  • 3> sticky

在触发rebalance之后,在消费者消费的原分区不变的基础上进行调整。

8.3>HW和LEO

  • HW(HighWatermark)俗称高水位,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW;
  • Consumer最多只能消费到HW所在的位置,每个副本都有HW,Leader和Follower各自负责更新自己的HW的状态。
  • 对于Leader新写入的消息,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的副本同步后更新HW,此时消息才能被Consumer所消费。这样就能保证如果Leader所在的broker失效,该消息仍然可以从新选举的Leader中获取。
  • 具体逻辑如下图所示:

九、Kafka常见面试题

9.1> 如何防止消息丢失

  • 针对发送方 ack1或者-1/all可以防止消息丢失,如果要做到99.9999%,ack要设置为all,并把min.insync.replicas配置成分区备份数
  • 针对接收方 把自动提交修改为手动提交。

9.2> 如何防止消息的重复消费

  • 一条消息被消费者消费多次,如果为了不消费到重复的消息,我们需要在消费端增加幂等性处理,例如:
  • 通过mysql插入业务id作为主键,因为主键具有唯一性,所以一次只能插入一条业务数据。
  • 使用redis或zk的分布式锁,实现对业务数据的幂等操作。

9.3> 如何做到顺序消费

  • 针对发送方 在发送时将ack配置为非0,确保消息至少同步到leader之后再返回ack继续发送。但是,只能保证分区内部的消息是顺序的,而无法保证一个Topic下的多个分区总的消息是有序的。
  • 针对接收方 消息发送到一个分区中,值配置一个消费组的消费者来接收消息,那么这个Consumer所接收到的消息就是有顺序的了,不过这也就牺牲掉了性能。

9.4> 如何解决消息积压的问题

  • 消息积压会导致很多问题,比如:磁盘被打满、Producer发送消息导致kafka性能过慢,然后就有可能发生服务雪崩。解决的方案如下所示:
  • 方案1:提升一个Consumer的处理能力。即:在一个消费者中启动多个线程,让多线程加快消费的速度。
  • 方案2:提升总体Consumer的处理能力。启动多个消费组,增加Consumer的数量从而提高消费能力。
  • 方案3:如果业务运行,设定某个时间内,如果消息仍没有被消费,那么Consumer收到消息后,直接废弃掉,不执行下面的业务逻辑。

9.5> 如何实现延迟队列

  • 应用场景:订单创建成功后如果超过30分钟没有付款,则需要取消订单。
  • 步骤一:创建一个表示“订单30分钟未支付”的Topic order_not_paid_30m:延迟30分钟的消息队列。
  • 步骤二:Producer发送消息的时候,消息内容要带上订单生成的时间create_time。
  • 步骤三:Consumer消费Topic中的消息,如果发现now减去create_time不足30分钟,则不去消费下次,记录当前的offset,不去消费当前以及之后的消息。
  • 步骤四:通过记录的offset去获取消息,如果发现消息已经超过30分钟且订单状态是“未支付”,那么则将订单状态设置为“取消”,然后或许下一个offset的消息。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爪哇缪斯 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.1> 消息队列的优点
  • 1.2> 采用MQ的缺点
  • 二、常用MQ介绍
    • 2.1> Kafka
      • 2.1.1> 主要特性
      • 2.1.2>优点
      • 2.1.3>缺点
    • 2.2> RocketMQ
      • 2.2.1> 主要特性
      • 2.2.2> 优点
      • 2.2.3> 缺点
    • 2.3> RabbitMQ
      • 2.3.1> 主要特性
      • 2.3.2> 优点
      • 2.3.3> 缺点
    • 2.4> ActiveMQ
      • 2.4.1> 主要特性
      • 2.4.2> 优点
      • 2.4.3> 缺点
    • 2.5> ZeroMQ
    • 2.6> 对比
    • 2.7> 适用场景
      • 2.7.1> 从公司基础建设力量角度出发
      • 2.7.2> 从业务场景角度出发
  • 三、安装
    • 3.1> 安装ZooKeeper
    • 3.2> 安装Kafka
    • 3.3> 安装EFAK
  • 四、kafka基础知识
    • 4.1> kafka常用的术语如下所示
    • 4.2> 相关指令
    • 4.3> 创建和查看主题
    • 4.4> 开启消息发送端
    • 4.5> 开启消息接收端
    • 4.6> 单播消息
    • 4.7> 多播消息
    • 4.8> 查看消费组信息
  • 五、kafka集群
    • 5.1> 搭建kafka集群
      • 5.1.1> 解压kafka压缩包
      • 5.1.2> 修改配置文件server.properties
      • 5.1.3> 启动三个节点
      • 5.1.4> 验证3个Broker是否启动成功
    • 5.2> 分区和副本
      • 5.2.1> 分区
      • 5.2.2> 副本
      • 5.2.3> __consumer_offsets-N
      • 5.2.4> 操作演示
    • 5.3> 集群消息的发送与消费
      • 5.3.1> 消息放送
      • 5.3.2> 消息消费
      • 5.3.3> 总结
  • 六、利用Kafka的Java Client实现消息收发
    • 6.1> 生产者端
      • 6.1.1> 初始化配置
      • 6.1.2> 同步消息发送
      • 6.1.3> 异步消息发送
    • 6.2> 消费者端
      • 6.2.1> 初始化配置
      • 6.2.2> 自动提交offset
      • 6.2.3> 手动提交offset
  • 七、SpringBoot集成
    • 6.1> 创建Spring项目
    • 6.2> 生产者端
    • 6.3> 消费者端
  • 八、Kafka集群概念补充
    • 8.1> Controller
    • 8.2> Rebalance机制
    • 8.3>HW和LEO
  • 九、Kafka常见面试题
    • 9.1> 如何防止消息丢失
    • 9.2> 如何防止消息的重复消费
    • 9.3> 如何做到顺序消费
    • 9.4> 如何解决消息积压的问题
    • 9.5> 如何实现延迟队列
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档