为了解决高并发下多线程对一个变量CAS争夺失败后进行自旋而造成的降低并发性能的问题,LongAdder在内部维护多个Cell元素**(一个动态的Cell数组)**来分担单个变量进行争夺开销。下面围绕以下话题从源码角度来分析LongAdder的实现!
解决问题1,首先看下LongAdder的类结构图,如图所示:
由该图可知,LongAdder继承自Striped64类,在Striped64内部维护着三个变量。
LongAdder的真实值其实是base的值与Cell数组里面所有Cell元素中的value值的累加,base是个基础值,默认为0。
cellsBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。
解决问题6,下面看Cell的构造!
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
可以看到,Cell的构造很简单,其内部维护了一个被声明为volatile的变量,这里生命为volatile是因为线程操作value变量时没有使用锁,为了保证变量的内存可见性这里将其声明为volatile的。另外cas方法通过CAS操作,保证了当前线程更新时被分配的Cell元素中Value值的原子性。另外,Cell类使用@sun.misc.Contended修饰是为了避免伪共享。
下面先说一说LongAdder常用函数,然后在里边寻找答案。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// (1)~~~
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// (2)~~~
if (as == null || (m = as.length - 1) < 0 ||
// (3) ~~~
(a = as[getProbe() & m]) == null ||
// (4)~~~
!(uncontended = a.cas(v = a.value, v + x)))
// (5)
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
代码(1)首先看cells是否为null,如果为nul则当前在基础变量base上进行累加,这时候就类似AtomicLong的操作。 如果cells不为null或者线程执行代码(1)的CAS操作失败了,则会去执行代码(2)。 代码(2) (3) 决定当前线程应该访问cells数组里面的哪一个Cell元素,如果当前线程映射的元素存在执行代码(4),使用CAS操作去更新分配的Cell元素的value值,如果当前线程映射元素不存在或者存在但是CAS操作执行失败执行代码(5)。 其实将代码(2) (3) (4) 合起来看就是获取当前线程应该访问的cells数组的Cell元素,然后进行CAS更新操作,只是获取期间如果有些条件不满足则会跳转到代码(5)。执行。另外当前线程应该访问cells数组的哪一个Cell元素是通过getProbe() & m 进行计算的,其中m是当前cells数组元素个数-1,getProbe() 则用于获取当前线程中变量 threadLocalRandomProbe的值,这个值一开始为0,在代码(5) 里面会对其进行初始化。并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素value值更新的原子性,到这里再一次完善了问题6并且回答了问题2。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// (6) 初始化当前线程的变量threadLocalRandomProde的值
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // (7)
if ((a = as[(n - 1) & h]) == null) { // (8)
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true;
// (9) 当前Cell存在,则执行CAS设置
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// (10) 当前Cell数组元素个数大于CPU个数
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// (11) 是否有冲突
else if (!collide)
collide = true;
// (12) 如果当前元素个数没有达到CPU个数并且有冲突则扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
// (12.1)
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// (12.2)
cellsBusy = 0;
}
// (12.3)
collide = false;
continue; // Retry with expanded table
}
// (13) 为了能够找到一个空闲的Cell,重新计算hash,xorshift算法生成随机数
h = advanceProbe(h);
}
// (14) 初始化Cell数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// (14.1)
Cell[] rs = new Cell[2];
// (14.2)
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// (14.3)
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
上面比较复杂,我们只关注问题3,4,5部分。 当每个线程第一次执行到代码(6)时,会初始化当前线程变量threadLocalRandomProde的值,上面也说了,这个变量在计算当前线程被分配到cells数组的哪一个Cell元素时会用到。 问题3 cells数组的初始化是在代码(14) 中进行的,其中cellsBusy是一个标示,为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素,通过CAS操作来进行0或1状态的切换,这里使用casCellsBusy方法。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码(14.1)初始化cells数组元素个数为2,然后使用h&1计算当前线程应该访问cell数组的哪个位置,也就是当前线程的threadLocalRandomProbe变量&(cells数组元素个数-1),然后标示cells数组已经被初始化了,最后代码(14.3)重置了cellsBusy标记。显然这里没有使用CAS操作,却是线程安全的,原因是cellsBusy是volatile修饰的。这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改cellsBusy的值。在这里初始化的cells数组里面的两个元素的值目前还是null。这里回答了问题3,知道了cells数组如何被初始化的。 问题4 cells数组的扩容实在代码(12)中进行的,对cells扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前cells的元素个数小于当前机器CPU个数并且当前多个线程访问了cells中同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。这里为何要涉及CPU个数呢?因为只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组元素个数与CPU个数一致时,每个Cell都使用一个CPU进行处理,这时性能才是最佳的。代码(12)中的扩容操作也是先通过CAS设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码(12.1)将容量扩充为之前的2倍,并复制Cell元素到扩容后数组。另外,扩容后cells数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素值目前还是null。这里就解决了问题4。 在代码(7)(8)中,当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素计算个数计算访问的Cell元素下标,然后如果发现对应下标值为null,则新增一个Cell元素到cells数组,并且将其添加到cells数组之前要竞争设置cellsBusy为1。 问题5 代码(13)对CAS失败的线程重新计算当前线程的随机值threadLocalRandomProbe,以减少下次访问cells元素时的冲突机会。这里回答了问题5。
介绍完了JDK1.8中新增的LongAdder原子性操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争两,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。另外,数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也是避免了伪共享,这对性能也是一个提升。