LongAdder类源码剖析

LongAdder类源码剖析

为了记录的方便主要参考了死磕LongAdder源码分析-彤哥读源码LongAdder源码分析

一、LongAdder类的简介

image-20231115164641020
当多个线程更新用于收集统计信息等目的而不是用于细粒度同步控制的公共总和时,此类通常比AtomicLong更可取。在低更新争用的情况下,这两个类具有相似的特征。但在高竞争情况下,此类的预期吞吐量明显更高,但代价是空间消耗更高。此类扩展了Number,但没有定义equals、hashCode和compareTo等方法,因为实例预计会发生变化,因此不能用作集合键。

image-20231115165038291

二、LongAdder类的原理

LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。如下图,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量。当多个线程并发累加的时:

  • 如果并发度低,就直接加到base变量上
  • 如果并发度高,冲突频繁,就平摊到这些Cell上

最后取值时,再把base和这些Cell求sum运算。

img

三、Striped64类的解析

下面的内容来自源码注释,第一手资料!

LongAdder继承自Striped64抽象类,Striped64类中定义了Cell内部类和各重要属性。此类维护一个延迟初始化的原子更新变量表即cells数组,以及一个额外的base字段,表的大小是2的幂次,索引使用屏蔽的每线程哈希码。此类中几乎所有声明都是包私有的,可以由子类直接访问,表条目Cell类即AtomicLong填充的变体(通过 @sun.misc.Contished)以减少缓存争用。对于大多数原子来说,填充是多余的,因为它们通常不规则地分散在内存中,因此不会互相干扰。但是驻留在数组中的原子对象往往会彼此相邻放置,因此在没有这种预防措施的情况下,通常会共享缓存行即伪共享(会对性能产生巨大的负面影响)。Cell类相对较大,我们避免在需要它们之前创建它们。当没有争用时,所有更新都对base字段进行;第一次争用时(base进行CAS更新时失败),表将初始化为大小2;在进一步争用时,表大小将加倍,直到达到大于或等于CPUS数量的最接近的2的幂。表槽保持为空 (null),直到需要它们为止。单个自旋锁(“cellsBusy”)用于初始化表和调整表大小,以及用新的Cell填充槽。当锁不可用时,线程尝试其他槽(或base字段)。通过ThreadLocalRandom维护的线程探测字段用作每线程哈希码。我们让它们保持未初始化时为零(如果它们以这种方式出现),直到它们在槽0上竞争,然后它们被初始化为通常不会与其他值冲突的值。执行更新操作时,失败的CAS会指示争用和/或表冲突。发生冲突时,如果表大小小于容量,则大小会加倍,除非其他线程持有锁。如果哈希槽为空,并且锁可用,则会创建一个新的Cell。否则,如果槽存在,则尝试CAS。通过双重散列进行重试,使用辅助散列 (Marsaglia XorShift) 来尝试找到空闲插槽。==表的大小是有上限的,因为当线程数多于CPUs时,假设每个线程都绑定到一个CPU,就会存在一个完美的哈希函数将线程映射到槽,从而消除冲突。当我们达到容量时,我们通过随机改变冲突线程的哈希码来搜索该映射。由于搜索是随机的,并且冲突只能通过CAS故障得知,因此收敛速度可能很慢,而且由于线程通常不会永远绑定到CPUS,因此可能根本不会发生。然而,尽管有这些限制,在这些情况下观察到的争用率通常很低。==

