Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

作者头像
Techeek
发布于 2018-01-11 09:14:05
发布于 2018-01-11 09:14:05
1.7K0
举报
文章被收录于专栏:云计算云计算

ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray

原文作者:Adam Warski

原文地址:https://dzone.com/articles/elasticmq-070-long-polling-non

译者微博:@从流域到海域

译者博客:blog.csdn.net/solo95

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

ElasticMQ 0.7.0,一个附带基于actor的Scala的消息队列系统刚刚发布。

这是一次重大的重写(即版本升级),升级之后将在核心使用Akka actors 并在REST层使用Spray。到目前为止,只有核心和SQS模块被重写, 日志( journaling),SQL后端和副本(replication)模块的重写尚未完成。

主要的客户端改进是:

  • 支持长轮询,这是SQS前一段时间的补充
  • 更简单的独立服务器 - 只需下载一个jar包

使用长时间的轮询的过程中,当收到消息时,可以指定一个额外的的MessageWaitTime属性。如果队列中没有消息,,ElasticMQ将等待MessageWaitTime几秒钟直到消息到达,而不是用空响应完成请求。这有助于减少带宽的使用(不需要非常频繁地进行请求),进而提高系统整体性能(发送后立即收到消息)并降低SQS成本。

独立的服务器现在是一个单一的jar包。要运行本地内存SQS实现(例如,测试使用SQS的应用程序),只需要下载jar文件并运行:

代码语言:bash
AI代码解释
复制
java -jar elasticmq-server-0.7.0.jar

这将在http://localhost:9324启动服务器。当然,接口和端口都是可配置的,详情请参阅自述文件。像以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。

实现说明

出于好奇,下面是对ElasticMQ如何实现的简短描述,包括核心系统,REST层,Akka数据流使用和长轮询实现。所有的代码都可以在GitHub上找到。

如前所述,ElasticMQ现在使用Akka和Spray来实现,并且不包含任何阻塞调用。一切都是异步的。

核心

核心系统是基于角色的。有一个主角色(main actor)(QueueManagerActor),它知道系统中当前创建了哪些队列,并提供了创建和删除队列的可能性。

为了与actor沟通,使用了类型化问答模式。例如,要查找一个队列(一个队列也是一个actor),就会定义一个消息:

