作者:林生生,GrowingIO 运营产品线研发经理,主要负责 GrowingIO 智能运营产品线研发管理工作。
GrowingIO 是一家提供增长平台的公司。在 2018 年初我们推出了基于底层数据能力的智能运营平台,结合精准的用户分群,数据采集以及多种运营方式,帮助企业客户用数据驱动用户运营,随时验证假设,助力产品增长。产品有以下特点:
下图是运营平台站外触达业务流程图。用户可以随时发起一次站外运营活动,通常是一个站外的触点(推送、短信、Webhook)。后台系统需要查询底层的数据平台接口,获取此次活动对应的人群信息,同时组装活动数据并对外投递任务。
在这个业务场景,需要解决如下几个问题:
综上,为了最大化利用服务器资源、提高服务稳定性和优化终端用户体验,GrowingIO 服务端团队在异步与反应式编程上做了一些实践。本文将介绍在优化过程中的探索与思考,希望能为读者带来帮助。
传统服务端程序一般采用同步阻塞模型,通过分配更多线程来支撑更多请求,这符合常人思维模式,但在突发流量的情况下,同步模型可能会导致线程池耗尽,基于一个请求一个线程的服务模式无法做到动态伸缩。
而异步编程的做法是基于一个共享的线程池,所有操作都是回调。如果遇到耗时的操作,线程并不会阻塞等待操作完成,而是会被释放回线程池中继续接受新的请求。等到耗时操作完成后(一般都是IO操作),通过消息机制重新向线程池申请线程恢复之前的请求代码。
我们可以简单写个程序简单实验一下,实现相同逻辑:1. 查询 db 2. 查询外部系统 3. 组装信息返回。唯一区别是一个是同步调用的实现,另一个是采用完全异步的方式实现。
本地使用相同的 jmeter 参数模拟并发测试,得到结果如下,从左到右每列的含义分别为:请求名称、请求数目、失败请求数目、错误率(本次测试中出现错误的请求的数量/请求的总数)、平均响应时间、最短响应时间、最大响应时间、90%用户响应时间、95%用户响应时间、99%用户响应时间、吞吐量。
总体测试结果如下:
同步代码测试结果
异步代码测试结果
同步代码总共完成了 260 次请求,平均响应时间约 5 秒,因为阻塞程序耗尽了线程池导致程序出现了拒绝服务的情况,产生了 13% 的错误率。
异步代码整体吞吐量有明显提升,相同时间内完成了 3000 次请求。错误率为 0 ,并且整体没有出现拒绝服务的情况。
可以看到基于消息驱动机制的异步系统能极大提高资源利用率,提高系统的吞吐量。而响应式系统则在消息驱动的基础上增加了三个要求:及时响应性、回弹性和可扩展性。
简单来说具备以下四个特点的系统可以称为一个响应式系统:
对应的响应式编程是一种程序设计思想,在 java 8 中首次引入了响应式流的规范,即 Reactive Streams 接口。Reactive Streams 非常类似于 JPA 或 JDBC,都是 API 规范,实际使用时需要采用对应的具体实现。JDK 提供的 Reactive Streams 接口:
Reactive Streams API 的范围是找到一组最小的接口,这些接口将描述必要的操作和实体,从而实现具有非阻塞背压的异步数据流。社区对于 Reactive Streams 的实现比较多,这里做一个简单的汇总和对比。
总结一下,如果是移动设备使用 rxjava 是比较合适的选择。如果是在服务端使用 spring 框架做开发,采用基于 reactor 实现的 webflux 更合适。如果是对性能要求很高,业务相对简单的场景,选择 vertx 可以最大限度发挥机器性能。而 gio 的真实场景是服务端的复杂业务系统,同时使用 scala 作为开发语言并且使用 play 作为 web 开发框架。所以在系统构建之初很自然的选择了 akka 作为我们的响应式系统的实现基础。
在最初的时候并没有直接采用 akka-stream,而是选择更为简单,建模能力更强的 akka-actor 作为系统实现的基础。Akka-actor 是基于 actor 模型构建的异步工具包, 使用 akka-actor 可以很轻松的进行基于消息驱动的异步编程。Actor 的基础就是消息传递,一个 actor 可以认为是一个基本的计算单元,它能接收消息并执行运算,它也可以发送消息给其他 actor。Actors 之间相互隔离,它们之间并不共享内存,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。
Akka-actor 最核心的实现包含三个部分:
Actor 本身是不绑定线程的,相同进程的 actor 共享一个线程池,mailbox 是一个 runnable 对象,核心逻辑就是从队列中取出消息调用 behavior 进行处理。
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() // 先处理系统级别消息
processMailbox() // 然后处理普通消息
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
在同一个进程中,可以通过调整 akka-actor 线程池大小来进行纵向负载伸缩。同时,akka-actor 支持在一个系统中绑定不同类型、数量的线程池。比如在一些耗时较长的 IO 场景下可以单独配置一个线程池起到隔离的目的。对需要横向扩展的场景,akka 提供了基于 gossip 协议的点对点去中心化集群解决方案 akka-cluster。
Akka-cluster 通过 gossip 协议进行成员之间的发现和状态同步,同时提供了更高层的集群工具:
理论上使用 akka-actor 和 akka-cluster 可以使系统具备极强的扩展性(弹性)。但是在实际使用中我们并没有采用 akka-cluster 去扩展系统,原因也很简单,akka-cluster 生产案例太少,功能上过于复杂,不利于大规模推广。最终我们使用了传统的消息中间件作为系统横向扩展的解决方案。在单机内使用 akka-actor,涉及到跨节点通信的场景使用消息中间件进行通信。
在系统回弹性方面,akka-actor 提供了基于层级的监督机制。可以把整个 actor 系统看做是一棵树,每个 actor 实例都是树中的一个节点。监督机制指的是每个 actor 都是其子 actor 的监督者,需要针对子 actor 制定一个错误处理策略。
对应到具体的业务系统里,我们将整个流程分割成多个 actor 实现,为了实现监督与错误恢复,需要创建一个顶层 route actor 来引用所有具体的业务 actor 。如果某个业务actor 遇到问题并抛出了异常,异常会被监管者 route actor 来处理。监管者可以选择恢复出现问题的 actor 或者重启,也可能会将其停止掉,这依赖于问题的严重程度和恢复策略。Akka-actor 中有以下 4 种错误处理策略:
最终我们基于 actor 实现了整个业务流程:当一个用户发起一次站外活动请求,主应用(基于 play)会将活动的元数据写入数据库中然后立马返回结果到前端,达到及时响应的目的。同时将活动请求封装成一个 actor 消息,异步的投递给 route actor 进行后续的任务处理。Route actor 会根据接收到的具体消息类型进行路由分发,分别是 User Insight Actor(查询人群信息) - Build Push Task Actor(查询 db 组装 task) - Checkpoint Actor(存储 task 信息)- Publish Task Actor(发布 task 到 kafka)。
采用消息驱动的方式设计系统取得了一些好处:
上文提到我们创建了一个 route actor 将所有业务 actor 组织到一起,这样既能起到一个监督的作用,也可以知道全局的逻辑视图。但是这种实现方式也会带来一个问题,整体编排较为复杂。对于带有分支与合并逻辑的处理流更是难以描述,对后续新增流程也没有约束,只能人为约定一个顺序,比如在上面的比较靠前,可维护性比较差。
又因为整个流程中 User Insight Actor 部分依赖外部数据查询系统,比较容易成为整个系统的瓶颈。在负载不断变化的情况下,外部查询可能会失败,从而对系统整体可用性造成影响。针对这个问题需要设计对应的限流机制和重试机制。 上面提到 Actor 的 mailbox 本身就是一个队列,如果在负载过高的情况下消息是可以丢弃的,只需要指定 actor 的 maibox 类型为有界队列即可。假如消息不能被丢弃,可以采用令牌桶算法实现限流功能。对于重试机制,User Insight Actor 本身是无状态的,这里很自然想到在失败时重新发送试消息到 User Insight Actor 本身进行重试。
这个方案比较简单,如果要满足一些特殊场景下的需求,比如设定重试次数,延迟执行重试请求,指定重试失败后的降级策略,只能通过定制一些逻辑实现,但是要做到代码灵活复用需要花费大量时间进行设计。
上述方案都能满足业务需求,总体来讲通过 actor 模型可以快速实现轻量业务异步封装,但面对相对复杂业务逻辑时还是存在一些局限:
这也是为什么后来采用了 akka-stream 来对处理流程进行重构。Akka-stream 是基于 akka-actor 的 Reactive Streams 规范实现,具备以下特点:
并在上层提供了更加抽象灵活的 DSL 封装,即 source、sink、flow 组件。
Source 即响应流的源头,源头具有一个数据出口。我们可以通过各种数据来创建一个 Source:
val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
Sink 就是流的最终目的地,包含一个数据入口,我们可以如下来创建 Sink:
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
Flow 就是流的中间组件,包含一个数据入口和数据出口。我们可以这样来创建 Flow:
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
而整个业务流可以通过基础组件构成的图和网络来表示:
流式操作可以类比成流水线,每个算子都是一道处理工序,数据源就是加工原材料,经过多道工序处理后最后输出一个成品。
上文提到为了实现对系统速率的控制,引入了限流的逻辑,比如基于令牌桶算法的实现,只有程序拿到了令牌才能进入下一段处理逻辑,本质上这种实现方式是同步阻塞的,而且真实情况下下游节点可能完全能承载更多的请求。为了解决数据源和下游节点处理速度不一致的问题,在 Reactive Streams 的规范里引入了背压机制,本质上是一种由处理者向数据源发起数据请求,从而进行速度调整的一种方式。Akka-Stream 提供了一套开箱即用的背压功能,其实现方式和 Reactive Streams 一致,下游 subscriber 通过发送 subscription 到上游的 publisher 主动请求需要处理的元素数量。这样就能从整个数据流的源头进行速率控制,采用 pull 而不是 push 的模式能让系统按需保持最大的处理能力,同时又不会崩溃。
下面是基于 akka stream 重构后的处理流,简单对比 akka actor 的实现方式,基于操作符的组合代码更加清晰易读,可以轻松实现复杂任务编排。
从底层实现来讲,akka-stream 底层还是基于 akka-actor 进行工作的,只是在上层提供了更高视角的 DSL 封装 。这种灵活的编程方式能极大提高代码复用性和可维护性。
本文记录了 GrowingIO 服务团队在针对具体业务场景进行反应式系统设计的实践总结,从异步编程到使用 actor 模型构建基于消息驱动的系统,为了降低系统复杂度提高可维护性又引入了 akka-stram 作为反应式流的编排框架。最后,希望能与对反应式技术感兴趣的同学多多交流,打个小广告:我们的工程团队持续在招聘中~ 服务端、前端、大数据各种攻城狮都缺,感兴趣朋友欢迎砸简历 https://www.growingio.com/joinus。
参考资料:
https://info.lightbend.com/rs/558-NCX-702/images/COLL-ebook-Reactive-Microservices-Architecture.pdf
https://learning.oreilly.com/library/view/applied-akka-patterns
https://freecontent.manning.com/akka-in-action-why-use-clustering/
关于 GrowingIO
GrowingIO 是国内领先的一站式数字化增长整体方案服务商。为产品、运营、市场、数据团队及管理者提供客户数据平台、广告分析、产品分析、智能运营等产品和咨询服务,帮助企业在数字化转型的路上,提升数据驱动能力,实现更好的增长。
领取专属 10元无门槛券
私享最新 技术干货