本篇对笔者接触过的线程框架模型做一个概括性的总结。
主要介绍三种模型:
1. Disruptor:Apache Storm底层应用了Disruptor来实现worker内部的线程通信;
2. Reactor:Apache Netty整体架构基于Reactor模式;
3. Actor:Akka是在JVM上的Actor模型的实现。而Apache Flink的RPC框架是基于Akka实现的,之后任务执行框架修改为基于Actor的Mailbox模型;
Disruptor
https://lmax-exchange.github.io/disruptor/user-guide/index.html
https://github.com/LMAX-Exchange/disruptor
LMAX Disruptor 是一个高性能的线程间消息库。它起源于 LMAX 对并发性、性能和非阻塞算法的研究,如今已成为 Exchange 基础设施的核心部分。
Disruptor 是一个提供并发环形缓冲区数据结构的库。它被设计为在异步事件处理架构中提供低延迟、高吞吐量的工作队列。
核心抽象
1. RingBuffer——Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
2. Sequencer——序号管理器,生产同步的实现者,负责消费者/生产者各自序号、序号栅栏的管理和协调,Sequencer有单生产者,多生产者两种不同的模式,里面实现了各种同步的算法;
3. Sequence——序号,声明一个序号,用于跟踪RingBuffer中任务的变化和消费者的消费情况,Disruptor里面大部分的并发代码都是通过对Sequence的值同步修改实现的,而非锁,这是Disruptor高性能的一个主要原因;
4. SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理
5. EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
6. EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
7. Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。
8. Wait Strategy——Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。
Java内置了几种内存消息队列,如下所示:
我们知道CAS算法比通过加锁实现同步性能高很多,而上表可以看出基于CAS实现的队列都是无界的,而有界队列是通过同步实现的。在系统稳定性要求比较高的场景下,为了防止生产者速度过快,如果采用无界队列会最终导致内存溢出,只能选择有界队列。
而有界队列只有ArrayBlockingQueue,该队列是通过加锁实现的,在请求锁和释放锁时对性能开销很大,这时候基于有界队列的高性能的Disruptor就应运而生。
Disruptord的高性能之道
1. 环形数据结构
为了避免垃圾回收,采用数组而非链表,本质是对象资源复用技术。同时,数组对处理器的缓存机制更加友好。
2. 元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
3. 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。整个过程通过原子变量CAS,保证操作的线程安全。
Reactor
https://netty.io/
https://github.com/netty/netty
这个模式从Java NIO中来,是一种基于事件驱动的设计模式。Doug Lea(JUC并发包的作者)的"Scalable IO in Java"中阐述了Reactor模式。
Scalable IO in Java 地址:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
演进过程
最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:
while(true){
socket = accept();
handle(socket)
}
这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。
之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,tomcat服务器的早期版本确实是这样实现的。
优点:
一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。
缺点:
缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。
单线程Reactor
抽象出来两个组件——Reactor和Handler两个组件:
(1) Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。
(2) Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。
缺点:
当其中某个 handler 阻塞时,会导致其他所有的client 的 handler 都得不到执行,并且更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。
多线程Reactor
在单线程Reactor模式基础上,做如下改进:
1. 将Handler处理器的执行放入线程池,多线程进行业务处理。
2. 对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。
Reactor优缺点
优点:
(1) 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
(2) 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
(3) 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;
缺点:
(1) 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
(2) Reactor模式需要底层的SynchronousEvent Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
(3) Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或者此时使用改进版的Reactor模式如Proactor模式。
Reactor vs Proactor模型:
Reactor模型:
1 向事件分发器注册事件回调
2 事件发生
3 事件分发器调用之前注册的函数
4 在回调函数中读取数据,对数据进行后续处理
Proactor模型:
1 向事件分发器注册事件回调
2 事件发生
3 操作系统读取数据,并放入应用缓冲区,然后通知事件分发器
4 事件分发器调用之前注册的函数
5 在回调函数中对数据进行后续处理
以下是Netty中的Reactor模型:
https://www.jianshu.com/p/0d0eece6d467
Actor
https://akka.io/
https://github.com/akka/akka
Carl Hewitt 在1973年对Actor模型进行了如下定义:"Actor模型是一个把'Actor'作为并发计算的通用原语". Actor是异步驱动,可以并行和分布式部署及运行的最小颗粒。也就是说,它可以被分配,分布,调度到不同的CPU,不同的节点,乃至不同的时间片上运行,而不影响最终的结果。因此Actor在空间(分布式)和时间(异步驱动)上解耦的。而Akka是Lightbend(前身是Typesafe)公司在JVM上的Actor模型的实现。我们在了解actor模型之前,首先来了解actor模型主要是为了解决什么样的问题。
在akka系统的官网上主要介绍了现代并发编程模型所遇到的问题,里面主要提到了三个点
(1) 在面向对象的语言中一个显著的特点是封装,然后通过对象提供的一些方法来操作其状态,但是共享内存的模型下,多线程对共享对象的并发访问会造成并发安全问题。一般会采用加锁的方式去解决
加锁会带来一些问题:
1. 加锁的开销很大,线程上下文切换的开销大
2. 加锁导致线程block,无法去执行其他的工作,被block无法执行的线程,其实也是占据了一种系统资源
3. 加锁在编程语言层面无法防止隐藏的死锁问题
(2) Java中并发模型是通过共享内存来实现,cpu中会利用cache来加速主存的访问,为了解决缓存不一致的问题,在java中一般会通过使用volatile来标记变量,让jmm的happens before机制来保障多线程间共享变量的可见性。因此从某种意义上来说是没有共享内存的,而是通过cpu将cache line的数据刷新到主存的方式来实现可见。因此与其去通过标记共享变量或者加锁的方式,依赖cpu缓存更新,倒不如每个并发实例之间只保存local的变量,而在不同的实例之间通过message来传递。
(3) call stack的问题 当我们编程模型异步化之后,还有一个比较大的问题是调用栈转移的问题,如下图中主线程提交了一个异步任务到队列中,worker thread 从队列提取任务执行,调用栈就变成了workthread发起的,当任务出现异常时,处理和排查就变得困难。
那么akka 的actor的模型是怎样处理这些问题的?
actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。
(1) actor之间可以互相发送message。
(2) actor在收到message之后会将其存入其绑定的Mailbox中。
(3) Actor中Mailbox中提取消息,执行内部方法,修改内部状态。
(4) 继续给其他actor发送message。
可以看到下图,actor内部的执行流程是顺序的,同一时刻只有一个message在进行处理,也就是actor的内部逻辑可以实现无锁化的编程。actor和线程数解耦,可以创建很多actor绑定一个线程池来进行处理,no lock,no block的方式能减少资源开销,并提升并发的性能
参考
1 https://developer.aliyun.com/article/616952?spm=a2c6h.13262185.0.0.4ce163f8Bh85tc
2 https://zhuanlan.zhihu.com/p/404668883
3 https://www.jianshu.com/p/e48d83e39a2f
4 https://zhuanlan.zhihu.com/p/229338771
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有