本人是工作7年的老程序员,在头条分享我对Java运用和源码、各种框架运用和源码的认识和理解,如果对您有所帮助,请持续关注。
声明:所有的文章都是自己工作之余一个字一个字码上去的,希望对学习Java的同学有所帮助,如果有理解不到位的地方,欢迎交流。
上几篇文章,我对ConcurrentHashMap做了比较详细的讲解,还有最后一个容易被忽略的知识点,那就是ConcurrentHashMap是怎样获取元素数量的。刚学习Java的同学很是疑惑,这有什么好说的,不就是调用集合的size()方法不就可以了吗?但是请大家思考这样几个问题。
第一个问题:ConcurrentHashMap是线程安全的,使用关键字volatile+CAS+synchronized(锁分离)保证线程安全的,为了提高并发的性能,它不会锁住整个底层数组,而是如果有冲突时,只锁住有冲突的下标,如果我们求size的时候,如果要线程安全,岂不是需要锁住整个数组吗?
第二个问题:在JDK并发中不是为我们提供了原子计数的工具类AtomicLong吗?ConcurrentHashMap使用这个不就可以了吗?
如果大家知道了上面的问题,在结合ConcurrentHashMap的源码,想必有的人更疑惑了,本篇文章我就是通过讲解LongAdder,来解决大家的疑惑,看看Doug Lea大神是如何设计的。本篇文章较长,是我一个字一个字敲的,请耐心看完,也是Map集合系列最后一篇文章,接下来我会开始对Set集合进行详细的分析,请持续关注。
本篇文章的主要内容如下:
1:简单分析一下AtomicLong的原理
2:通过AtomicLong引出LongAdder
3:从源码角度全面解析LongAdder
4:看看ConcurrentHashMap中通过addCount怎样计数的
这个类相信大家都非常的熟悉,下面总结以下它是怎样保证线程安全的去更改数据的
1:利用关键字volatile的内存语义
1.1:任何对volatile变量的写,都会立刻从工作内存刷新到主存中。
1.2:任何对volatile变量的读,都会从主存中获取最新的到工作内存。
2:利用CAS机制(比较并交换),无阻塞、自旋的更新数据,直到更新成功。
在AtomicLong中定义了一个被volatile修饰的全局变量,代码如下:
private volatile long value;
所以只要线程对value进行更改,其他线程立刻就会可见。接下来我们看看其中的几个重要的方法:
//value+1,然后返回+1前的值
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
//value+delta,然后返回+delta前的值
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
//value+1,然后返回+1后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
上面几个方法,在多线程下是安全的,它是怎样保证线程安全的呢?就是利用CAS机制。
我们接着进入Unsafe中的getAndAddLong方法一探究竟。
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
从上面的源码可以看出,上面的getAndIncrement()/getAndAdd(long delta)/incrementAndGet()方法都是通过自旋,利用CAS更改value的,直到更改成功才返回,更改失败无限自旋。流程图如下:

Unsafe中getAndAddLong()的流程图
对于Atomic包中的类我会单独有一个专题,详细的分析每一个Atomic类,上面只是简单的介绍以下,大家看到上面的实现,会有如下的思考吗?
在并发量不太高的情况下,自旋次数很少就会更新成功,但是如果在大并发的情况下,都去更新value,是不是失败的次数直线上升?更新value失败就会无限的去尝试再次更新,自旋次数可以想象有多高?那么在这种情境下,AtomicLong似乎不太适合了,如果加锁,性能又受到影响,我们怎么办呢?
既然上面我们已经知道了,AtomicLong适合在低并发情况下使用,在高并发下由于自旋次数会直线上升,那么在高并发情况下用什么类来替代它呢?基于这种情况,LongAdder就应运而生了,它就是在高并发下来替代AtomicLong来进行计数的。首先我们看一下LongAdder的继承关系。