主要内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// AtomicLong的填充变体仅支持原始访问和CAS
@sun.misc.Contended static final class Cell {
// 存储元素的值,使用volatile修饰保证可见性
volatile long value;
Cell(long x) { value = x; }
// CAS更新value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe实例
private static final sun.misc.Unsafe UNSAFE;
// value字段的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

CPU缓存行能带来免费加载数据的好处,所以处理数组性能非常高,但同时CPU缓存行也带来了弊端,多线程处理不相干的变量时会相互影响,也就是伪共享;Cell数组中的元素依次顺序排列,存在伪共享的问题,避免伪共享的主要思路就是让不相干的变量不要出现在同一个缓存行中,可以使用Java8提供的注解@sun.misc.Contended

主要属性

1
2
3
4
5
6
7
// cells数组,存储各个段的值
transient volatile Cell[] cells;
// 最初无竞争时使用的,也算一个特殊的段
transient volatile long base;
// 标记当前是否有线程在创建或扩容cells,或者在创建Cell
// 通过CAS更新该值,相当于是一个锁
transient volatile int cellsBusy;

四、LongAdder类的方法

image-20231115205227356

下面我们看一下LongAdder类的核心方法add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 判断cells是否还没被初始化,并且尝试对base值进行cas操作
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 如果cells已经初始化或cas操作失败,则运行if内部语句
boolean uncontended = true;
// 1.cell[]数组是否初始化
// 2.cell[]数组虽然初始化了但是数组长度是否为0,应该不会出现
// 3.该线程所对应cell槽位是否为null
// 4.尝试对该线程对应的cell槽位进行cas更新
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

总结:开始竞争不激烈的时候直接CAS更新base变量,随着竞争的增加CAS可能失败,随即会创建Cell数组,后续的请求线程会根据线程的threadLocalRandomProbe值计算哈希槽位,如果该槽位为空则创建新Cell,否则对该Cell槽位进行CAS更新,一旦更新失败可能就要扩容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 如果getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) {
// 强制初始化
ThreadLocalRandom.current();
// 重新获取probe值
h = getProbe();
// 都未初始化,肯定还不存在竞争激烈
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 情况1:当前线程对应的Cell槽位未初始化
if ((a = as[(n - 1) & h]) == null) {
// 当前无其它线程在创建或扩容cells,也没有线程在创建Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个Cell,值为当前需要增加的值
Cell r = new Cell(x); // Optimistically create
// 再次检测cellsBusy,并尝试CAS加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock(rs[j = (m - 1) & h] == null)
Cell[] rs; int m, j;
// 这里一定要重新获取cells,因为as并不在锁定范围内,有可能已经扩容了
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 创建成功了就返回,否则继续自旋尝试
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 重新尝试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 情况2:再次尝试CAS更新当前线程所在Cell的值
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cells数组的长度达到了CPU核心数,或者cells数组扩容了,则设置collide为false保证能够跳过扩容逻辑
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 这里都是代码技巧,归根结底是为了保证后续再次CAS失败后扩容的正常运行
else if (!collide)
collide = true;
// 情况3:明确出现冲突了,尝试占有锁,并扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 检查是否有其它线程已经扩容过了
if (cells == as) { // Expand table unless stale
// 新数组为原数组的两倍
Cell[] rs = new Cell[n << 1];
// 把旧数组元素拷贝到新数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 重新赋值cells为新数组
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 已解决冲突
collide = false;
// 使用扩容后的新数组重新尝试
continue;
}
// 更新失败或者达到了CPU核心数,重新生成probe,并重试
h = advanceProbe(h);
}
// 情况4:需要加锁初始化Cell数组,并且检查Cell数组之前没有初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
// 双重检查,确保Cell数组不会重复初始化
if (cells == as) {
// 默认Cell数组初始容量为2
Cell[] rs = new Cell[2];
// 初始化对应槽位的cell实例
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化成功就可以直接退出,否则表明Cell数组并发初始化失败,重试
if (init)
break;
}
// 情况5:锁被占用,其他线程在尝试初始化Cell数组,则直接CAS更新base变量
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

longAccumulate方法可以看作是LongAdder类底层使用的最重要也是最难的方法,基本上可以分成5种情况:

  • 情况1:线程待处理的Cell数组槽位为空,需要创建并初始化新Cell
  • 情况2:线程CAS更新相应的Cell槽位失败,判断是否需要扩容,再次重试
  • 情况3:线程多次CAS尝试更新对应的Cell槽位均失败,表明竞争过于激烈,需要进行Cell数组扩容
  • 情况4:线程开始CAS更新base变量失败后发现Cell数组还未初始化,于是初始化容量为2的Cell数组
  • 情况5:cellsBusy锁被占用而无法初始化,表明其他线程在尝试初始化Cell数组,于是CAS更新base变量,这无疑是一个兜底方案!

image-20240206101314867

image-20240206104650743

总结来说,LongAdder实例在开始竞争不激烈时通过CAS更新base值就能满足要求,当竞争非常激烈时,单纯靠base遍历维持这么大的并发度已经不切实际了,因此Cell数组就要闪亮登场了。刚开始Cell数组还没有初始化,这时我们通过CAS加锁配合双重检查完成Cell数组的初始化(对应的Cell槽位也初始化完成)。当然,Cell数组初始化后下一次的add操作通过线程安全的随机值散列找到对应的Cell槽位可能为空,这时还是通过CAS加锁配合双重检查完成Cell槽位的初始化。注意,Cell数组一旦初始化后我们就不再更新base了,而是更新Cell数组中的对应Cell实例,我们也知道并发的CAS更新同一个Cell实例可能会失败,此时我们还有尝试机会(下次尝试的Cell实例可能会变化,因为线程安全的随机值通过advanceProbe方法更新了),如果再次尝试失败表明冲突过于频繁,我们就会进行Cell数组的扩容操作来减少冲突(Cell数组的大小超过CPU核心数后就不会再扩容了)。

那么如何获取LongAdder当前的累积总和呢,其实就是通过sum方法获取最终一致性的结果(并非强一致性,因为没有加锁控制这一过程),因此,适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long sum() {
Cell[] as = cells; Cell a;
// sum初始等于base
long sum = base;
// 如果cells不为空
if (as != null) {
// 遍历所有的Cell
for (int i = 0; i < as.length; ++i) {
// 如果所在的Cell不为空,就把它的value累加到sum中
if ((a = as[i]) != null)
sum += a.value;
}
}
// 返回sum
return sum;
}

LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效。因为LongAdder在更新数值时并非对一个数进行更新,而是分散到多个Cell,这样在多线程的情况下可以有效的减少冲突和压力,使得更加高效。

有趣的是Cell数组最多扩容到大于或等于NCPU的最小二次幂,因为同一时刻最后只有NCPU个线程同时运行并操作Cell数组,而每个线程会根据自己的threadLocalRandomProbe哈希值找到对应的Cell槽位,如果发生冲突即CAS失败会重新计算线程的该probe值,因此哈希分布随着时间会逐渐均匀,Cells数组的长度并不需要多长,达到CPU核心数即可。