消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。用于保证异构应用之间的消息传递。应用程序通过MQ接口进行互连通信,可以不必关心网络上的通信细节,从而将更多的注意力集中于应用本身。
分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。
对象
•消息 (Message) •队列 (Queue)
•队列管理器(QueueManager)
•通道 (Channel)
•监听器(Listener)
关系:队列管理器是负责向应用程序提供消息服务的机构,我们可以把队列管理器比作是数据库,队列是其中的一张表,消息表中的一条记录。
1、消息(Message)
消息是 WebSphereMQ中最小的概念,本质上就是一条数据,它能被一个或多个应用程序所理解,是应用程序之间传递的信息载体。
消息可以大致分成两部分:应用数据体和消息数据头。消息数据头是对消息属性的描述,这段信息往往被队列管理器用来确定对消息的处理。消息数据头可以由应用程序或系统的消息服务程序共同产生,它包含了消息在传送中的必要信息,如目标队列管理器的名字,目标队列的名字,以及消息的其它一些属性。
消息可以分成持久(Persistent)消息和非持久 (Non-Persistent)消息。所谓“持久”的意思,就是在 WebSphere MQ 队列管理器重启动后,消息是否仍然能保持。
2、队列 (Queue):分本地队列、远程队列、模型队列
本地队列按功能可分成初始化队列,传输队列,目标队列和死信队列。 –初始化队列用做消息触发功能。
–传输队列只是暂存待传的消息,条件许可的情况下,通过管道将消息传送其它的队列管理器。
–目标队列是消息的目的地,可以长期存放消息。
–如果消息不能送达目标队列,也不能再路由出去,则被自动放入死信队列保存。
远程队列
–用来指定远端队列管理器中的队列。使用了远程队列定义,程序就不需要知道目标队列的位置。
模型队列
-定义了一套本地队列的属性集合,一旦打开模型队列,队列管理器会按这些属性动态地创建出一个本地队列。
3、队列管理器(QueueManager) WebSphere MQ中的队列管理器可以含有很多个队列,但一个队列只能属于一个队列管理器。一个操作系统平台可以创建一个队列管理器,也可以创建多个队列管理器。队列管理器、队列、通道等等都是WebSphere MQ的对象,所有的对象都有各自的属性,有些属性必须在对象创建的时候指定,有些可以在创建以后更改。
4、通道 (Channel) 通道是两个队列管理器之间的一种单向的点对点的通信连接,消息在通道中只能单向流动。如果需要双向交流,可以建立一对通道,一来一去。站在队列管理器的角度,这一对通道可以按消息的流向分成输入通道和输出通道。通过配置,对于放入本地传输队列中的消息,队列管理器会自动将其通过输出通道发出,送入对方的远程目标队列。
5、监听器(Listener)
同步接收消息:同步读取即主动读取方式
异步接收消息异步读取则需要设定Listener,在消息到达后,自动调用Listener的onMessage()方法。
JMS开发
jms即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
•JMS 中有一系列的类:ConnectionFactory,Connection,Session,MessageProducer,MessageConsumer,Message
•在 JMS 编程时,程序首先要找出ConnectionFactory,以此创建Connection,再建立Session,以后所有的操作都以Session 为基础。找出Queue 或Queue (统称Destination),以此创建QueueSender或QueuePublisher (统称MessageProducer),在该对象上发送或发布消息。也可以在Destination 基础上创建QueueReceiver 或 QueueSubscriber (统称MessageConsumer),在该对象上接收或订阅消息。
前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了。
上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证?!在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证?!等等,相信大家或多或多少都能碰到相似情景。
本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功。
还是以支付宝转账余额宝为例,假设有
从支付宝转账1万块钱到余额宝的动作分为两步:
如何确保支付宝余额宝收支平衡呢?
有人说这个很简单嘛,可以用事务解决。
非常正确,如果你使用spring的话一个注解就能搞定上述事务功能。
如果系统规模较小,数据表都在一个数据库实例上,上述本地事务方式可以很好地运行,但是如果系统规模较大,比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。
既然本地事务失效,分布式事务自然就登上舞台。
两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器C和若干事务执行者Si两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。
1) 我们的应用程序(client)发起一个开始请求到TC;
2) TC先将<prepare>消息写到本地日志,之后向所有的Si发起<prepare>消息。以支付宝转账到余额宝为例,TC给A的prepare消息是通知支付宝数据库相应账目扣款1万,TC给B的prepare消息是通知余额宝数据库相应账目增加1w。为什么在执行任务前需要先写本地日志,主要是为了故障后恢复用,本地日志起到现实生活中凭证 的效果,如果没有本地日志(凭证),出问题容易死无对证;
3) Si收到<prepare>消息后,执行具体本机事务,但不会进行commit,如果成功返回<yes>,不成功返回<no>。同理,返回前都应把要返回的消息写到日志里,当作凭证。
4) TC收集所有执行器返回的消息,如果所有执行器都返回yes,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。
注:TC或Si把发送或接收到的消息先写到日志里,主要是为了故障后恢复用。如某一Si从故障中恢复后,先检查本机的日志,如果已收到<commit >,则提交,如果<abort >则回滚。如果是<yes>,则再向TC询问一下,确定下一步。如果什么都没有,则很可能在<prepare>阶段Si就崩溃了,因此需要回滚。
现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/)来快速实现。
不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?
正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。
如果仔细观察生活的话,生活的很多场景已经给了我们提示。
比如在北京很有名的姚记炒肝点了炒肝并付了钱后,他们并不会直接把你点的炒肝给你,而是给你一张小票,然后让你拿着小票到出货区排队去取。为什么他们要将付钱和取货两个动作分开呢?原因很多,其中一个很重要的原因是为了使他们接待能力增强(并发量更高)。
还是回到我们的问题,只要这张小票在,你最终是能拿到炒肝的。同理转账服务也是如此,当支付宝账户扣除1万后,我们只要生成一个凭证(消息)即可,这个凭证(消息)上写着“让余额宝账户增加 1万”,只要这个凭证(消息)能可靠保存,我们最终是可以拿着这个凭证(消息)让余额宝账户增加1万的,即我们能依靠这个凭证(消息)完成最终一致性。
有两种方法:
支付宝在完成扣款的同时,同时记录消息数据,这个消息数据与业务数据保存在同一数据库实例里(消息记录表表名为message)。
上述事务能保证只要支付宝账户里被扣了钱,消息一定能保存下来。
当上述事务提交成功后,我们通过实时消息服务将此消息通知余额宝,余额宝处理成功后发送回复成功消息,支付宝收到回复后删除该条消息数据。
上述保存消息的方式使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。为了解耦,可以采用以下方式。
1)支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送,只有消息发送成功后才会提交事务;
2)当支付宝扣款事务被提交成功后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送该消息;
3)当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。在得到取消发送指令后,该消息将不会被发送;
4)对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态并进行更新。为什么需要这一步骤,举个例子:假设在第2步支付宝扣款事务被成功提交后,系统挂了,此时消息状态并未被更新为“确认发送”,从而导致消息不能被发送。
优点:消息数据独立存储,降低业务系统与消息系统间的耦合;
缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。
还有一个很严重的问题就是消息重复投递,以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了。
为什么相同的消息会被重复投递?比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg。
解决方法很简单,在余额宝这边增加消息应用状态表(message_apply),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务)。
基于事务消息的MQ方案是目前公认的较为理想的分布式事务解决方案,各大电商都在应用这一方案。种方式适合的业务场景广泛,而且比较可靠。不过这种方式技术实现的难度比较大。目前主流的开源MQ(ActiveMQ、RabbitMQ、Kafka)均未实现对事务消息的支持,所以需二次开发或者新造轮子。
注:关注作者微信公众号,了解更多分布式架构、微服务、netty、MySQL、spring、性能优化、等知识点。公众号:《Java烂猪皮》
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。