LongAdder实现原理
Java并发工具类- LongAdder的实现原理过程 - 知乎 (zhihu.com)
(4 封私信 / 80 条消息) 2020-10-22:谈谈java中LongAdder和LongAccumulator相同点和不同点? - 知乎 (zhihu.com)
Longadder的功能和atomicLong实际的作用其实是一样的,都是一个线程安全的元子类。
其区别在于,AtomicLong的value是一个。而LongAdder将value分成了多个,分散了线程竞争,将本来对一个变量的多线程竞争,分散到多个变量上。
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
组成
先来看看LongAdder类中的重要属性
继承了
public class LongAdder extends Striped64 implements Serializable
Striped64是什么?代表着64字节,即正好是一个CPU缓存行的大小。通常对intel来说,CPU读取内存的数据,会一次性读取64个字节,即正好是一个缓存行的大小,并且是一个原子性操作,不会有多个CPU同时读取这64个字节。
Striped64类中的主要属性
Ceel类
Cell类上有一个@Contended的注解,这个注解的意思就是Cell代表着一个64字节的类
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
base变量:当没有线程竞争时,首先会对base进行增加。
Cells数组:当检测有base变量在被线程占有时,就会尝试去cell数组的变量加。
总和为base变量+cells数组中的value
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
ADD具体过程
add方法
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null //当cells!=null,说明可能,那么就尝试对base变量的CAS
|| !casBase(b = base, b + x)) {//如果CAS base变量失败 就进入下面代码
boolean uncontended = true; //表示没有竞争
if (as == null || (m = as.length - 1) < 0 ||//情况1 cell数组中没有初始化
(a = as[getProbe() & m]) == null || // 情况2 cell数组中的cell对象没有初始化
!(uncontended = a.cas(v = a.value, v + x))) //情况3 CAS Cell对应的value值
//综上,满足上述三种情况之一,就会进入这个方法,方法详情见后续
longAccumulate(x, null, uncontended);
}
}
我longAccumlate方法
主要思想:不要让主线程在一个竞争店进行循环,让它去别的竞争点尝试
如果抢占当前cell失败,那么尝试抢占Base变量;
如果抢占Base变量失败,那么尝试从Cell数组中换一个Cell来抢占
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//对应情况二:cell数组不为空,但是数组中的cell对象为null
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // 如果到了这里,可以理解为有线程在竞争
wasUncontended = true; // 那么将wasUncotended设置为true说明有竞争
else if (a.cas(v = a.value, ((fn == null) ? v + x : //选中cell对象不为空,对cell的value进行cas
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as) //将cell对象的value值CAS失败,就进入这里。如果数量大于CPU核心数,就没有必要进行扩容了或者
collide = false; // 将collide设置为false;再下一次循环时,就会进入下一行代码。
else if (!collide)
collide = true; //将collide设置为true,下一次循环跳入到扩容代码
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // 判断cell是否被其他线程改动
Cell[] rs = new Cell[n << 1]; //对Cell进行扩容
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); //换一个随机数 即换一个对象
}
//对应第一种情况 Cells数组为空需要初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果以上俩个都不满足,则进入尝试对base的add操作,因为可能之前操作base的线程已经操作完成了,所以可以尝试,不要让线程只等待在无意义的自旋上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
longAccumulate
LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。其构造函数如下:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个long)。LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。
public class LongAccumulatorTest {
public static void main(String[] args) throws InterruptedException {
// 创建一个 LongAccumulator 对象,用于累加 x+y,初始值为 0
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
// 创建一个固定大小为 8 的线程池
ExecutorService executor = Executors.newFixedThreadPool(8);
// 对整数范围从 1 到 4 进行遍历
IntStream.range(1, 5).forEach(i ->
// 提交任务给线程池,每个任务对 accumulator 进行累加
executor.submit(() -> accumulator.accumulate(i)));
// 休眠 2000 毫秒,等待所有提交的任务执行完成
Thread.sleep(2000);
// 获取当前累加结果并重置累加器
System.out.println("累加结果:"+accumulator.getThenReset());
}
}