代码语言:java
AI代码解释
复制
case class LookupQueue(queueName:Stringextends Replyable [Option [ActorRef]]

用法如下所示:

代码语言:java
AI代码解释
复制
import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")

如前所述,每个队列都是一个actor,并且已经封装了队列状态。我们可以使用简单的可变数据结构,而不需要任何线程同步,因为角色模型(actor model)为我们处理了这个问题。有一些消息可以发送给queue-actor,例如:

代码语言:java
AI代码解释
复制
case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
Rest层

SQS查询/ REST层是使用Spray来实现的,这是一个基于Akka的轻量级REST/HTTP工具包。

除了基于角色的非阻塞IO实现外,Spray还提供了强大的路由spray-routing。它包含一些内置的指令,用于在请求方法(get / post等)上进行匹配,提取表单参数中的查询参数或匹配请求路径。但它也可以让你使用简单的指令组合来定义你自己的指令。一个典型的ElasticMQ route示例如下所示:

代码语言:java
AI代码解释
复制
val listQueuesDirective = 
  action("ListQueues") {
    rootPath {
      anyParam("QueueNamePrefix"?) { prefixOption =>
        // logic
      }
    }
  }

action"Action"URL的body参数中匹配指定的action名称并接受/拒绝请求的地方,rootPath会匹配出空路径(...)。Spray有一个很好的教程,如果你有兴趣,我建议你看看这篇教程。

如何使用路由中的队列角色(queue actors)来完成HTTP请求?

关于Spray的RequestContext好处是,它所做的只是将一个实例传递给你的路由,不需要任何回复。完全放弃请求或使用某个value完成该请求仅仅取决于它的路由。该请求也可以在另一个线程中完成 - 或者,例如,在未来某个线程运行完成时。这正是ElasticMQ所做的。在这里使用mapflatMapfor-comprehensions(这是一个针对map/ flatMap更好的语法)是非常方便的,例如(省略了一些内容):

代码语言:java
AI代码解释
复制
// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

有时,当流程更复杂时,ElasticMQ会使用Akka Dataflow,这需要启用continuations插件。还有一个类似的项目,使用宏,Scala Async,但这个仍处于早期开发阶段。

使用Akka Dataflow,您可以编写使用Future们的代码,就好像编写正常的序列化代码一样。CPS插件会将其转换为在需要时使用回调。这是一个来自CreateQueueDirectives的例子:

(序列化代码sequential code,也有翻译成顺序代码的,即按顺序执行的代码,过程中不存在多线程异步操作,译者注)

代码语言:java
AI代码解释
复制
flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

这里的重要部分是flow代码块,它界定了转换的范围,以及调用Future提取future内容的apply()。这看起来像完全正常的序列化代码,但是在执行时,因为第一次Future是第一次使用将会异步运行。

长轮询

由于所有的代码都是异步和非阻塞的,实现长轮询非常容易。请注意,从一个队列接收消息时,我们得到一个Future[List[MessageData]]。为了发出响应已完成这个future,HTTP请求也将会以适当的响应来完成。然而,这个future几乎可以立即完成(例如正常情况下),比如在10秒之后 - 代码所需的支持没有变化。唯一要做的就是延迟完成future,直到指定的时间过去或新的消息到达。

实现在QueueActorWaitForMessagesOps中。当接收到消息的请求到达时,队列中没有任何内容产生,而是立即回复(即向发送者actor发送空列表),我们将储存原始请求的引用和发送方actor在map中。使用Akka调度程序,我们还计划在指定的时间超过之后发回空列表并删除条目。

当新消息到达时,我们只需从map上等待一个请求,然后尝试去完成它。同样,所有同步和并发问题都由Akka和actor模型来处理。

请测试新版本,如果您有任何反馈,请让我们知晓!

Adam

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现
原文地址:https://dzone.com/articles/elasticmq-070-long-polling-non
大脸猫爱吃鱼
2018/01/15
1.7K0
Akka 使用系列之四: Future
本文介绍了Akka在Spark中的使用,包括Akka的主要特性和架构。首先介绍了Akka的入门知识,然后详细阐述了Akka在Spark中的使用,包括如何使用Akka进行RPC调用、如何使用Akka异步处理消息和如何使用Akka进行并行计算。最后,本文总结了Akka在Spark中的使用,并介绍了另一种基于Netty的RPC实现。
AlgorithmDog
2017/12/29
1.1K0
Akka 使用系列之四: Future
akka-typed(2) - typed-actor交流方式和交流协议
akka系统是一个分布式的消息驱动系统。akka应用由一群负责不同运算工作的actor组成,每个actor都是被动等待外界的某种消息来驱动自己的作业。所以,通俗点描述:akka应用就是一群actor相互之间发送消息的系统,每个actor接收到消息后开始自己负责的工作。对于akka-typed来说,typed-actor只能接收指定类型的消息,所以actor之间的消息交流需要按照消息类型来进行,即需要协议来规范消息交流机制。想想看,如果用户需要一个actor做某件事,他必须用这个actor明白的消息类型来发送消息,这就是一种交流协议。
用户1150956
2020/06/02
7850
akka-typed(1) - actor生命周期管理
akka-typed的actor从创建、启用、状态转换、停用、监视等生命周期管理方式和akka-classic还是有一定的不同之处。这篇我们就介绍一下akka-typed的actor生命周期管理。
用户1150956
2020/05/29
8960
Akka 指南 之「Akka 和 Java 内存模型」
使用 LightBend 平台(包括 Scala 和 Akka)的一个主要好处是简化了并发软件的编写过程。本文讨论了 LightBend 平台,特别是 Akka 如何在并发应用程序中处理共享内存。
CG国斌
2019/05/26
1.1K0
Akka(25): Stream:对接外部系统-Integration
   在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。所以,akka-stream必须提供一些函数和方法来实现与各种不
用户1150956
2018/01/05
2.1K0
Akka(1):Actor - 靠消息驱动的运算器
  Akka是由各种角色和功能的Actor组成的,工作的主要原理是把一项大的计算任务分割成小环节,再按各环节的要求构建相应功能的Actor,然后把各环节的运算托付给相应的Actor去独立完成。Akka
用户1150956
2018/01/05
6900
Akka 使用系列之一: 快速入门
AlgorithmDog
2017/12/29
1.2K0
Akka 使用系列之一: 快速入门
akka-typed(6) - cluster:group router, cluster-load-balancing
先谈谈akka-typed的router actor。route 分pool router, group router两类。我们先看看pool-router的使用示范:
用户1150956
2020/06/15
8060
基于Scala的并发编程模型Akka
        Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时环境。Akka用Scala 语言编写,同时提供了 Scala 、JAVA 的开发接口。
Java架构师必看
2021/05/14
1.3K0
基于Scala的并发编程模型Akka
Akka(8): 分布式运算:Remoting-远程查找式
该文摘要总结:本文介绍了如何利用Akka和Scala实现一个分布式计算系统,用于执行并行计算任务。具体来说,文章介绍了如何利用Akka的Actor模型和Scala的并发编程库来实现一个分布式计算系统,该系统能够执行多个计算任务,并将结果返回给调用者。同时,文章还探讨了如何利用Akka的Identify消息处理Actor的死亡,从而避免Actor的丢失,并提高系统的可用性。
用户1150956
2018/01/05
1.9K0
Akka(33): Http:Marshalling,to Json
  Akka-http是一项系统集成工具。这主要依赖系统之间的数据交换功能。因为程序内数据表达形式与网上传输的数据格式是不相同的,所以需要对程序高级结构化的数据进行转换(marshalling or
用户1150956
2018/01/05
2.1K0
Akka(3): Actor监管 - 细述BackoffSupervisor
    在上一篇讨论中我们谈到了监管:在Akka中就是一种直属父子监管树结构,父级Actor负责处理直属子级Actor产生的异常。当时我们把BackoffSupervisor作为父子监管方式的其中一种
用户1150956
2018/01/05
9300
akka-typed(0) - typed-actor, typed messages
akka 2.6.x正式发布以来已经有好一段时间了。核心变化是typed-actor的正式启用,当然persistence,cluster等模块也有较大变化。一开始从名称估摸就是把传统any类型的消息改成强类型消息,所以想拖一段时间看看到底能对我们现有基于akka-classic的应用软件有什么深层次的影响。不过最近考虑的一些系统架构逼的我不得不立即开始akka-typed的调研,也就是说akka-classic已经无法或者很困难去实现新的系统架构,且听我道来:最近在考虑一个微服务中台。作为后台数据服务调用的唯一入口,平台应该是个分布式软件,那么采用akka-cluster目前是唯一的选择,毕竟前期搞过很多基于akka-cluster的应用软件。但是,akka-cluster-sharding只能支持一种entity actor。毕竟,由于akka-classic的消息是没有类型的,只能在收到消息后再通过类型模式匹配的方式确定应该运行的代码。所以,这个actor必须包括所有的业务逻辑处理运算。也就是说对于一个大型应用来说这就是一块巨型代码。还有,如果涉及到维护actor状态的话,比如persistenceActor,或者综合类型业务运算,那么又需要多少种类的数据结构,又怎样去维护、管理这些结构呢?对我来说这基本上是mission-impossible。实际上logom应该正符合这个中台的要求:cluster-sharding, CQRS... 抱着一种好奇的心态了解了一下lagom源码,忽然恍然大悟:这个东西是基于akka-typed的!想想看也是:如果我们可以把actor和消息类型绑在一起,那么我们就可以通过消息类型对应到某种actor。也就是说基于akka-typed,我们可以把综合性的业务划分成多个actor模块,然后我们可以指定那种actor做那些事情。当然,经过了功能细分,actor的设计也简单了许多。现在这个新的中台可以实现前台应用直接调用对应的actor处理业务了。不用多想了,这注定就是akka应用的将来,还等什么呢?
用户1150956
2020/05/29
5880
Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用
前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。 akka是什么? akka的作用 akka的名字是action kernel的回文。根据官方定义:akka用于r
绿巨人
2018/05/18
1.2K0
akka-typed(4) - EventSourcedBehavior in action
前面提到过,akka-typed中较重要的改变是加入了EventSourcedBehavior。也就是说增加了一种专门负责EventSource模式的actor, 最终和其它种类的actor一道可以完美实现CQRS。新的actor,我还是把它称为persistentActor,还是一种能维护和维持运行状态的actor。即,actor内部状态可以存放在数据库里,然后通过一组功能函数来提供对状态的处理转变,即持续化处理persistence。当然作为一种具备EventSourcedBehavior的actor, 普遍应有的actor属性、方法、消息处理协议、监管什么的都还必须存在。在这篇讨论里我们就通过案例和源码来说明一下EventSourcedBehavior是如何维护内部状态及作为一种actor又应该怎么去使用它。
用户1150956
2020/06/09
5330
Akka(2):Actor生命周期管理 - 监控和监视
  在开始讨论Akka中对Actor的生命周期管理前,我们先探讨一下所谓的Actor编程模式。对比起我们习惯的行令式(imperative)编程模式,Actor编程模式更接近现实中的应用场景和功能测试
用户1150956
2018/01/05
2.5K0
akka-grpc - 应用案例
上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。不过有些项目需求不等人,需要使用这项技术,所以研究了一下akka-grpc,写了一篇介绍。本想到此为止,继续其它项目。想想这样做法有点不负责任,像是草草收场。毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。
用户1150956
2020/09/01
9350
Akka 指南 之「邮箱」
Akka 的邮箱中保存着发给 Actor 的信息。通常,每个 Actor 都有自己的邮箱,但也有例外,如使用BalancingPool,则所有路由器(routees)将共享一个邮箱实例。
CG国斌
2019/05/26
1.6K0
Akka 指南 之「FSM」
为了使用有限状态机(Finite State Machine)Actor,你需要将以下依赖添加到你的项目中:
CG国斌
2019/05/26
2.8K0
相关推荐
ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档