随着计算机硬件和软件的发展,个人计算机里动辄几千几万线程已经成为家常便饭。而在程序中大量使用并发也成为了一个主流,因为这样的程序有更小的延迟,并且对多核CPU也有更充分的利用。
有锁并发
对于大多数程序员(当然我也基本上是其中一员),并发编程几乎就等价于给相关数据结构加上一个锁(Mutex)。比如如果我们需要一个支持并发的栈,那最简单的方法就是给一个单线程的栈加上锁std::sync::Mutex。(加上Arc是为了能让多个线程都拥有栈的所有权)
use std::sync::{Mutex, Arc};
#[derive(Clone)]
struct ConcurrentStack {
inner: Arc,
}
impl ConcurrentStack {
pub fn new() -> Self {
ConcurrentStack {
inner: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn push(&self, data: T) {
let mut inner = self.inner.lock().unwrap();
(*inner).push(data);
}
pub fn pop(&self) -> Option {
let mut inner = self.inner.lock().unwrap();
(*inner).pop()
}
}
这样做的好处是显而易见的:代码写起来十分方便,毕竟和单线程版本的几乎一样。只需要按照单线程的版本写完,然后给数据结构加上锁,然后在必要的时候获取和释放(在Rust中基本上是自动的)锁即可。
那么问题是什么呢?首先不谈你可能会忘记获取和释放锁(这一点要感谢Rust,在Rust中几乎不可能发生),你可能会面临死锁的问题(哲学家就餐问题)。然后也不谈一些低优先级的任务可能会长期抢占高优先级任务的资源(因为锁是第一位的),当线程数量比较大的时候,大部分的时间都被用在了同步上(等待锁能被获取),性能就会变得非常差。考虑一个大量读取而偶尔写入的并发数据库,如果用锁去处理,即使数据库没有任何更新,每两个读之间也需要做一次同步,代价太大!
无锁并发
于是,大量计算机科学家和程序员就把目光转向了无锁(lock-free)并发。无锁对象:如果一个共享对象保证了无论其他线程做何种操作,总有一些线程会在有限的系统操作步骤后完成一个对其的操作Her91。也就是说,至少有一个线程对其操作会取得成效。使用锁的并发明显就不属于这一范畴:如果获取了锁的线程被延迟,那么这段时间里没有任何线程能够完成任何操作。极端情况下如果出现了死锁,那么没有任何线程能够完成任何操作。
CAS(compare and swap)原语
那大家可能会好奇,无锁并发要怎么实现呢?有没有例子呢?在此之前让我们先看一下一个公认的在无锁并发中非常重要的原子原语:CAS。CAS的过程是用指定值去比较一储存值,只有当他们相同时,才会修改储存值为新的指定值。CAS是个原子操作(由处理器支持,比如x86的compare and exchange (CMPXCHG)),该原子性保证了如果其他线程已经改变了储存值,那么写入就会失败。Rust标准库中的std::sync::atomic中的类型就提供了CAS操作,比如原子指针std::sync::atomic::AtomicPtr
pub fn compare_and_swap(
&self,
current: *mut T,
new: *mut T,
order: Ordering
) -> *mut T
(在这里,不用纠结ordering是什么,也就是说请尽管忽略忽略Acquire,Release,Relaxed)
无锁栈(天真版)
#![feature(box_raw)]
use std::ptr::{self, null_mut};
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Relaxed, Release, Acquire};
pub struct Stack {
head: AtomicPtr,
}
struct Node {
data: T,
next: *mut Node,
}
impl Stack {
pub fn new() -> Stack {
Stack {
head: AtomicPtr::new(null_mut()),
}
}
pub fn pop(&self) -> Option {
loop {
// 快照
let head = self.head.load(Acquire);
// 如果栈为空
if head == null_mut() {
return None
} else {
let next = unsafe { (*head).next };
// 如果现状较快照并没有发生改变
if self.head.compare_and_swap(head, next, Release) == head {
// 读取内容并返回
return Some(unsafe { ptr::read(&(*head).data) })
}
}
}
}
pub fn push(&self, t: T) {
// 创建node并转化为*mut指针
let n = Box::into_raw(Box::new(Node {
data: t,
next: null_mut(),
}));
loop {
// 快照
let head = self.head.load(Relaxed);
// 基于快照更新node
unsafe { (*n).next = head; }
// 如果在此期间,快照仍然没有过时
if self.head.compare_and_swap(head, n, Release) == head {
break
}
}
}
}
我们可以看到,无论是pop还是push思路都是一样的:先在快照上pop或者是push,然后尝试用CAS替换原来的数据。如果快照和数据一致,说明这一期间数据没有被写操作过,于是更新成功。如果不一致,说明在此期间有其他线程修改过数据,那么一切从头再来。这就是一个无锁的栈。似乎一切都已经大功告成了!
内存释放
确实你可能已经大功告成了,但前提是你在写Java,或者是其他有GC的语言。现在的问题在于,在Rust这种没有GC的语言中,pop中
return Some(unsafe { ptr::read(&(*head).data) })
并没有人去释放head,这是一个内存泄漏!哎,看来无锁并发好不容易呢。
crossbeam
在简单看了有锁和无锁并发的例子之后,我们发现并发还真不是那么容易的呢。什么都加个锁虽然简单粗暴但是恐怕成不了大气候。现在我们终于可以有请主角crossbeam了。该库最初的重点就是提供一些无锁的数据结构,让我们能免于绞尽脑汁去和这一难题较劲以及提供了内存管理工具,用来解决刚才我们提到的内存释放问题。现在已经演变成一个并发编程的军火库,提供各种大大小小有关并发编程的工具。再举个细节的小例子,为什么我们上边的ConcurrentStack需要Arc呢,就是因为普通的线程只能使用move而不能引用,而crossbeam也提供了工具去方便我们更容易的写出代码。
下回就让我们深入crossbeam-epoch去看一下基于epoch的垃圾收集工具,以及如何使用它来完成我们的无锁并发栈。
领取专属 10元无门槛券
私享最新 技术干货