首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >交易系统使用storm,在消息高可靠情况下,如何避免消息重复

交易系统使用storm,在消息高可靠情况下,如何避免消息重复

作者头像
intsmaze-刘洋
发布于 2018-08-29 08:01:14
发布于 2018-08-29 08:01:14
73300
代码可运行
举报
运行总次数:0
代码可运行

概要:在使用storm分布式计算框架进行数据处理时,如何保证进入storm的消息的一定会被处理,且不会被重复处理。这个时候仅仅开启storm的ack机制并不能解决上述问题。那么该如何设计出一个好的方案来解决上述问题?

现有架构背景:本人所在项目组的实时系统负责为XXX的实时产生的交易记录进行处理,根据处理的结果向用户推送不同的信息。实时系统平时接入量每秒1000条,双十一的时候,最大几十万条。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6219878.html

新浪微博:intsmaze刘洋洋哥

架构设计:

  storm设置的超时时间为3分钟;kafkaspout的pending的长度为2000;storm开启ack机制,拓扑程序中如果出现异常则调用ack方法,向spout发出ack消息;每一个交易数据会有一个全局唯一性di。

  处理流程:

  交易数据会发送到kafka,然后拓扑A去kafka取数据进行处理,拓扑A中的OnceBolt会先对从kafka取出的消息进行一个唯一性过滤(根据该消息的全局id判断该消息是否存储在redis中,如果有,则说明拓扑A已经对该消息处理过了,则不会把该消息发送该下游的calculateBolt,直接向spout发送ack响应;如果没有,则把该消息发送该下游的calculateBolt。),calculateBolt对接收到来自上游的数据进行规则的匹配,根据该消息所符合的规则推送到不同的kafka通知主题中。

  拓扑B则是不同的通知拓扑,去kafka读取对应通知的主题,然后把该消息推送到不同的客户端(微信客户端,支付宝客户端等)。

架构设计的意义:

  通过借用redis,来保证消息不会被重复处理,对异常的消息,我们不让该消息重发。

  因为系统只是对交易成功后的数据通过配置的规则进行区分来向用户推送不同的活动信息,从业务上看,系统并不需要保证所有交易的用户都一定要收到活动信息,只需要保证交易的用户不会收到重复的数据即可。

 但是在线上运行半年后,还是发现了消息重复处理的问题,某些用户还是会收到两条甚至多条重复信息。

  通过对现有架构的查看,我们发现问题出在拓扑B中(各个不同的通知拓扑),原因是拓扑B没有添加唯一性过滤bolt,虽然上游的拓扑对消息进行唯一性过滤了(保证了外部系统向kafka生产消息出现重复下,拓扑A不进行重复处理),但是回看拓扑B,我们可以知道消息重发绝对不是kafka主题中存在重复的两条消息,且拓扑B消息重复不是系统异常导致的(我们队异常进行ack应答),那么导致消息重复处理的原因就一定是消息超时导致的。ps:消息在storm中被处理,没有发生异常,而是由于集群硬件资源的争抢或者下游接口瓶颈无法快速处理拓扑B推送出去的消息,导致一条消息在3分钟内没有处理完,spout就认为该消息fail,而重新发该消息,但是超时的那一条消息并不是说不会处理,当他获得资源了,仍然会处理结束的。

 解决方案:在拓扑B中添加唯一性过滤bolt即可解决。

个人推测:当时实时系统架构设计时,设计唯一性过滤bolt时,可能仅仅是考虑到外部系统向kafka推送数据可能会存在相同的消息,并没有想到storm本身tuple超时导致的消息重复处理。

该系统改进:虽然从业务的角度来说,并不需要保证每一个交易用户都一定要收到活动信息,但是我们完全可以做到每一个用户都收到活动信息,且收到的消息不重复。

我们可以做到对程序的异常进行控制,但是超时导致的fail我们无法控制。

  我们对消息处理异常控制,当发生异常信息,我们在发送fail应答前,把该异常的消息存储到redis中,这样唯一性过滤的bolt就会对收到的每一条消息进行判断,如果在redis中,我们就知道该消息是异常导致的失败,就让该消息继续处理,如果该消息不在redis中,我们就知道该消息是超时导致的fail,那么我们就过滤掉该消息,不进行下一步处理。

这样我们就做到了消息的可靠处理且不会重复处理。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
博主解决的是90%的问题,主要是因为:

