今天我们来聊一聊现在MQ中最火爆的Kafka吧。关于Kafka的内容还是比较多的。本篇大概15000左右字,大家根据自己的需求来看吧。本文的大纲如下图所示:
一、消息队列的作用是什么?
1.1> 消息队列的优点
- 可以实现系统解耦
假设有A系统,那么它会产生出业务数据,这个时候,有B系统和C系统时需要A系统产生的业务数据的。那么,如果没有引入MQ,就需要在代码中硬编码调用B系统和C系统的接口来传输数据。那么加入由引入了D系统和E系统,并且下线了B系统,那么就需要修改代码,添加调用D系统和E系统的代码,并且要删除掉调用B系统的代码。这种设计方式,其实就是A系统要耦合B、C、D、E系统了。违反了低耦合的设计原则。如果我们引入mq,A系统只需要把产生的业务数据发送到MQ即可,下游哪个系统需要这个数据了,就去订阅MQ中的消息。系统A从此不用关系到底谁消费了它的数据,它也可以对下游完全不可见。那么就实现了系统间的解耦。
- 实现异步调用
像我们常见的秒杀系统,假设分为如下几个步骤,分别为:验证功能(用户合法性、防止重复用户下单、防止同用户频繁请求,库存是否足够,活动是否有效...),下单功能,库存扣减功能,支付功能,物流功能,通知功能等等。如果假设每个功能都需要处理200ms,那么总的耗时就是这些功能之和200ms*6=12000ms,如果某个系统处理逻辑较多,就会造成整个串行的业务流程耗时更久。那么,其实在秒杀的场景,我们只需要告诉用户是否抢单成功即可,什么支付啊,物流这些,都可以异步去完成,那么如果我们引入MQ,在用户调用验证功能和下单功能之后,就将抢单结果返回给客户,那么就只需要400ms了。响应速度提升了3倍。所以,针对不同的业务场景,我们就可以采用异步调用的方式,提升系统的响应速度和用户体验。
- 流量削峰
假设我们有3台服务器,里面部署了我们的服务,且我们假设每台服务器可以抗住1000qps,那么,当运营同学做了几个促销方案,在活动期间,招揽了大批的用户,本来平时可以抗住3*1000qps的系统,瞬间来了1万qps,那么我们怎么办呢?我们当然可以选择在活动前期,先准备好10+的机器,这样就可以抗住1万qps,但是,如果其中某台机器出问题了呢?那么就会造成系统的雪崩。而且准备多台服务器,如果高流量没有持续太久,机器也浪费了。所以,针对这种情况,我们可以采取引入mq的方式,如果请求突然增多,消息会积压在mq中,而不会瞬间压垮服务器,那么,在服务器端,就起到了流量削峰的作用。
1.2> 采用MQ的缺点
- 系统可用性降低
如果我们有A、B、C三个系统,我们引入MQ后,A系统生产出来的业务数据会作为消息发送给MQ,然后B系统和C系统对消息进行消费处理。但是,如果A、B、C这三个系统都没有任何异常,而只是MQ突然挂掉了,那么就会造成整个服务的可不用。针对这个问题,我们可以采用配置MQ集群的方式,提高MQ的高可用性。
- 提升系统的复杂度
系统架构设计的一个普遍共识就是,系统中引入的东西越多,那么系统的复杂度就越高,问题也会越多。比如,我们引入了MQ,那么需要保证消息不会被重复消费,要处理消息丢失的情况,如果多消息作为一个业务原子性操作,那么如果保证消息的顺序等等。所以,在日常的系统设计中,只有业务场景真的需要异步处理,我们才会去选择MQ,而不是为了使用MQ而去引入MQ。
- 一致性问题
还是以上面的场景为例,如果A系统发送消息成功后,那么在A系统中,会认为整个流程是成功的。但是,在消费端,如果B系统或C系统有异常,那么整个业务流程其实是失败的。那么A系统的成功和消费端的失败就是不一致了。所以,当引入MQ的时候,我们就需要针对这种情况去做最终一致性的处理。可以根据不同步骤的成功与否,去做补偿或者业务回滚。
二、常用MQ介绍
2.1> Kafka
- Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的日志提交系统(a distributed commit log),之后成为Apache项目的一部分。Kafka性能高效、可扩展良好并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。
2.1.1> 主要特性
- 快速持久化:可以在O(1)的系统开销下进行消息持久化;
- 高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
- 完全的分布式系统:Broker、Producer和Consumer都原生自动支持分布式,自动实现负载均衡;
- 支持同步和异步复制两种高可用机制;
- 支持数据批量发送和拉取;
- 零拷贝技术(zero-copy):减少IO操作步骤,提高系统吞吐量;
- 数据迁移、扩容对用户透明;
- 无需停机即可扩展机器;
- 其他特性:丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制;
2.1.2>优点
- 客户端语言丰富:支持Java、.Net、PHP、Ruby、Python、Go等多种语言;
- 高性能:单机写入TPS约在100万条/秒,消息大小10个字节;
- 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积;
- 支持批量操作;
- 消费者采用Pull方式获取消息。消息有序,通过控制能够保证所有消息被消费且仅被消费一次;
- 有优秀的第三方KafkaWeb管理界面Kafka-Manager;
- 在日志领域比较成熟,被多家公司和多个开源项目使用。
2.1.3>缺点
- Kafka单机超过64个队列/分区时,Load时会发生明显的飙高现象。队列越多,负载越高,发送消息响应时间变长;
- 使用短轮询方式,实时性取决于轮询间隔时间;
- 消费失败不支持重试;
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
- 社区更新较慢。
2.2> RocketMQ
- RocketMQ出自阿里的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里内部被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
2.2.1> 主要特性
- 基于 队列模型:具有高性能、高可靠、高实时、分布式等特点;
- Producer、Consumer、队列都支持分布式;
- Producer向一些队列轮流发送消息,队列集合称为Topic。Consumer如果做广播消费,则一个Consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer 实例平均消费这个Topic对应的队列集合;
- 能够保证严格的消息顺序;
- 提供丰富的消息拉取模式;
- 高效的订阅者水平扩展能力;
- 实时的消息订阅机制;
- 亿级消息堆积 能力;
- 较少的外部依赖。
2.2.2> 优点
- 单机支持1万以上持久化队列;
- RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,而访问时,直接从内存读取。
- 模型简单,接口易用(JMS的接口很多场合并不太实用);
- 性能非常好,可以允许大量堆积消息在Broker中;
- 支持多种消费模式,包括集群消费、广播消费等;
- 各个环节分布式扩展设计,支持主从和高可用;
- 开发度较活跃,版本更新很快。
2.2.3> 缺点
- 支持的 客户端语言不多,目前是Java及C++,其中C++还不成熟;
- RocketMQ社区关注度及成熟度也不及前两者;
- 没有Web管理界面,提供了一个 CLI (命令行界面) 管理工具带来查询、管理和诊断各种问题;
- 没有在MQ核心里实现JMS等接口;
2.3> RabbitMQ
- RabbitMQ于2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
2.3.1> 主要特性
- 可靠性:提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;
- 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;
- 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;
- 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
- 支持多种协议:支持多种消息队列协议;
- 支持多种语言:用Erlang语言编写,支持只要是你能想到的所有编程语言;
- 管理界面:RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;
- 跟踪机制:如果消息异常,RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
- 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
2.3.2> 优点
- 由于Erlang语言的特性,消息队列性能较好,支持高并发;
- 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
- 有消息确认机制和持久化机制,可靠性高;
- 高度可定制的路由;
- 管理界面较丰富,在互联网公司也有较大规模的应用,社区活跃度高。
2.3.3> 缺点
- 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
- 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,但是使得其运行速度较慢,因为中央节点 增加了延迟,消息封装后也比较大;需要学习比较复杂的接口和协议,学习和维护成本较高。
2.4> ActiveMQ
- ActiveMQ是由Apache出品,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
2.4.1> 主要特性
- 服从JMS规范:JMS 规范提供了良好的标准和保证,包括:同步 或 异步 的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。遵从JMS规范的好处在于,不论使用什么JMS实现提供者,这些基础特性都是可用的;
- 连接灵活性:ActiveMQ提供了广泛的连接协议,支持的协议有:HTTP/S,IP多播,SSL,TCP,UDP等等。对众多协议的支持让ActiveMQ拥有了很好的灵活性;
- 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP;
- 持久化插件和安全插件:ActiveMQ提供了多种持久化选择。而且,ActiveMQ的安全性也可以完全依据用户需求进行自定义鉴权和授权;
- 支持的客户端语言种类多:除了Java之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
- 代理集群:多个ActiveMQ代理可以组成一个集群来提供服务;
- 异常简单的管理:ActiveMQ是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以监控ActiveMQ不同层面的数据,包括使用在JConsole或者在ActiveMQ的WebConsole中使用JMX。通过处理JMX的告警消息,通过使用命令行脚本,甚至可以通过监控各种类型的日志。
2.4.2> 优点
- 跨平台(JAVA编写与平台无关,ActiveMQ几乎可以运行在任何的JVM上);
- 可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都是开发人员最熟悉的存储介质;
- 支持JMS规范:支持JMS规范提供的统一接口;
- 支持自动重连和错误重试机制;
- 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权;
- 监控完善:拥有完善的监控,包括WebConsole,JMX,Shell命令行,Jolokia的RESTful API;
- 界面友善:提供的WebConsole可以满足大部分情况,还有很多第三方的组件可以使用,比如hawtio;
2.4.3> 缺点
- 社区活跃度不及RabbitMQ高;
- 根据其他用户反馈,会出莫名其妙的问题,会丢失消息;
- 目前重心放到activemq6.0产品Apollo,对5.x的维护较少;
- 不适合用于上千个队列的应用场景;
2.5> ZeroMQ
- 号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
- 引用官方的说法:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”
- 特点是:
高性能,非持久化
跨平台:支持Linux、Windows、OS X等
多语言支持;C、C++、Java、.NET、Python等30多种开发语言
可单独部署或集成到应用中使用
可作为Socket通信库使用
- 与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。
- ZeroMQ高性能设计要点:
- 无锁的队列模型
对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步事件,在读或者写消息到pipe的时,会自动触发读写事件。
- 批量处理的算法
对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
- 多核下的线程绑定,无须CPU切换
区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。
2.6> 对比
- 总结:消息队列的选型需要根据具体应用需求而定,ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲。
2.7> 适用场景
2.7.1> 从公司基础建设力量角度出发
中小型软件公司,建议选RabbitMQ
- 首先:erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端也很明显,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。
- 其次:不考虑Kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。
- 最后:不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ,中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。
大型软件公司,根据具体使用在RocketMQ和kafka之间二选一。
- 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。
- 针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。
- 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。
2.7.2> 从业务场景角度出发
- RocketMQ定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
- Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
- RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
三、安装
3.1> 安装ZooKeeper
- 首先进入Zookeeper官网 https://zookeeper.apache.org/
- 解压到本地路径,进入ZooKeeper的conf目录下,复制zoo_sample.cfg配置文件,命名为zoo.cfg
# zookeeper时间配置中的基本单位(毫秒)
tickTime=2000
# 允许follower初始化连接到leader最大时长,它表示tickTime时间的倍数 即:initLimit*tickTime
initLimit=10
# 运行follower与leader数据同步最大时长,它表示tickTime时间倍数 即:syncLimit*tickTime
syncLimit=5
# zookeeper数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中)
dataDir=/tmp/zookeeper
# 对客户端提供的端口号
clientPort=2181
# 单个客户端于zookeeper最大并发连接数
maxClientCnxns=60
# 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3
# 自动出发清除任务时间间隔,以小时为单位。默认为0,表示不自动清除
autopurge.purgeInterval=1
## Metrics Providers
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
3.2> 安装Kafka
- 首先进入Kafka官网下载kafka http://kafka.apache.org/
- 解压,然后进入config目录下,编辑server.properties配置文件
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/Users/muse/kafka_2.13-3.0.0/kafka-logs
zookeeper.connect=localhost:2181
- 修改完响应配置后,启动Kafka,并通过查看kafka的进程来判断是否启动起来了
3.3> 安装EFAK
- 首先进入EFAK官网下载Eagle:http://download.kafka-eagle.org/
- 修改EFAK的conf目录下配置文件——system-config.properties
- 配置环境变量KE_HOME,并调用source .zshrc使其立即生效。(如果不配置的话,启动时会报错)
- 访问EFAK界面 http://192.168.1.3:8048,用户名:admin 密码:123456
四、kafka基础知识
4.1> kafka常用的术语如下所示
4.2> 相关指令
https://kafka.apache.org/documentation/#quickstart
4.3> 创建和查看主题
- bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1
- ./kafka-topics.sh --list --bootstrap-server localhost:9092
4.4> 开启消息发送端
- ./kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092
- kafka自带了一个producer的命令客户端——kafka-console-producer.sh,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。
- 使用kafka-console-producer.sh,指定发送到的topic和kafka服务器的地址
- 也可以使用./kafka-console-producer.sh --topic muse --broker-list localhost:9092
4.5> 开启消息接收端
- 【方式一】从最后一条消息的偏移量+1开始消费,即:启动客户端之前的消息是不会被消费的
./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092
- 【方式二】从头开始消费,即:启动客户端之前的消息会被消费
./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning
- 消息是会被存储在kafka中的文件里的,并且是顺序存储的,消息有偏移量的概念,所以我们可以指定偏移量去读取某个位置的消息。
- 如果说消息会被存储到kafka中,那么,发到topic=muse的消息存储到哪里了呢?
答:我们上面介绍kafka配置文件server.properties时,配置了一个属性为log.dirs,那么这个路径就是存储路径,如下所示:
【解释】其中的00000000000000000000.log就是消息存储的文件。
4.6> 单播消息
- 一个消费组里,只会有一个消费者能够消费到某个topic中的消息。
- 操作演示
- 首先,打开两个窗口,分别执行如下语句开启消费端,那么就在“museGroup1”消费组中创建了两个Consumer
./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup1
- 然后:Producer端发送3条消息,我们发现,只有一个Consumer收到了消息。如下所示:
4.7> 多播消息
- 当业务场景中,需要同一个topic下的消息被多个消费者消费,那么我们可以采用创建多个消费组的方式,那么这种方式就是多播消息。
- 操作演示
- 首先,打开两个窗口,在其中一个窗口执行如下指令:
./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup1
./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer-property group.id=museGroup2
- 最后,Producer端发送3条消息,我们发现,两个Consumer都收到了消息。如下所示:
4.8> 查看消费组信息
【解释】
- CURENT-OFFSET:当前消费组已消费消息的偏移量。
- LOG-END-OFFSET:主题对应分区消息的结束偏移量(水位HW)。
- LAG:当前消费组堆积未消费的消息数量。
- 操作演示
- 首先,我们先看一下消费组museGroup1的具体信息
- 然后,我们关闭museGroup1的所有Consumer,使得这个消费组不具备消费消息的能力
- 最后,我们向消费组museGroup1中发送3条消息,我们再来看一下消费组信息。
五、kafka集群
5.1> 搭建kafka集群
5.1.1> 解压kafka压缩包
- 创建kafka-cluster目录,并解压kafka_2.13-3.0.0.tgz为3份kafka
5.1.2> 修改配置文件server.properties
- 修改kafka1的server.propertis配置文件
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/Users/muse/kafka-cluster/kafka1/kafka-logs
- 修改kafka2的server.propertis配置文件
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/Users/muse/kafka-cluster/kafka2/kafka-logs
- 修改kafka3的server.propertis配置文件
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/Users/muse/kafka-cluster/kafka3/kafka-logs
5.1.3> 启动三个节点
./kafka-cluster/kafka1/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka1/config/server.properties
./kafka-cluster/kafka2/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka2/config/server.properties
./kafka-cluster/kafka3/bin/kafka-server-start.sh -daemon ./kafka-cluster/kafka3/config/server.properties
5.1.4> 验证3个Broker是否启动成功
- 首先,可以通过ps -ef | grep kafka来查看进程是否启动
- 其次:在zookeeper中查看/brokers/ids下中是否有相应的brokerId目录生成
5.2> 分区和副本
5.2.1> 分区
- 一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。并且分区也可以提供消息并发存储的能力。
- 比如:给一个Topic创建了4个分区,那么Topic中的消息就会分别存放在这4个分区中。
5.2.2> 副本
- 如果分片挂掉了,数据就丢失了。那么为了提高系统的可用性。我们把分片复制多个,这就是副本了。
- 但是,副本的产生,也会随之带来数据一致性的问题,即:有的副本写数据成功,但是有的副本写数据失败。
- Leader
- kafka的读写操作都发生在leader上,leader负责把数据同步给follower。
- 当leader挂掉了,那么经过主从选举,从多个follower中选举产生一个新的leader。
- Follower
- follower接收leader同步过来的数据,它不提供读写(主要是为了保证多副本数据与消费的一致性)
5.2.3> __consumer_offsets-N
- kafka默认创建了一个拥有50个分区的主题,名称为:“__consumer_offsets”,
- consumer会把将消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,
【key】=consumerGroupID+topic+分区号
【value】=当前offset的值。
- Kafka会定期清理topic里的消息,最后就保留最新的那条数据。
- 因为__consumer_offsets可能会接收高并发的请求,所以默认分配了50个分区(可以通过offsets.topic.num.partitions进行配置),这样可以通过增加机器的方式应对高并发的业务场景。
- 可以通过如下公式确定consumer消费的offset要提交到哪个__consumer_offsets
hash(consumerGroupID)% 主题“__consumer_offsets”的分区数量
5.2.4> 操作演示
- 为名称为muse-rp的Topic创建2个分区(--partitions)3个副本(--replication-factor)
./kafka-topics.sh --create --topic muse-rp --bootstrap-server localhost:9092 --partitions 2 --replication-factor 3
./kafka-topics.sh --describe --topic muse-rp --bootstrap-server localhost:9092
【解释】
“可以同步”和“已同步”的节点都会被存入isr集合中,如果isr中的节点性能较差,那么将会从isr集合中被剔除。
当leader节点挂掉,需要从follower节点列表中选举出主节点时,其实就是从isr集合中选举出来的。
- 我们随便找一个名为kafka1的Broker,来看一下.log文件和.index文件
【解释】
- 由于将Topic=“muse-rp”的主题指定了2个分区
- 所以产生了两个目录muse-rp-0和muse-rp-1,
- 里面存储的00000000000000000000.log就是消息存储的文件。
- 其中00000000000000000000.index存储的是消息的稀疏索引。如下图所示:
5.3> 集群消息的发送与消费
5.3.1> 消息放送
- ./kafka-console-producer.sh --topic muse-rp --bootstrap-server localhost:9092 localhost:9093 localhost:9094
- 发送10个消息
5.3.2> 消息消费
- ./kafka-console-consumer.sh --topic muse-rp --bootstrap-server localhost:9092 localhost:9093 localhost:9094 --consumer-property group.id=museGroup1
- 在消息组museGroup1中开启第一个消费者
5.3.3> 总结
- 一个partition只能被一个消费组中的一个消费者消费,这样设计的目的是保证消息的有序性,但是在多个partition的多个消费者消费的总顺序性是无法得到保证的。
- partition的数量决定了消费组中Consumer的数量,建议同一个消费组中的Consumer数量不要超过partition的数量,否则多余的Consumer就无法消费到消息了
- 但是,如果消费者挂掉了,那么就会触发rebalance机制,会由其他消费者来消费该分区。
- 如下图所示:
六、利用Kafka的Java Client实现消息收发
- 项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面包含了kafka-clients)
6.1> 生产者端
6.1.1> 初始化配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
properties.put(ProducerConfig.ACKS_CONFIG, "1");
【解释】
表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。
表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。
这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。
需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。
这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms
【解释】
- 发送失败重试的次数,默认是间隔100ms。
- 重试能保证消息发送的可靠性,但是也可能造成消息重复发送,所以需要在消费者端做好幂等性处理。
- 配置缓存相关信息
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024); // 设置发送消息的本地缓冲区,默认值为32MB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024); // 设置批量发送消息的大小,默认值为16KB
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10); // batch等待时间,默认值为0,表示消息必须立即被发送
【解释】
- Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。
- kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。
- LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。
设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。
如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。
- 配置key和value的序列化实现类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
6.1.2> 同步消息发送
6.1.3> 异步消息发送
6.2> 消费者端
6.2.1> 初始化配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
【解释】
- offset的重置策略——例如创建一个新的消费组,offset是不存在的,如何对offset赋值消费
latest:默认值,只消费自己启动之后发送到主题的消息。
earliest:第一次从头开始消费,以后按照消费offset记录继续消费。
- 心跳相关配置
/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
6.2.2> 自动提交offset
- 自动提交offset
- 当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。
- 自动提交会出现丢失消息的情况
因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。
6.2.3> 手动提交offset
当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。
七、SpringBoot集成
6.1> 创建Spring项目
- 我们看到,SpringBoot自动引入的kafka版本就是我们所采用的最新版本3.0.0
6.2> 生产者端
spring:
kafka:
producer:
bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # kafka集群
acks: 1 # 应答级别:多少个分区副本备份完成时,向生产者发送ack确认(可选0、1、-1/all)
retries: 3 # 重试次数
buffer-memory: 33554432 # 生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
batch-size: 16384 #批量大小,默认16kb
properties:
# 提交延时,当Producer积累的消息达到batch-size或者接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms等于0,表示没当接收到一条消息的时候,就提交给kafka,这个时候batch-size上面配置的值就无效了
linger.ms: 0
private final static String TOPIC_NAME = "muse-rp";
private final static Integer PARTITION_ONE = 0;
private final static Integer PARTITION_TWO = 1;
@Resource
private KafkaTemplate<String, Message> kafkaTemplate;
/**
* 同步阻塞——消息发送
*/
public void testBlockingSendMsg() throws Throwable {
Message message;
for (int i=0; i< 5; i++) {
message = new Message(i, "BLOCKING_MSG_SPRINGBOOT_" + i);
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
"" + message.getMegId(), message);
SendResult<String, Message> sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("---BLOCKING_MSG_SPRINGBOOT--- [topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
recordMetadata.partition(), recordMetadata.offset());
}
}
/**
* 异步回调——消息发送
*/
public void testNoBlockingSendMsg() {
Message message;
for (int i=0; i< 5; i++) {
message = new Message(i, "NO_BLOCKING_MSG_SPRINGBOOT_" + i);
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_TWO,
"" + message.getMegId(), message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败!", ex);
}
@Override
public void onSuccess(SendResult<String, Message> result) {
RecordMetadata recordMetadata = result.getRecordMetadata();
log.info("---NO_BLOCKING_MSG_SPRINGBOOT---[topic]={}, [partition]={}, [offset]={}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
});
}
}
6.3> 消费者端
spring:
kafka:
consumer:
group-id: museGroup # 消费组id
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 1000 # 提交offset延时(接收到消息后多久提交offset)
# 当kafka中没有初始offset或offset超出范围时,将自动重置offset
# earliest:第一次从头开始消费,以后按照消费offset记录继续消费
# latest(默认):只消费自己启动之后发送到主题的消息。
auto-offset-reset: latest
max-poll-records: 500 # 批量消费每次最多消费多少条记录
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
session.timeout.ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
request.timeout.ms: 180000 # 消费请求超时时间
spring:
json:
trusted:
packages: com.muse.kafkademo.entity
listener:
ack-mode: record
private final static String TOPIC_NAME = "muse-rp";
private final static String CONSUMER_GROUP_NAME = "museGroup";
/**
* 消息消费演示
*/
@KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP_NAME)
// public void listenGroup(ConsumerRecord<String, Message> record, Acknowledgment ack) {
public void listenGroup(ConsumerRecord<String, Message> record) {
log.info(" [topic]={}, [partition]={}, [offset]={}, [key]={}, [value]={}", record.topic(),
record.partition(), record.offset(), record.key(), record.value());
// ack.acknowledge(); 需要enable-auto-commit: false才可以
}
/**
* 消息消费多配置参数演示
*/
@KafkaListener(
groupId = "xxGroup", // 消费组
concurrency = "3", // 组里创建3个Consumer
topicPartitions = {
@TopicPartition(topic = "xxTopic1", // 针对主题"xxTopic1",要消费分区0和分区1
partitions = {"0", "1"}),
@TopicPartition(topic = "xxTopic2", // 针对主题"xxTopic2",要消费分区0和分区1,并且针对分区1,offset被设置为100
partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listenGroupPro(ConsumerRecord<String, Message> record) {
Message message = record.value();
log.info("msgId={}, content={}", message.megId, message.content);
}
八、Kafka集群概念补充
8.1> Controller
- Kafka集群中的Broker在ZK中创建临时序号节点,序号最小的节点也就是最先创建的那个节点,将作为集群的Controller,负责管理整个集群中的所有分区和副本的状态。控制器的作用如下:
- 当某个分区的leader副本出现故障时,由控制器负责为该分区选举除新的leader副本。
- 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
- 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
8.2> Rebalance机制
- 当消费者没有指明分区消费时,消费组里的消费者和分区关系发生了变化,那么就会触发rebalance机制。
- 这个机制会重新调整消费者消费哪个分区。
- 在触发rebalance机制之前,消费者消费那个分区有3中策略:
- 1> range
通过公式来计算某个消费者消费那个分区。
大家轮流对分区进行消费。
在触发rebalance之后,在消费者消费的原分区不变的基础上进行调整。
8.3>HW和LEO
- HW(HighWatermark)俗称高水位,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW;
- Consumer最多只能消费到HW所在的位置,每个副本都有HW,Leader和Follower各自负责更新自己的HW的状态。
- 对于Leader新写入的消息,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的副本同步后更新HW,此时消息才能被Consumer所消费。这样就能保证如果Leader所在的broker失效,该消息仍然可以从新选举的Leader中获取。
- 具体逻辑如下图所示:
九、Kafka常见面试题
9.1> 如何防止消息丢失
- 针对发送方
ack1或者-1/all可以防止消息丢失,如果要做到99.9999%,ack要设置为all,并把min.insync.replicas配置成分区备份数
- 针对接收方
把自动提交修改为手动提交。
9.2> 如何防止消息的重复消费
- 一条消息被消费者消费多次,如果为了不消费到重复的消息,我们需要在消费端增加幂等性处理,例如:
- 通过mysql插入业务id作为主键,因为主键具有唯一性,所以一次只能插入一条业务数据。
- 使用redis或zk的分布式锁,实现对业务数据的幂等操作。
9.3> 如何做到顺序消费
- 针对发送方
在发送时将ack配置为非0,确保消息至少同步到leader之后再返回ack继续发送。但是,只能保证分区内部的消息是顺序的,而无法保证一个Topic下的多个分区总的消息是有序的。
- 针对接收方
消息发送到一个分区中,值配置一个消费组的消费者来接收消息,那么这个Consumer所接收到的消息就是有顺序的了,不过这也就牺牲掉了性能。
9.4> 如何解决消息积压的问题
- 消息积压会导致很多问题,比如:磁盘被打满、Producer发送消息导致kafka性能过慢,然后就有可能发生服务雪崩。解决的方案如下所示:
- 方案1:提升一个Consumer的处理能力。即:在一个消费者中启动多个线程,让多线程加快消费的速度。
- 方案2:提升总体Consumer的处理能力。启动多个消费组,增加Consumer的数量从而提高消费能力。
- 方案3:如果业务运行,设定某个时间内,如果消息仍没有被消费,那么Consumer收到消息后,直接废弃掉,不执行下面的业务逻辑。
9.5> 如何实现延迟队列
- 应用场景:订单创建成功后如果超过30分钟没有付款,则需要取消订单。
- 步骤一:创建一个表示“订单30分钟未支付”的Topic
order_not_paid_30m:延迟30分钟的消息队列。
- 步骤二:Producer发送消息的时候,消息内容要带上订单生成的时间create_time。
- 步骤三:Consumer消费Topic中的消息,如果发现now减去create_time不足30分钟,则不去消费下次,记录当前的offset,不去消费当前以及之后的消息。
- 步骤四:通过记录的offset去获取消息,如果发现消息已经超过30分钟且订单状态是“未支付”,那么则将订单状态设置为“取消”,然后或许下一个offset的消息。