前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LongAdder 源码分析

LongAdder 源码分析

作者头像
itliusir
发布2020-02-10 10:51:56
5350
发布2020-02-10 10:51:56
举报
文章被收录于专栏:刘君君

摘要:

  1. 有了 AtomicLong 为什么还会有 LongAdder

TOP 带着问题看源码

  1. 有了 AtomicLong 为什么还会有 LongAdder

1. 基本介绍

LongAdder 是一个线程安全,JDK 8新加入的一个用来计数的工具类

按照作者的说法,LongAdder 在多个线程更新下比 AtomicLong 性能更好,但要消耗更多的空间

LongAdder 继承自 Striped64,其对一些简单情况做了处理(cell 存在且更新没有竞争),复杂情况交给 Striped64 的 longAccumulate。

2. Striped64

Striped64 设计思路是把多个线程分散到不同计数单元,减少线程竞争,提高并发效率

2.1 成员变量分析

代码语言:javascript
复制
// 可用 CPU 数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// cell 数组,大小为2的幂次方
transient volatile Cell[] cells;
// 基础偏移值
transient volatile long base;
// 0 无锁 1 有锁
transient volatile int cellsBusy;

2.2 Cell 类分析

代码语言:javascript
复制
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell 类是 Striped64 的静态内部类。通过注解 `@sun.misc.Contended` 来自动实现缓存行填充,让 Java 编译器和 JRE 运行时来决定如何填充。本质上是一个填充了的、提供了 CAS 更新的 volatile 变量。

2.3 longAccumulate() 分析

代码语言:javascript
复制
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
  	// 获取线程的 probe hash值,如果 seed 初始化,probe 为非0
    if ((h = getProbe()) == 0) {
      	// 如果 probe 为0,就强制初始化一次
        ThreadLocalRandom.current(); // force initialization
      	// get 到 probe
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
          	// 通过 hash值获取数组 cells 一个index
            if ((a = as[(n - 1) & h]) == null) {
              	// 当前位置为空,并且拿到锁(cellsBusy 0是无锁,1是有锁)
                if (cellsBusy == 0) {       // Try to attach new Cell
                  	// 构建一个 Cell
                    Cell r = new Cell(x);   // Optimistically create
                  	// casCellsBusy 会把 cellsBuy 设置为1,也即是获取锁
                    if (cellsBusy == 0 && casCellsBusy()) {
                      	// 创建标识
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                // 计算hash位置j
                                rs[j = (m - 1) & h] == null) {
                              	// 把新构建的 Cell 塞到数组cells的index j的地方
                                rs[j] = r;
                              	// 更新创建完成状态
                                created = true;
                            }
                        } finally {
                          	// free lock
                            cellsBusy = 0;
                        }
                      	// 如果完成直接退出
                        if (created)
                            break;
                      	// 否则继续创建(失败)
                        continue;           // Slot is now non-empty
                    }
                }
              	// 执行到这里说明也是失败(没拿到锁),设置碰撞标识为false
                collide = false;
            }
          	// hash位置已经有值,则往下走
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
          	// 对当前位置累加,例如原本地方的值是1,要加1,现在则为2。成功就退出
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
              	// cells 长度大于cpu数量,设置碰撞标识为false
                collide = false;            // At max size or stale
            else if (!collide)
              	// 碰撞标识设置为 true
                collide = true;
          	// 说明前面操作没有成功,再次尝试获取锁进行扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                      	// 扩容2倍,然后数组copy
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                  	// lock free
                    cellsBusy = 0;
                }
              	// 扩容后重试
                collide = false;
                continue;                   // Retry with expanded table
            }
          	// 重新计算 hash 值
            h = advanceProbe(h);
        }
      	// 1. 初始化 cells,
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                  	// 最开始容量是2
                    Cell[] rs = new Cell[2];
                  	// hash对应位置赋值
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
      	// 初始化失败,CAS 把 value 累加到 base
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

3. add() 分析

代码语言:javascript
复制
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
  	// cells 为空直接使用 cas 赋值,cas成功直接返回
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
          	// cas 失败 || cells 不为空 且 index 处为null || cas 再次修改失败
          	// 调用 Striped64 的 longAccumulate
            longAccumulate(x, null, uncontended);
    }
}

4. sum() 分析

熟悉 ConcurrentHashMap 的同鞋看到 sum 相比已经很熟悉,惰性按需计算,可能会不太精准

代码语言:javascript
复制
public long sum() {
    Cell[] as = cells; Cell a;
  	// 先统计 base的值
    long sum = base;
    if (as != null) {
      	// 再遍历 cells 中的值进行累加
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

5. reset() 分析

遍历 cells 数组,重置为0

代码语言:javascript
复制
public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

总结

可以看到 LongAdder 的核心思路就是保证高并发最坏的情况,通过对线程进行散列分片减少竞争时长,利用上了多核的性能。这种设计方式和 CSAPP 中 提高并行性.md#%E6%8F%90%E9%AB%98%E5%B9%B6%E8%A1%8C%E6%80%A7](https://github.com/itliusir/CS_Notes/blob/master/操作系统/操作系统(十二).md#提高并行性) 提到的方式是一样的。

回到开篇 TOP 1 问题,可以看到 LongAdder 主要目的是解决高并发下 AtomicLong 自旋开销问题 。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-07-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TOP 带着问题看源码
  • 1. 基本介绍
  • 2. Striped64
    • 2.1 成员变量分析
      • 2.2 Cell 类分析
        • 2.3 longAccumulate() 分析
        • 3. add() 分析
        • 4. sum() 分析
        • 5. reset() 分析
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档