LongAdder的继承关系
通过上面的继承关系,可以看出LongAdder继承Striped64,这个父类实现了核心内容,除了LongAdder外,继承Striped64的类还有如下:
1:public class LongAccumulator extends Striped64
2:public class DoubleAdder extends Striped64
3:public class DoubleAccumulator extends Striped64
上面这4个类有什么区别吗?
1:LongAdder主要是在原来的值的基础上+1,或者+x(调用者指定的x)
2:LongAccumulator主要是在原来值的基础上,加上你自己自定义的公式,如每次在原来基础上乘上5,然后+2,所以5*原来的值+2.从这可以看出LongAdder是它的一个特例,LongAdder只能在原来基础上加上一个指定的值,不能自己设定函数,而LongAccumulator可以自定义函数。
3:DoubleAdder和DoubleAccumulator主要是针对double类型的数据。
从他们的构造函数可以看一下:
//LongAdder
public LongAdder() {
}
-------------------------------------------
//LongAccumulator
//1:LongBinaryOperator:是一个功能接口,定义往下看。
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
@FunctionalInterface
public interface LongBinaryOperator {
//实现这个接口的类,只要实现这个方法就可以了
long applyAsLong(long left, long right);
}
从构造函数上就可以看出LongAccumulator的构造函数中传递一个功能接口,我们可以按照我们的想法去实现这个功能接口。而LongAdder则没有提供这个功能,所以只能在原来值的基础上增加或者减少一个指定的值。DoubleAdder和DoubleAccumulate也是同样的道理。接下来我们已LongAdder为例开始分析它是怎样在高并发下保证安全并且比AtomicLong性能高的。
上面我们已经分析了LongAdder的构造函数,它只有一个无参构造,我接下来说以下两个非常重要的成员变量,它们都在父类Stripe64中。
第一个重要的成员变量:
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
Cell数组,大小必须是2的n次方幂。主要用于并发时更新,而Cell的定义如下:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
第二个重要的成员变量:
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
看上面的英文注释非常的重要,翻译成中文就是:它是一个基础值,主要用于没有竞争的情况,还有就是初始化Cell数组时,用CAS更新。所以base主要用于两种情况下:
第一个情况:当没有竞争(也就是没有并发的情况下)时,CAS更新base.
第二个情况:当初始化Cell数组时,CAS更新base
通过上面两个成员变量可以总结如下:
1:base是一个基值,当没有并发或者初始化Cell数组时CAS更新。
2:如果有竞争(也就是出现并发的情况),则更新Cell数组来实现,Cell数组的更新机制是锁分段机制,竞争更新时不至于锁住整个数组,所以提高并发的性能。
3:最终的计算总数=base+Cell数组的所有元素数量。
通过上面的总结,大家是不是理解了LongAdder的能够在高并发下提供高性能的机制,那就是和ConcurrentHashMap类似用锁分离技术实现高性能。不至于像AtomicLong一样出现的过多的失败自旋。
第三个重要的成员变量:
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
上面的注释中文意思:当初始化Cell数组或者创建Cell时作为一个锁。
3个重要的属性理解完成以后,我们接下来看看重要的方法。
//在原来值的基础上+1
public void increment() {
add(1L);
}
//在原来值的基础上-1
public void decrement() {
add(-1L);
}
两个方法都是调用的add方法,我们接下来进入add方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//$if1
if ((as = cells) != null || !casBase(b = base, b + x)) {
//走到这一步:说明cells不等于null,或者有竞争了,因为caseBase失败了。
//uncontended:表示的是否有并发,true:表示有并发,false:表示没有并发。
boolean uncontended = true;
//$if2
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//走到这一步会调用longAccumulate方法:有以下几个条件:
1:要么Cell数组还没有初始化
2:要么计算的数组下标还没有值
3:要么CAS更新失败。
longAccumulate(x, null, uncontended);
}
}
$if1有两个条件,只要满足一个就能进入if语句,两个条件解释如下:
1:(as=cells)!=null:说明以前某个点已经有竞争了,Cell数组已经初始化了。
2:!casBase(b=base,b+x):能够执行到这个条件,说cells==null,Cell数组还没有初始化,只需利用CAS修改base进行计数就可以了,如果CAS执行成功,说明计数成功,代码逻辑结束,如果CAS执行失败,说明第一次出现竞争的情况,要进入if语句
$if2有4个条件,只要满足一个就能进入if语句,调用longAccumulate方法。4个条件解释如下:
1:as==null:说明Cell数组还没有初始化
2:(m=as.length-1):说明Cell数组长度0
1和2:说明Cell数组还未初始化成功。
3:(a=as[getProbe()&m])==null:说明当前线程计算的hash的下标还没有值,所以需要调用longAccumulate方法创建Cell。
4:!(uncontended = a.cas(v = a.value, v + x)):尝试利用CAS对cells[threadLocalRandomProbe%cells.length]位置的Cell对象的value进行累加,如果CAS执行成功,则代码逻辑结束,如果CAS执行失败,则需要调用longAccumulage方法重新计算一个hash值
从上面两个if语句,可以总结出什么时候可以调用longAccumulagte()方法:
Case1:Cell数组还没有初始化,并且CAS执行修改base值失败。这个时候需要调用longAccumulate初始化Cell数组。
Case2:Cell数组已经被初始化了,但是当前线程hash计算的下标对应的值为null,这个时候需要调用longAccumulate创建Cell对象放到此下标中。
Case3:Cell数组已经被初始化了,并且当前线程hash计算的下标已经有值了,但是通过CAS进行对Cell对象的value修改时,出现了竞争,执行失败了。这个时候需要调用longAccumulate重新计算hash的下标。
从上面的源码也可以证明我的总结,如果Cell数组为null,则利用CAS更新base就可以,如果更新成功,则结束,如果更新失败,则说明有并发,需要更新Cell数组了。那我们接下来继续进入longAccumulate方法,这个方法在父类Stripe64中,是一个核心的方法。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//这一段if语句,你就理解成当前线程计算出hash值。
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
//$1:此时Cell数组不为null,需要更新数组
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//$2: 当前线程获取了锁cellsBusy,进行对Cell数组初始化。
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//$3: 说明此时有一个线程正在初始化Cell数组,当前线程的计数通过CAS去更新base.
}
}
这个条件语句证明当Cell数组已经被初始化,则通过更新Cell数组进行计数,如果一个线程正在初始化Cell数组,则通过CAS更新base计数。

