LongAdder实现原理

Java并发工具类- LongAdder的实现原理过程 - 知乎 (zhihu.com)

(4 封私信 / 80 条消息) 2020-10-22:谈谈java中LongAdder和LongAccumulator相同点和不同点? - 知乎 (zhihu.com)

Longadder的功能和atomicLong实际的作用其实是一样的,都是一个线程安全的元子类。

其区别在于,AtomicLong的value是一个。而LongAdder将value分成了多个,分散了线程竞争,将本来对一个变量的多线程竞争,分散到多个变量上。

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

组成

先来看看LongAdder类中的重要属性

继承了

public class LongAdder extends Striped64 implements Serializable

Striped64是什么?代表着64字节,即正好是一个CPU缓存行的大小。通常对intel来说,CPU读取内存的数据,会一次性读取64个字节,即正好是一个缓存行的大小,并且是一个原子性操作,不会有多个CPU同时读取这64个字节。

Striped64类中的主要属性

Ceel类

Cell类上有一个@Contended的注解,这个注解的意思就是Cell代表着一个64字节的类

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

base变量:当没有线程竞争时,首先会对base进行增加。

Cells数组:当检测有base变量在被线程占有时,就会尝试去cell数组的变量加。

总和为base变量+cells数组中的value

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

ADD具体过程

add方法

/**
     * Adds the given value.
     *
     * @param x the value to add
     */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null    //当cells!=null,说明可能,那么就尝试对base变量的CAS
        || !casBase(b = base, b + x)) {//如果CAS base变量失败 就进入下面代码
        boolean uncontended = true;    //表示没有竞争
        if (as == null || (m = as.length - 1) < 0 ||//情况1 cell数组中没有初始化
            (a = as[getProbe() & m]) == null ||    // 情况2 cell数组中的cell对象没有初始化
            !(uncontended = a.cas(v = a.value, v + x)))    //情况3 CAS Cell对应的value值
            //综上,满足上述三种情况之一,就会进入这个方法,方法详情见后续
            longAccumulate(x, null, uncontended);
    }
}

我longAccumlate方法

主要思想:不要让主线程在一个竞争店进行循环,让它去别的竞争点尝试

如果抢占当前cell失败,那么尝试抢占Base变量;

如果抢占Base变量失败,那么尝试从Cell数组中换一个Cell来抢占

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //对应情况二:cell数组不为空,但是数组中的cell对象为null
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // 如果到了这里,可以理解为有线程在竞争
                    wasUncontended = true;      // 那么将wasUncotended设置为true说明有竞争
                else if (a.cas(v = a.value, ((fn == null) ? v + x :    //选中cell对象不为空,对cell的value进行cas
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)    //将cell对象的value值CAS失败,就进入这里。如果数量大于CPU核心数,就没有必要进行扩容了或者
                    collide = false;            // 将collide设置为false;再下一次循环时,就会进入下一行代码。
                else if (!collide)
                    collide = true;            //将collide设置为true,下一次循环跳入到扩容代码
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // 判断cell是否被其他线程改动
                            Cell[] rs = new Cell[n << 1];    //对Cell进行扩容
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);    //换一个随机数 即换一个对象
            }
            //对应第一种情况 Cells数组为空需要初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //如果以上俩个都不满足,则进入尝试对base的add操作,因为可能之前操作base的线程已经操作完成了,所以可以尝试,不要让线程只等待在无意义的自旋上
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

longAccumulate

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。其构造函数如下:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}

通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个long)。LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。

public class LongAccumulatorTest {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个 LongAccumulator 对象,用于累加 x+y,初始值为 0
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
        // 创建一个固定大小为 8 的线程池
        ExecutorService executor = Executors.newFixedThreadPool(8);
        // 对整数范围从 1 到 4 进行遍历
        IntStream.range(1, 5).forEach(i ->
                // 提交任务给线程池,每个任务对 accumulator 进行累加
                executor.submit(() -> accumulator.accumulate(i)));

        // 休眠 2000 毫秒,等待所有提交的任务执行完成
        Thread.sleep(2000);

        // 获取当前累加结果并重置累加器
        System.out.println("累加结果:"+accumulator.getThenReset());

    }
}
Last modification:April 10, 2024
如果觉得我的文章对你有用,请随意赞赏