日期:
来源:郭霖收集编辑:小虾米君
https://juejin.cn/post/7209578803159924796
比较:读取到了一个值 A,在将其更新为 B 之前,检查原值是否仍为 A 替换 / 设置:YES 则将 A 更新为 B,结束;反之,重复上述操作直到成功为止
// java.util.concurrent.atomic.AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
...
}
// jdk.internal.misc.Unsafe
public final class Unsafe {
...
private Unsafe() {}
private static final Unsafe theUnsafe = new Unsafe();
@CallerSensitive
public static Unsafe getUnsafe() {
...
return theUnsafe;
}
}
// jdk.internal.misc.Unsafe
public final class Unsafe {
...
@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}
}
// sun.misc.VM
public class VM {
...
public static boolean isSystemDomainLoader(ClassLoader loader) {
return loader == null;
}
}
class MyTrustedClass {
private static final Unsafe unsafe = Unsafe.getUnsafe();
...
private long myCountAddress = ...;
public int getCount() { return unsafe.getByte(myCountAddress); }
}
value 使用 volatile 修饰,默认为 0,反之为 AtomicInteger 构造时指定的初始值 initialValue valueOffset 是存放 value 的内存相对地址:在 static 块中通过 Unsafe 的 objectFieldOffset() 传入的 value 的 Field 字段得到。
// java.util.concurrent.atomic.AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
...
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() { }
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
}
// jdk.internal.misc.Unsafe
public final class Unsafe {
...
public native long objectFieldOffset(Field f);
}
// unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_ObjectFieldOffset(JNIEnv *env, jobject unsafe, jobject field))
UnsafeWrapper("Unsafe_ObjectFieldOffset");
return find_field_offset(field, 0, THREAD);
UNSAFE_END
jint find_field_offset(jobject field, int must_be_static, TRAPS) {
if (field == NULL) {
THROW_0(vmSymbols::java_lang_NullPointerException());
}
oop reflected = JNIHandles::resolve_non_null(field);
oop mirror = java_lang_reflect_Field::clazz(reflected);
Klass* k = java_lang_Class::as_Klass(mirror);
int slot = java_lang_reflect_Field::slot(reflected);
int modifiers = java_lang_reflect_Field::modifiers(reflected);
if (must_be_static >= 0) {
int really_is_static = ((modifiers & JVM_ACC_STATIC) != 0);
if (must_be_static != really_is_static) {
THROW_0(vmSymbols::java_lang_IllegalArgumentException());
}
}
int offset = InstanceKlass::cast(k)->field_offset(slot);
return field_offset_from_byte_offset(offset);
}
// instanceKlass.h
class InstanceKlass: public Klass {
...
public:
int field_offset (int index) const { return field(index)->offset(); }
// Casting from Klass*
static InstanceKlass* cast(Klass* k) {
assert(k == NULL || k->is_klass(), "must be");
assert(k == NULL || k->oop_is_instance(), "cast to InstanceKlass");
return (InstanceKlass*) k;
}
}
返回 false 表示当前的值并非期待值,无法完成更新 返回 true 表示更新成功
// java.util.concurrent.atomic.AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long valueOffset;
...
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}
// jdk.internal.misc.Unsafe
public final class Unsafe {
...
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
}
先通过 JNIHandles 找到 AtomicInteger 实例的地址 oop 将该 oop 和 value 的相对地址作为参数调用 index_oop_from_field_offset_long() 并得到 value 的内存绝对地址 aadr 将更新值、addr 和期待值作为参数调用 cmpxchg() 继续
// unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
inline void* index_oop_from_field_offset_long(oop p, jlong field_offset) {
jlong byte_offset = field_offset_to_byte_offset(field_offset);
#ifdef ASSERT
if (p != NULL) {
assert(byte_offset >= 0 && byte_offset <= (jlong)MAX_OBJECT_SIZE, "sane offset");
if (byte_offset == (jint)byte_offset) {
void* ptr_plus_disp = (address)p + byte_offset;
assert((void*)p->obj_field_addr<oop>((jint)byte_offset) == ptr_plus_disp,
"raw [ptr+disp] must be consistent with oop::field_base");
}
jlong p_size = HeapWordSize * (jlong)(p->size());
assert(byte_offset < p_size, err_msg("Unsafe access: offset " INT64_FORMAT " > object's size " INT64_FORMAT, byte_offset, p_size));
}
#endif
if (sizeof(char*) == sizeof(jint)) // (this constant folds!)
return (address)p + (jint) byte_offset;
else
return (address)p + byte_offset;
}
首先计算得到 value 地址的当前数值,即 cur_as_bytes[offset] 假使当前数值等于期待值 compare_value: 调用 cmpxchg CPU 指令将 dest_int 地址对应的数值执行期待值为 cur,更新值为 new_val 的 CAS 操作 该操作返回的是期待值的话,表示更新成功,跳过循环并返回期待值。Unsafe_CompareAndSwapInt() 将收到匹配期待值的结果,并向 Java 侧返回 true 反之,将 cur 更新为 CPU 指令返回的当前值,再继续下一次循环,直到成功为止 反之即不等于期待值,直接返回 Unsafe_CompareAndSwapInt() 将收到不匹配期待值的结果,并向 Java 侧返回 false
// atomic.cpp
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int;
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur;
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) {
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}
// android/libcore/ojluni/src/main/java/java/
// util/concurrent/atomic/AtomicInteger.java
public class AtomicInteger extends Number implements java.io.Serializable {
...
public final boolean compareAndSet(int expectedValue, int newValue) {
// Android-changed: Using VarHandle instead of Unsafe
// return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
return VALUE.compareAndSet(this, expectedValue, newValue);
}
}
// java.util.concurrent.atomic.AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
...
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
}
// jdk.internal.misc.Unsafe
public final class Unsafe {
...
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
}
循环时间长开销大:CAS 大量失败后长时间占用 CPU 资源,加大了系统性能开销 只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性 ABA 问题:CAS 机制本质上依赖值有没有发生变化的操作条件。但是如果值原来是 A、被改成变成了 B、最后又变回了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化进行了操作,但是实际上却变化了,这其实违背了约定的条件。
AtomicInteger 通过静态方法 getUnsafe() 获得 Unsafe 实例 接着通过 Unsafe 实例的 native 方法 objectFieldOffset() 传入使用 volatie 修饰的数值 value 的 Field 字段得到其内存相对地址 关键的 compareAndSet() 亦由 Unsafe 调度,即 native 方法 compareAndSwapInt()。主要是传入 AtomicInteger 实例,value 内存相对地址,期待值以及更新值。 native 的实现由 atomic.cpp 的 cmpxchg() 完成,当前 value 内存地址返回的值匹配期待值的话,执行更新操作;反之,直接返回 false。 执行更新的操作采用 cmpxchg CPU 指令,如果得到的值不符合期待值的话,更新期待值继续下一次循环,直到匹配为止。