首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

一个基于Actor的兼容Scala和Amazon SQS接口的消息队列系统,ElasticMQ 0.7.0,刚刚发布。...如果队列中没有消息,而不是正在完成空响应的请求,ElasticMQ将等待MessageWaitTime秒钟,直到消息到达。...GetQueueData()).apply() } } } 这里的重要部分是flow块,它界定转换范围,以及用于提取未来内容的Future上的apply()调用。...当接收消息的请求到达,并且队列中没有任何内容时,我们不是立即回复(即向发送者Actor发送空列表),而是将原始请求的引用和发送方actor存储在一个map中。...使用Akka调度程序,我们还计划在指定的超时之后发回空列表并删除条目。 当新消息到达时,我们只需从map上获取一个等待请求,然后尝试完成它。同样,所有同步和并发问题都由Akka和参与者模型来处理。

1.6K90

干货 | 成本低误差小,携程基于 Kafka 的 Serverless 延迟队列的实践

四、产品选型 在 aws 上支持消息队列的产品有 RabbitMQ、Apache ActiveMQ 和 SQS。...具体来说,通过设置消息的 TTL,当达到 TTL 时消息还没有被消费,此时会投递到死信队列。...6.3 基于 SQS 和定时调度策略 使用基于 SQS 的多级队列的方式最大的问题是云上的成本问题,更具体一点是云上的存储成本问题。...当 DynamoDB 中的延迟消息被投递到 SQS 以后,会调用 API 去删除该消息。DynamoDB 中消息的数据结构还包括 topic、消息体等信息。...当 Scheduler 消费到通知消息时,会根据消息内容转换成时间戳,并在 DynamoDB 中查询这一时间戳范围内的所有消息,修改消息的延迟时间,投递到 SQS 的 Standard 队列中,最后删除