longAccumulate的简化流程图
$1:如果Cell数组不为null,它是怎样计数的
如果Cell数组不为null,则此时是通过锁分离的机制修改Cell来进行计数的,源码如下:
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
//走到这一步:说明满足Case2:Cell数组已经初始化,但是对应下标值为null
//....代码省略
}
else if (!wasUncontended) // CAS already known to fail
//走到这一步:说明满足Case3:Cell数组已经初始化,但是对应下标值不为null,但是CAS累加时失败,需要重新计算hash的下标。
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//走到这一步:说明通过重新计算当前线程hash下标,再次累加成功。
//1:LongAddr的fn=null,所以会执行v+x,所以只能做加减操作。
//2:LongAccumulate的fn!=null,说明执行我们实现的fn。
break;
else if (n >= NCPU || cells != as)
//走到这一步:说明如果Cell数组的长度超过CPU的核数,则不再进行扩容了。
//collide:表示扩容标识,如果false则不再进行扩容
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
//走到这一步:说明获取了锁,然后对Cell数组进行扩容。扩大为原来的2倍。
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
$1中有是一个非常复杂的条件语句,总结如下:
1:Cell数组已经初始化,但是对应下标值为null,创建新的Cell对象。
2:Cell对象累加失败,重新计算hash的下标,然后在继续CAS累加。
3:判断需要是否扩容。
$2:当前线程获取了锁,然后对Cell进行初始化,只能有一个线程进行初始化:只有子类中的条件满足Case1:(as=cells)==null时才进入这个语句
//cellsBusy是一个锁,casCellsBusy()是通过CAS获取锁,获取锁的线程开始对Cell数组进行初始化。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//走到这一步:说明当前线程获取了锁,可以初始化Cell数组了
//init:表示是否已经初始化完成了
boolean init = false;
try { // Initialize table
if (cells == as) {
//初始化数组长度2,并把指定的x值放到数组的一个下标下。
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//走到这一步:释放锁
cellsBusy = 0;
}
if (init)
//走到这一步:说明初始化完成,跳出无限循环
break;
}
这一段代码非常的简单,就是成功获取锁的线程,进行对Cell数组初始化,获取锁失败的线程继续向下执行代码逻辑。
$3:当一个线程正在初始化时,其他线程通过CAS更新base进行计数:只有子类中的条件满足Case1:as.length-1<0时才进入这个语句。说明有一个线程正在初始化,但是还没有初始化完成。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
当$2获取锁失败的线程,就会执行$3通过CAS更新base来进行计数。
上面我讲解了LongAdder的计数,它适应于并发量高的计数,那么怎样获取总数呢?
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
上面的代码是不是很容易理解了,总量就是base+Cell数组累加。但是要有这样一个概念,通过这计算的可能不太精准,只是一个大概的数字,如果要结果需要非常的精准,那么这个LongAdder就不太适合了。
通过对LongAdder的详细讲解,在回过头来看ConcurrentHashMap的计数,就非常的容易了。我这里仅仅退出代码:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
}
-------------------------------------
fullAddCount方法和Stripe64中的longAccumulate是一样的。
这篇文章结束后,我对Map集合的文章就告一段落了,希望对你有所帮助,接下来我继续Set
在ConcurrentHashMap中如果获取元素的总长度,应该调用mappingCount方法,解释如下:
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
上面的注释中文意思:这个方法应该代替size方法,因为ConcurrentHashMap也许包含比整形更多的元素,返回值是一个估计值,如果存在并发插入或者删除,则可能与实际计数有所不同。所以通过size或者mappingCount计算出的值并不一定准确,但是在实际应用中,很少会查询所有元素的数量。这个知识点大家要理解。
| 留言与评论(共有 0 条评论) |