1,彻头彻尾的异常是不会给你写redis的机会的,只能说绝大多数时候是OK的。
2,超时的任务最终也可能运行成功,这也会导致你做了2次。

我的看法:
既然是交易系统,最重要的就是业务本身满足幂等性和可重入,架构上容错导致的重试和重入,都不应该导致业务错乱。

所以,我认为在架构上能做的,是要保障at least once,博主判断redis不存在就认为是超时重发,殊不知超时的bolt可能很久之后异常退出,这样消息就没有人处理了。

不过具体场景具体分析,看业务需求取舍既可。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    超时的任务最终也可能运行成功,这也会导致你做了2次。(ps:这个不会,我们认为超时的任务最终会处理成功,所以再次发送,我们会在唯一性过滤bolt中把该消息过滤掉)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
   超时的bolt可能很久之后异常退出,这样消息就没有人处理了(ps:这个我要研究下,就是超时后,再异常向spout发送fial响应是否还会重发消息,如果还会重发,那么就可以保证该异常消息可以再一次被处理)

  彻头彻尾的异常是不会给你写redis的机会的,只能说绝大多数时候是OK的。(ps:正确,但是是不可控的吧,就像kafka把offset存储在zookeeper中,如果zookeeper挂掉就没有办法,确实绝大部分是ok
的,解决办法不知道有没有。)
  最重要的就是业务本身满足幂等性和可重入,架构上容错导致的重试和重入,都不应该导致业务错乱(ps:我不是很明白,我这里并不要求一条消息具备事务的特性和幂等性有什么关系)
以上是我对该朋友对本系统架构找出的问题的个人思考。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-12-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Storm 的可靠性保证测试
Storm是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。 在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。 在实时计算中,用户不仅仅关心时效性的
美团技术团队
2018/03/12
1.2K0
Storm 的可靠性保证测试
Storm的ack机制在项目应用中的坑
正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了。好吧,那就让我开始啪啪打你们脸吧。
intsmaze-刘洋
2018/08/29
1.4K0
Storm的ack机制在项目应用中的坑
【Storm】Storm之how
通过ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理,从而可以让开发者采取动作。
章鱼carl
2022/03/31
8140
【Storm】Storm之how
Storm参数配置及代码优化
背景 本人在维护一套由storm、kafka、zookeeper组成的分布式实时计算系统。当数据量很小的时候,系统处理起来其实是绰绰有余的,基本上按照系统默认配置来就可以了。然而当数据量增长到一定规模的时候,系统的各个配置都对整个系统的性能有着至关重要的影响。在不断的处理现网问题、研究的过程中,对系统的一些关键配置有一些心得。在这里分享出来,同大家一起学习交流。 今天我们在这里只介绍storm一些相关的比较重要的配置项和优化项。 参数配置 并行度 本人曾在上一篇文章中翻译过官方关于并行度的解释,但是实际在生
gaofc
2018/05/09
1.2K0
Storm消息处理可靠性保证
Storm可以保证每一个从spout发出的消息能被完全处理。本章描述storm是如何完成这个保证以及用户如何从storm的可靠性能力获益的。 消息“完全处理”的含义 一个tuple从spout发出后可能会触发成千上万的tuple基于它而创建。以work count的topology为例考虑下: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.bac
囚兔
2018/02/08
9980
Storm消息处理可靠性保证
storm消息机制
这章讨论Storm's reliability capabilities, 如何保证从spout emit出来的所有tuple都被正确的执行(fully processed)? What does 
汤高
2018/03/08
1.2K0
storm消息机制
一脸懵逼学习Storm---(一个开源的分布式实时计算系统)
Storm的官方网址:http://storm.apache.org/index.html 1:什么是Storm?  Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。被称作
别先生
2018/01/02
1.6K0
一脸懵逼学习Storm---(一个开源的分布式实时计算系统)
Storm组件介绍
(1)Topologies 拓扑 解释: 拓扑类似一个集装箱,所有的货物都会存储在集装箱里面最后被托运走,storm里面所有的代码和文件最终会被打包在一个拓扑中,然后提交在storm集群中运行,类似于Hadoop中的一个MapReduce的作业,最大的区别在于MapReduce最终会主动停止,Storm的Topologies不会主动停止,除非你强制kill掉它 相关拓展: TopologyBuilder : Java里面构造Topology工具类 生产模式 Config conf = new Con
我是攻城师
2018/05/14
1.1K0
kafkaspot在ack机制下如何保证内存不溢
storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
intsmaze-刘洋
2018/08/29
7040
storm从入门到放弃(三),放弃使用 StreamId 特性
  序:StreamId是storm中实现DAG有向无环图的重要一个特性,但是从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,我们系统在迭代时会带来整体服务的不可用。