2.1K40
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    什么场景(不)适合使用Lambda

    以项目经验为例,有一个API Gateway -> Function A -> Function B -> 第三方系统的访问链路,在测试环境(用的人少,流量波动大)中,从页面调用这个接口的时间基本上在8...秒以上,有时会超过10秒,让客户怀疑系统的性能有问题。...从价格方面来考虑,Lambda使用的是基于调用次数计费的模型,当调用次数增长到一定的阈值以上,其成本有效性必定会低于基于使用资源时长计费的模型。...让我们按照AWS的5 Pillars来分析为什么这是一个良好的解决方案: Reliability: API Gateway加上SQS能够保证足够的高可用性,并且提供稳定的低延迟,这对Webhook的监听器来说相当重要...Lambda支持同步和异步两种调用模式,以项目经验来看,同步调用模式受冷启动影响更大,有时会通过SQS将调用封装成异步模式。

    1.4K20

    ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

    主要的客户端改进是: 支持长轮询,这是SQS前一段时间的补充 更简单的独立服务器 - 只需下载一个jar包 使用长时间的轮询的过程中,当收到消息时,可以指定一个额外的的MessageWaitTime属性...如果队列中没有消息,,ElasticMQ将等待MessageWaitTime几秒钟直到消息到达,而不是用空响应完成请求。...实现说明 出于好奇,下面是对ElasticMQ如何实现的简短描述,包括核心系统,REST层,Akka数据流使用和长轮询实现。所有的代码都可以在GitHub上找到。...如前所述,ElasticMQ现在使用Akka和Spray来实现,并且不包含任何阻塞调用。一切都是异步的。 核心 核心系统是基于角色的。...当接收到消息的请求到达时,队列中没有任何内容产生,而是立即回复(即向发送者actor发送空列表),我们将储存原始请求的引用和发送方actor在map中。

    1.6K60

    JavaScript如何工作:引擎,运行时和调用堆栈的概述

    事实证明,有很多开发人员每天都在使用JavaScript,但不知道什么会发生什么。 概览 几乎所有人都已经听说过V8引擎的概念,大多数人都知道JavaScript是单线程的,或者是使用回调队列。...调用堆栈中的每个条目称为堆栈帧。 这正是抛出异常时构造堆栈跟踪的方式 - 当异常发生时,它基本上是调用堆栈的状态。...“Blowing the stack”  - 当您达到最大调用堆栈大小时,会发生这种情况。 这可能会很容易发生,特别是如果您在不经常地对代码进行测试的情况下使用递归。...在单个线程上运行代码可能非常容易,因为您不必处理在多线程环境中出现的复杂场景,例如死锁。 但是在单线程上运行也是非常有限的。 由于JavaScript有一个调用堆栈,当运行缓慢时会发生什么?...并发和事件循环 当您在调用堆栈中进行函数调用需要大量时间才能处理时会发生什么? 例如,假设您想在浏览器中使用JavaScript进行一些复杂的图像转换。 你可能会问 - 为什么这甚至是一个问题?

    1.8K40

    做了这个优化,我们系统性能提升了几倍

    基于上述考虑,最终的方案是集成SQS,采用lambda函数调用的方式,架构图如下所示: ?...通过当前的这种数据架构,就可以不用依赖对象存储了,数据直接存储在SQS中了,而且AWS服务支持通过lambda函数调用,这样就可以在需要服务的时候调用了,不需要服务一直启动,可以大大的节省服务器资源。...使用SQS有两个好处: SQS消息设置唯一ID,可以进行队列去重,应用场景为:亚马逊数据获取延迟,导致消息堆积,下一轮消息过来,队列中就会存在重复消息。...五、总结: 本次优化根本性优化主要有3点,数据获取服务迁移到国外,对跨境传输数据的处理、数据存储。方案的设计和选择一定要根据实际场景来设计,例如为什么用SQS队列而不用Kafka队列呢?...因为aws没有提供Kafka的服务,如果我们自己搭建,没有相关经验的运维人员,出现问题之后将是灾难性后果。还有为什么选择aws的lambda函数调用,也是同样的道理。

    81410

    基础设施即代码的历史与未来

    主要区别在于 playbook 是声明性的 - 它描述了它想要发生的事情,比如在机器上安装给定的 Apt 软件包。这与脚本不同,脚本包含要执行的命令。...我们不编写调用 SQS API 来创建队列的代码——我们只声明我们想要一个具有 VisibilityTimeout 属性设置为 120 的队列,部署引擎(在这种情况下是 CloudFormation )...它们的第一个缺点是它们主要在单个云服务的层面上操作。因此,虽然它们使使用 Lambda 或 SQS 变得简单,但您仍然需要知道这些服务是什么,以及为什么考虑使用它们。...什么情况下使用哪种服务更合适? 许多开发人员对每个云供应商的服务了解不够详细,特别是因为这些服务往往经常发生变化,引入新服务(或现有服务的新功能)并废弃旧服务。...在某种意义上,这是重复——我的应用程序代码使用 SQS 队列对我的基础设施代码提出了隐含的要求,以正确地配置该队列。

    25310

    Amazon云计算AWS(三)

    (二)RDS的使用   从用户和开发者的角度来看,RDS和一个远程MySQL关系数据库没什么两样。...(2)队列Queue   队列是存放消息的容器,类似于S3中的桶。队列的数目是任意的,创建队列时用户必须给其指定一个在SQS账户内唯一的名称。队列在传递消息时会尽可能 “先进先出”。...2、消息取样   队列中的消息是被冗余存储的,同一个消息会存放在系统的多个服务器上。其目的是为了保证系统的高可用性,但这会给用户查询队列中的消息带来麻烦。...当用户发出查询队列中消息的命令后,系统在所有的服务器上使用基于加权随机分布算法随机地选出部分服务器,然后返回这些服务器上保存的所查询的队列消息副本。   ...当消息数量较少时,SQS进行消息取样时可能会出现返回结果不准确的现象。但由于消息采样具有随机性,只要用户一直查询下去,总会查询到所有的消息。

    6610

    Serverless 常见的应用设计模式

    实施方面,可以使用 SQS 构建此模式。 消息队列包含多个发送方/接收方的时候,而每个 SQS 队列通常只有一个接收器。...SQS 队列可以订阅一个 SNS 主题,将消息推送到 SNS 主题,SQS 会自动将消息推送到所有订阅的队列。...通常,扇出模式用于将消息推送到特定队列或消息管道订阅的所有客户端。 此模式通常使用 SNS 主题实现,当向主题添加新消息时,允许调用多个订阅者。以 S3 为例。...当新消息添加到主题时,会强制并行调用所有订阅者,从而导致事件扇出。...SNS 主题支持其他订阅者,例如电子邮件和 SQS 队列。向主题添加新消息可以同时调用 Lambda 函数、发送电子邮件或将消息推送到 SQS 队列。

    2.8K30

    基于Karma构建微服务

    SNS接受一个服务传递给它的消息,并通过SQS将它发布到适当的队列中。然后,微服务可以将作业从队列中取出,处理它们,并在成功时删除它们。...如果一个进程失败了,那么这个消息会返回到队列中,这样进程的另一个实例就可以对其进行工作。 当部署一个新的微服务时,它包含一个配置文件,该文件描述了想要侦听的消息类型以及要发布的消息类型。...我们有一个名为Fare的内部工具,它读取配置并设置适当的SQS和SNS队列。...任何其他对订单发生有兴趣的服务都可以在他们自己的队列中完成他们需要的任何事情,而store API也不需要担心。 当我们需要对消息立即响应时,我们仍然使用HTTP请求,例如登录或覆盖地图。...相反,我们专注于使每个组件尽可能完善,并且看看将它们组合一起时会发生什么。我们试图让每个微服务都履行合同。“当我这样做时,我得到了这个回报。”我们拿这些合同(contracts),并手动确保他们履行。

    1K50

    一文掌握Serverless中的异常处理

    万事万物都经不起审视,因为世上没有同样的成长环境,也没有同样的认知水平,更「没有适用于所有人的解决方案」; 不要急着评判文章列出的观点,只需代入其中,适度审视一番自己即可,能「跳脱出来从外人的角度看看现在的自己处在什么样的阶段...1  Lambda 错误类型 深入研究错误处理策略之前,先了解 AWS Lambda 中可能发生的错误类型。 1.1 调用错误 当 Lambda 函数被触发但无法正确执行时发生。...2 错误处理的最佳实践 2.1 死信队列 (DLQs) AWS SQS 中的死信队列 (DLQ) 是一个单独的队列,用于捕获和存储 Lambda 函数在处理 SQS 队列时无法成功处理的消息。...场景 假设有一个处理来自 SQS 队列的消息的 Lambda 函数。由于各种原因如意外数据格式、处理逻辑中的错误或外部依赖项的间歇性问题,一些消息始终无法被 Lambda 函数成功处理。...解决方案 为 SQS 队列配置死信队列,以捕获和存储无法成功处理的消息。使用 DLQ 进行调查并重新处理失败的消息。

    16010

    machinery中文文档( 值得收藏 )

    任务是一个函数,它定义当worker收到消息时发生的事情。 每个任务至少要返回一个error作为返回值。除了错误任务现在可以返回任意数量的参数。...理想情况下,任务应该是等幂的,这意味着当使用相同的参数多次调用任务时,不会出现意外的结果。 签名(Signatures) 签名包装了任务的调用参数,执行选项(比如不可变性)和和成功/错误回调任务。...如果将其保留为空,则默认行为是将其设置为直接交换类型的默认队列的绑定键,以及其他交换类型的默认队列名。 ETAETA是用于延迟任务的时间戳。如果填为nil,任务被推送到worker将立即执行。...OnSuccess定义了任务成功执行后将被调用的任务。他是一个signature类型的切片。 OnError定义任务执行失败后将被调用的任务。...当一个任务成功完成时,结果被附加到chain中下一个任务的参数列表的末尾。

    1.7K10

    协程-无栈协程(下)

    unsigned short类型     ·整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程 ·ProtoThread...·ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度     我们尝试用ProtoThread...); ·当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走; ·对于登录协程...: ·阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段 ·调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行...,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行 参考资料 函数调用过程 ucontext manual pages swapcontext() — Save and restore

    87020

    无服务器系统的设计模式

    在数字时代,我们正在试图将人脑数字化,因此将企业机器进行数字化并不是什么了不起的事情。将企业的某一组成部分或者某一区域实现数字化是不够的。实际上,要操控一个企业,就必须要集成其所有不同的组成部分。...如果我们专注于使用无服务器方式实现一个架构的话,那么随之而来的是一些基本的、高层次的问题。 使用无服务器构建块设计一个系统时,首选的架构风格是什么?...它所涉及的不同方面,包括运行机制、适用性、使用场景、使用模式、实现模式等,每一步都在不断发生着变化。...管道和过滤器一个非常常见的用法是这样的:当客户端的请求到达服务器的时候,请求载荷必须要经历一个过滤和认证的过程。...我们可以通过使用 AWS 的简单队列服务(Simple Queue Service,SQS)来实现这一点,如下图所示。每个 lambda 过滤器处理一个事件并将其推送到队列中。

    2.1K20

    如何设计和实现微信公众号关注后48小时内定时给粉丝自动推送发送图文图片或文本消息?

    [image.png] linux的时间轮数据结构,如下, 内核的定时器本质上是 Single-Shot Timer,如果想成为 Repeating Timer,可以在注册的回调函数中再次的注册自己 [...使用AWS的SQS消息队列服务 AWS的SQS提供delay的支持, 非常完美得解决了这个问题, [image.png] 接口调用也很简单 System.out.println("Sending a message...(request); 有点是调用简单, 一个月有100万条消息的免费额度, 缺点是超过配额之后, 费用还是挺贵的....Redis提供的这个事件回调,并不承诺可靠. 使用RabbitMq实现延时队列 AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能....reconnect的之后, 不保证可靠性, 这个监控显示有概率非常小发生, 不到十万分之一, 对比了机器的环境, 应该是和网络抖动有关, 这个小概率的修复, 目前我们吞下了.

    1.8K00

    关于线程池你不得不知道的一些设置

    看完我上一篇文章「你都理解创建线程池的参数吗?」之后,当遇到这种问题,你觉得你完全能够唬住面试官了,50k轻松到手。殊不知,要是面试官此刻给你来个反杀: 初始化线程池时可以预先创建线程吗?...线程池的核心线程可以被回收吗?为什么? 如果此刻你一脸懵逼,这个要慌,问题很大,50k马上变5k。 ? 有细心的网友早就想到了这个问题: ? ?...在ThreadPoolExecutor线程池中,还有一些不常用的设置。我建议如果您在应用场景中没有特殊的要求,就不需要使用这些设置。 初始化线程池时可以预先创建线程吗?...上面我也说了,当线程空闲时会从blockingQueue阻塞队列中阻塞获取任务执行,所以我们来看看是保证核心线程不被销毁的,我们直接定位到源码部位: java.util.concurrent.ThreadPoolExecutor...,则返回null; take():从blocking阻塞队列取出一个任务,如果BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的任务被加入为止。

    82730

    如何避免AWS的高额账单?

    最终找到根因在于一个会触发Lambda执行的消息事件由于某个bug被大量复制,并且该事件在被Lambda处理后原样发回SQS,导致发生死循环。...执行时延 (Duration) 一般Severless系统的函数都有最大执行时间的限制。而且从系统的设计原则上讲,一次函数调用也不应执行过长的时间。...当监控到较多的长时延函数调用时,表明系统出现了异常情况,且极有可能导致性能问题。同时,长时延也意味着成本的增加。...调用次数 (Invocations) 调用次数表示某一时间范围内函数的调用次数,它能够反映当前函数的活跃程度以及整体上的执行情况。调用次数的突然变化也会反映系统中的异常情况。...了解得越清楚,在配置监控和告警时会更得心应手,收到告警后也有助于快速定位问题。 除了针对各个基础服务的各类指标进行监控外,监控云平台各个账号的账单也是避免损失的一大法宝。

    18520

    里氏替换原则(LSP)

    里氏替换原则,从语义上要保证不会因为继承而发生变化。 其实这样的定义还是很抽象的,我想应该大多数程序员在写代码中,继承一个类的时候,脑海中应该不是浮现出上述的原则。...子类对象替换父类对象 我们在编程的时候,经常都会使用到队列,在做SaaS开发的时候,如果你在AWS平台上开发则可能使用SQS, 如果你在Azure平台上开发可能会使用Service Bus Queue。...假设你现在AWS平台开发,但是你在做实现的时候,也需要抽象出队列的接口给调用者使用,因为说不定某一天要切换到Azure平台,这样的抽象接口有利于你进行扩展,在扩展到另一种队列的时候,调用者改动的代码对产品的影响会很小...动态角度: 在运行时实际上因为多态原理,实际调用的是子类QueueSQS的方法GetTask,可以认为子类对象在动态上替换了父类 上述例子,目前看是没有违背里氏替换原则,也是我们常见的使用场景。...结束语 当思考这些抽象原则的时候,实话说笔者的大脑也是有些难熬的。写下这些,一是为了能够将自己的想法输出加强印象,也希望让看到的同行一起探讨。

    66220

    使用Celery构建生产级工作流编排器

    当任务已定义好了以及哪个 worker 将执行它们时,下一步需要确定路由。 Celery 有一个可以通过配置提及的任务路由这个惊人的特性。 它可以根据名称自动将任务路由到不同的队列中,是的!...这些可以允许恢复由于限制而导致任务被终止而发生的数据库事务。...预加载机器学习模型文件:当使用 ML 模型构建工作流应用程序时,一种最佳优化技术是将它们加载为全局变量,这样一来,模型加载发生在工作器初始化时,并且可用作共享的静态文件。...ELK 上的日志监控 Sentry:在处理可能让你感到意外的不同类型数据时,错误可能是不可预料的,尤其是当流量很大时,Sentry 可能是你的好帮手,它会在出现问题时提醒你,在 Celery 工作进程启动时设置...为了定义最佳扩展策略,我们查看队列指标,例如 Amazon SQS 上提供的指标。 使用 SQS 指标调整策略 扩展和生产设置?

    40910

    MQ·将多消息合并为一条消息的发送、消费的设计与实现

    由于mq使用的是亚马逊的sqs服务,而sqs是按请求数消费的原因,所以才有的将多消息合并为一条消息发送的想法。...什么样的业务场景下才适合这么干? 将大量消息合并为一个消息后会导致消息消费失去原子性。...每个MesaageLooper的run方法实现的就是一个死循环,从阻塞队列中拿消息,当消息等于256时,或者阻塞超过1s就将拿到的消息合并成一个消息发送到mq。...原本计划是让消费者占用较小的内存,以实现将消费者寄生在其它服务所在的机器上,充分利用其它耗内存而cpu利用率低的服务所在的机器。...Sqs支持一次拉取多条消息,并且有一个可见性超时的特性,当消息被消费者拉取到之后,在多长时间内未删除,下次可能还会被拉取到,或者其它消费者还能拉取到。最初我设置的可见性超时是60s。 ?

    4.1K10
    领券