intsmaze-刘洋
2018/08/29
5320
storm从入门到放弃(三),放弃使用  StreamId  特性
实时可靠的开源分布式实时计算系统——Storm
在Hadoop生态圈中,针对大数据进行批量计算时,通常需要一个或者多个MapReduce作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景。 Storm是一个开源分布式实时计算系统,它可以实时可靠地处理流数据。 Storm特点 在Storm出现之前,进行实时处理是非常痛苦的事情,我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了源代码的一小部分。一个应用程序的逻辑运行在很多worker上,但这些worker需要各自单独部署,还需要部署消息队列。最大问题是
CSDN技术头条
2018/02/06
2.3K0
实时可靠的开源分布式实时计算系统——Storm
实时大数据开发实践
本文主要从大数据起源谈起,介绍了几种主要的大数据处理框架,包括其中的容错机制,实现细节及原理等。再主要介绍了使用storm进行大数据开发的具体过程,以及开发过程中遇到的坑和一些优化。以下内容基于本人上次部门内分享整理,去掉了一些业务性的内容,尽量给大家展现一些技术细节。
gaofc
2018/12/24
1.4K0
Storm同步调用之DRPC模型探讨
摘要:Storm的编程模型是一个有向无环图,决定了storm的spout接收到外部系统的请求后,spout并不能得到bolt的处理结果并将结果返回给外部请求。所以也就决定了storm无法提供对外部系统的同步调用功能。
intsmaze-刘洋
2018/08/29
1K0
Storm同步调用之DRPC模型探讨
storm1.0节点间消息传递过久分析及调优
  序:最近对storm平台系统进行性能检测发现偶尔会出现oncebolt向另一个twobolt发送数据后,twobolt要500毫秒后才接收到进行处理。这里简单说增大twobolt的并行度即可解决,但是究其内部原因是因为storm的通信机制所导致的问题。   先介绍背景:一个拓扑的结构,spout(并行度:1)[处理性能:capacity 0.04],oncebolt(并行度:20)[处理性能:capacity 0.2],twobolt(并行度:100)[处理性能:capacity 0.6];整个拓扑就我预估最大的处理量就是一秒一千条。
intsmaze-刘洋
2018/08/29
2470
Storm——分布式实时流式计算框架
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。 轮询,平均分配
时间静止不是简史
2020/07/24
5.6K0
快速认识实时计算系统 Storm
Storm是什么 Storm 是一个分布式数据流处理系统,用于大规模数据的实时处理。 例如用户在购物网站中会产生很多行为记录,如浏览、搜索感兴趣的商品,就可以使用Storm对这些行为记录进行实时分析处
dys
2018/04/04
1.4K0
快速认识实时计算系统 Storm
Storm Ack框架笔记
  Storm利用Acker Bolt节点跟踪消息,当Spout发送出去的消息以及这些消息所衍生出来的消息均被处理后,Spout将受到对应于该消息的Ack。实现要点:
用户3003813
2018/09/06
4910
Storm Ack框架笔记
storm 分布式实时计算系统介绍
在Storm之前,进行实时处理是非常痛苦的事情: 需要维护一堆消息队列和消费者,他们构成了非常复杂的图结构。消费者进程从队列里取消息,处理完成后,去更新数据库,或者给其他队列发新消息。
用户5265382
2019/05/17
1.9K0
storm 分布式实时计算系统介绍
Storm 稳定态
假设一个topology有4个worker,2个spout,2个bolt。spout1有4个task,spout2有2个task,bolt1有4个task,bolt2有4个task。(默认一个task对应一个Executor)
Meet相识
2018/09/12
1.2K0
Storm 稳定态
事实数据分析——Storm框架(一)
总体描述:nimbus下命令(分配任务),zk监督执行(心跳监控,worker、supurvisor的心跳都归它管),supervisor服从命令(下载代码),招募人马(创建worker和线程等),worker、executor就给我干活!task就是具体要干的活。
趣学程序-shaofeer
2019/09/25
1.1K0
事实数据分析——Storm框架(一)
相关推荐
Storm 的可靠性保证测试
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档