今でもあなたは私の光丶

并发编程(3)同步工具类&Atomic类

同步工具类

Semaphore

Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示:

// 一开始有5份共享资源。第二个参数表示是否是公平
Semaphore myResources = new Semaphore(5, true);
// 工作线程每获取一份资源,就在该对象上记下来
// 在获取的时候是按照公平的方式还是非公平的方式,就要看上一行代码的第二个参数了。
// 一般非公平抢占效率较高。
myResources.acquire();
// 工作线程每归还一份资源,就在该对象上记下来
// 此时资源可以被其他线程使用
myResources.release();
/*
释放指定数目的许可,并将它们归还给信标。
可用许可数加上该指定数目。
如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。
如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。
*/
semaphore.release(2);
/*
从信标获取指定数目的许可。如果可用许可数目不够,则线程阻塞,直到被中断。
该方法效果与循环相同,
for (int i = 0; i < permits; i++) acquire();
只不过该方法是原子操作。
如果可用许可数不够,则当前线程阻塞,直到:(二选一)
1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字;
2. 其他线程中断了当前线程。
permits – 要获取的许可数
*/
semaphore.acquire(3);

案例:
大学生到自习室抢座,写作业:

package com.lagou.concurrent.demo;
import java.util.Random;
import java.util.concurrent.Semaphore;
public class MyThread extends Thread {
private final Semaphore semaphore;
private final Random random = new Random();
public MyThread(String name, Semaphore semaphore) {
super(name);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " - 抢座成
功,开始写作业");
Thread.sleep(random.nextInt(1000));
System.out.println(Thread.currentThread().getName() + " - 作业完
成,腾出座位");
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
}
}
package com.lagou.concurrent.demo;
import java.util.concurrent.Semaphore;
public class Demo {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new MyThread("学生-" + (i + 1), semaphore).start();
}
}
}

如下图所示,假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个 线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和 锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}

由于Semaphore和锁的实现原理基本相同,上面的代码不再展开解释。资源总数即state的初始 值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行 CAS加操作。

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
// ...
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// ...
}
public class Semaphore {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
static final class FairSync extends Sync {
// ...
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available,
remaining))
return remaining;
}
}
}
}
package java.lang.invoke;
public abstract class VarHandle {
// ...
// CAS,原子操作
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet(Object... args);
// ...
}

CountDownLatch

CountDownLatch使用场景

假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:
线程:

package com.lagou.concurrent.demo;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class MyThread extends Thread {
private final CountDownLatch latch;
private final Random random = new Random();
public MyThread(String name, CountDownLatch latch) {
super(name);
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "运行结束");
latch.countDown();
}
}

Main类:

package com.lagou.concurrent.demo;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
new MyThread("线程1", latch).start();
new MyThread("线程2", latch).start();
new MyThread("线程3", latch).start();
new MyThread("线程4", latch).start();
// new MyThread("线程5", latch).start();
// 当前线程等待
latch.await();
System.out.println("程序运行结束");
}
}

下图为CountDownLatch相关类的继承层次,CountDownLatch原理和Semaphore原理类似,同样 是基于AQS,不过没有公平和非公平之分。

await()实现分析

如下所示,await()调用的是AQS 的模板方法,这个方法在前面已经介绍过。 CountDownLatch.Sync重新实现了tryAccuqireShared方法:

public void await() throws InterruptedException {
// AQS的模板方法
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 被CountDownLatch.Sync实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

从tryAcquireShared(...)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS 的阻塞队列,进入阻塞状态。

countDown()实现分析

public void countDown() {
sync.releaseShared(1);
}
// AQS的模板方法
public final boolean releaseShared(int arg) {
// 由CountDownLatch.Sync实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared(...)由 CountDownLatch.Sync实现。从上面的代码可以看出,只有state=0,tryReleaseShared(...)才会返回 true,然后执行doReleaseShared(...),一次性唤醒队列中所有阻塞的线程。
总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过 countDown()一直减state,减到0后一次性唤醒所有线程。如下图所示,假设初始总数为M,N个线程 await(),M个线程countDown(),减到0之后,N个线程被唤醒。

CyclicBarrier

CyclicBarrier使用场景

CyclicBarrier使用方式比较简单:

CyclicBarrier barrier = new CyclicBarrier(5);
barrier.await();

该类用于协调多个线程同步执行操作的场合。

使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔 试;笔试结束之后,再一起参加面试。把10个人看作10个线程,10个线程之间的同步过程如下图所示:

Main类:

package com.lagou.concurrent.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main {
public static void main(String[] args) throws BrokenBarrierException,
InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new MyThread("线程-" + (i + 1), barrier).start();
}
}
}

MyThread类:

package com.lagou.concurrent.cyclicbarrier;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class MyThread extends Thread {
private final CyclicBarrier barrier;
private final Random random = new Random();
public MyThread(String name, CyclicBarrier barrier) {
super(name);
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经到
达公司");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经笔
试结束");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经面
试结束");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
super.run();
}
}

在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2 个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。

CyclicBarrier实现原理

CyclicBarrier基于ReentrantLock+Condition实现。

public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
// 用于线程之间相互唤醒
private final Condition trip = lock.newCondition();
// 线程总数
private final int parties;
private int count;
private Generation generation = new Generation();
// ...
}

下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 参与方数量
this.parties = parties;
this.count = parties;
// 当所有线程被唤醒时,执行barrierCommand表示的Runnable。
this.barrierCommand = barrierAction;
}

接下来看一下await()方法的实现过程。

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 响应中断
if (Thread.interrupted()) {
// 唤醒所有阻塞的线程
breakBarrier();
throw new InterruptedException();
}
// 每个线程调用一次await(),count都要减1
int index = --count;
// 当count减到0的时候,此线程唤醒其他所有线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

关于上面的方法,有几点说明:

  1. CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了10个线程,这10个线程互相等 待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再 一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
  2. CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线 程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重 新开始。
  3. 上面的回调方法,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而 不是10个线程每个都执行1次。

Exchanger

使用场景

Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(...)方法,使用示例如 下:

package com.lagou.concurrent.demo;
import java.util.Random;
import java.util.concurrent.Exchanger;
public class Main {
private static final Random random = new Random();
public static void main(String[] args) {
// 建一个多线程共用的exchange对象
// 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自
己的数据作为参数
// 传递进去,返回值是另外一个线程调用exchange传进去的参数
Exchanger<String> exchanger = new Exchanger<>();
new Thread("线程1") {
@Override
public void run() {
while (true) {
try {
// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调
用exchange为止。
String otherData = exchanger.exchange("交换数据1");
System.out.println(Thread.currentThread().getName()
+ "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程2") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据2");
System.out.println(Thread.currentThread().getName()
+ "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程3") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据3");
System.out.println(Thread.currentThread().getName()
+ "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}

在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。

实现原理

Exchanger的核心机制和Lock一样,也是CAS+park/unpark。
首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:

public class Exchanger<V> {
// ...
// 添加了Contended注解,表示伪共享与缓存行填充
@jdk.internal.vm.annotation.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // 本次绑定中,CAS操作失败次数
int hash; // 自旋伪随机
Object item; // 本线程要交换的数据
volatile Object match; // 对方线程交换来的数据
// 当前线程
volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
// ...
}

每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第 二个是对方线程交换来的数据,最后一个是该线程自身。
一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因 此在Exchanger里面定义了Node数组:

exchange(V x)实现分析

明白了大致思路,下面来看exchange(V x)方法的详细实现:

上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程 被中断,则抛异常
如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是 封装为NULL_ITEM对象的,而不是null。
如果slotExchange的返回值是null,并且线程被中断,则抛异常。
如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

slotExchange的实现:

package java.util.concurrent;
public class Exchanger<V> {
// ...
/**
* 如果不启用arenas,则使用该方法进行线程间数据交换。
*
* @param item 需要交换的数据
* @param timed 是否是计时等待,true表示是计时等待
* @param ns 如果是计时等待,该值表示最大等待的时长。
* @return 对方线程交换来的数据;如果等待超时或线程中断,或者启用了arena,则返回
null。
*/
private final Object slotExchange(Object item, boolean timed, long ns)
{
// participant在初始化的时候设置初始值为new Node()
// 获取本线程要交换的数据节点
Node p = participant.get();
// 获取当前线程
Thread t = Thread.currentThread();
// 如果线程被中断,则返回null。
if (t.isInterrupted())
return null;
for (Node q;;) {
// 如果slot非空,表明有其他线程在等待该线程交换数据
if ((q = slot) != null) {
// CAS操作,将当前线程的slot由slot设置为null
// 如果操作成功,则执行if中的语句
if (SLOT.compareAndSet(this, q, null)) {
// 获取对方线程交换来的数据
Object v = q.item;
// 设置要交换的数据
q.match = item;
// 获取q中阻塞的线程对象
Thread w = q.parked;
if (w != null)
// 如果对方阻塞的线程非空,则唤醒阻塞的线程
LockSupport.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
// 创建arena用于处理多个线程需要交换数据的场合,防止slot冲突
if (NCPU > 1 && bound == 0 &&
BOUND.compareAndSet(this, 0, SEQ)) {
arena = new Node[(FULL + 2) << ASHIFT];
}
}
// 如果arena不是null,需要调用者调用arenaExchange方法接着获取对方线程交
换来的数据
else if (arena != null)
return null;
else {
// 如果slot为null,表示对方没有线程等待该线程交换数据
// 设置要交换的本方数据
p.item = item;
// 设置当前线程要交换的数据到slot
// CAS操作,如果设置失败,则进入下一轮for循环
if (SLOT.compareAndSet(this, null, p))
break;
p.item = null;
}
}
// 没有对方线程等待交换数据,将当前线程要交换的数据放到slot中,是一个Node对象
// 然后阻塞,等待唤醒
int h = p.hash;
// 如果是计时等待交换,则计算超时时间;否则设置为0。
long end = timed ? System.nanoTime() + ns : 0L;
// 如果CPU核心数大于1,则使用SPINS数,自旋;否则为1,没必要自旋。
int spins = (NCPU > 1) ? SPINS : 1;
// 记录对方线程交换来的数据
Object v;
// 如果p.match==null,表示还没有线程交换来数据
while ((v = p.match) == null) {
// 如果自旋次数大于0,计算hash随机数
if (spins > 0) {
// 生成随机数,用于自旋次数控制
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
// p是ThreadLocal记录的当前线程的Node。
// 如果slot不是p表示slot是别的线程放进去的
} else if (slot != p) {
spins = SPINS;
} else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
p.parked = t;
if (slot == p) {
if (ns == 0L)
// 阻塞当前线程
LockSupport.park(this);
else
// 如果是计时等待,则阻塞当前线程指定时间
LockSupport.parkNanos(this, ns);
}
p.parked = null;
} else if (SLOT.compareAndSet(this, p, null)) {
// 没有被中断但是超时了,返回TIMED_OUT,否则返回null
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT :
null;
break;
}
}
// match设置为null值 CAS
MATCH.setRelease(p, null);
p.item = null;
p.hash = h;
// 返回获取的对方线程交换来的数据
return v;
}
// ...
}

arenaExchange的实现:

package java.util.concurrent;
public class Exchanger<V> {
// ...
/**
* 当启用arenas的时候,使用该方法进行线程间的数据交换。
*
* @param item 本线程要交换的非null数据。
* @param timed 如果需要计时等待,则设置为true。
* @param ns 表示计时等待的最大时长。
* @return 对方线程交换来的数据。如果线程被中断,或者等待超时,则返回null。
*/
private final Object arenaExchange(Object item, boolean timed, long ns)
{
Node[] a = arena;
int alen = a.length;
Node p = participant.get();
// 访问下标为i处的slot数据
for (int i = p.index;;) { // access slot at i
int b, m, c;
int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
if (j < 0 || j >= alen)
j = alen - 1;
// 取出arena数组的第j个Node元素
Node q = (Node)AA.getAcquire(a, j);
// 如果q不是null,则将数组的第j个元素由q设置为null
if (q != null && AA.compareAndSet(a, j, q, null)) {
// 获取对方线程交换来的数据
Object v = q.item; // release
// 设置本方线程交换的数据
q.match = item;
// 获取对方线程对象
Thread w = q.parked;
if (w != null)
// 如果对方线程非空,则唤醒对方线程
LockSupport.unpark(w);
return v;
}
// 如果自旋次数没达到边界,且q为null
else if (i <= (m = (b = bound) & MMASK) && q == null) {
// 提供本方数据
p.item = item; // offer
// 将arena的第j个元素由null设置为p
if (AA.compareAndSet(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns :
0L;
Thread t = Thread.currentThread(); // wait
// 自旋等待
for (int h = p.hash, spins = SPINS;;) {
// 获取对方交换来的数据
Object v = p.match;
// 如果对方交换来的数据非空
if (v != null) {
// 将p设置为null,CAS操作
MATCH.setRelease(p, null);
// 清空
p.item = null; // clear for next
use
p.hash = h;
// 返回交换来的数据
return v;
}
// 产生随机数,用于限制自旋次数
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; //
xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per
wait
}
// 如果arena的第j个元素不是p
else if (AA.getAcquire(a, j) != p)
spins = SPINS; // releaser hasn't set
match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
p.parked = t; // minimize window
if (AA.getAcquire(a, j) == p) {
if (ns == 0L)
// 当前线程阻塞,等待交换数据
LockSupport.park(this);
else
LockSupport.parkNanos(this, ns);
}
p.parked = null;
}
// arena的第j个元素是p并且CAS设置arena的第j个元素由p设置
为null成功
else if (AA.getAcquire(a, j) == p &&
AA.compareAndSet(a, j, p, null)) {
if (m != 0) // try to shrink
BOUND.compareAndSet(this, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
// 如果线程被中断,则返回null值
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
// 如果超时,返回TIMED_OUT。
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
//
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!BOUND.compareAndSet(this, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically
traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
// ...
}

Phaser

用Phaser替代CyclicBarrier和CountDownLatch

从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强 大。

1、 用Phaser替代CountDownLatch

考虑讲CountDownLatch时的例子,1个主线程要等10个worker线程完成之后,才能做接下来的事 情,也可以用Phaser来实现此功能。在CountDownLatch中,主要是2个方法:await()和 countDown(),在Phaser中,与之相对应的方法是awaitAdance(int n)和arrive()。

package com.lagou.concurrent.demo;
import java.util.Random;
import java.util.concurrent.Phaser;
public class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
for (int i = 0; i < 5; i++) {
new Thread("线程-" + (i + 1)) {
private final Random random = new Random();
@Override
public void run() {
System.out.println(getName() + " - 开始运行");
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " - 运行结束");
phaser.arrive();
}
}.start();
}
System.out.println("线程启动完毕");
phaser.awaitAdvance(phaser.getPhase());
System.out.println("线程运行结束");
}
}

2、 用Phaser替代CyclicBarrier

考虑前面讲CyclicBarrier时,10个工程师去公司应聘的例子,也可以用Phaser实现,代码基本类 似。

package com.lagou.concurrent.demo;
import java.util.concurrent.Phaser;
public class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
for (int i = 0; i < 5; i++) {
new MyThread("线程-" + (i + 1), phaser).start();
}
phaser.awaitAdvance(0);
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
import java.util.concurrent.Phaser;
public class MyThread extends Thread {
private final Phaser phaser;
private final Random random = new Random();
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(getName() + " - 开始向公司出发");
slowly();
System.out.println(getName() + " - 已经到达公司");
// 到达同步点,等待其他线程
phaser.arriveAndAwaitAdvance();
System.out.println(getName() + " - 开始笔试");
slowly();
System.out.println(getName() + " - 笔试结束");
// 到达同步点,等待其他线程
phaser.arriveAndAwaitAdvanc
System.out.println(getName() + " - 开始面试");
slowly();
System.out.println(getName() + " - 面试结束");
}
private void slowly() {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

arriveAndAwaitAdance()就是 arrive()与 awaitAdvance(int)的组合,表示“我自己已到达这个同步 点,同时要等待所有人都到达这个同步点,然后再一起前行”。

Phaser新特性

特性1:动态调整线程个数

CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行 期间动态地调整要同步的线程个数。Phaser 提供了下面这些方法来增加、减少所要同步的线程个数。

register() // 注册一个
bulkRegister(int parties) // 注册多个
arriveAndDeregister() // 解除注册

特性2:层次Phaser
多个Phaser可以组成如下图所示的树状结构,可以通过在构造方法中传入父Phaser来实现。

public Phaser(Phaser parent, int parties) {
// ...
}

先简单看一下Phaser内部关于树状结构的存储,如下所示:

可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点 列表。所以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点 的操作,是通过子节点来实现的。

树状的Phaser怎么使用呢?考虑如下代码,会组成下图的树状Phaser。

Phaser root = new Phaser(2);
Phaser c1 = new Phaser(root, 3);
Phaser c2 = new Phaser(root, 2);
Phaser c3 = new Phaser(c1, 0);

本来root有两个参与者,然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与 者,root的参与者就变成2+2=4个。c1本来有3个参与者,为其加入了一个子Phaser c3,参与者数量变 成3+1=4个。c3的参与者初始为0,后续可以通过调用register()方法加入。

对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的 Phaser是一样的。
父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节 点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当 作一个正常参与的线程就即可。

state变量解析

大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理。Phaser没有基于AQS来实现, 但具备AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。

这个64位的state变量被拆成4部分,下图为state变量各部分:

最高位0表示未同步完成,1表示同步完成,初始最高位为0。
Phaser提供了一系列的成员方法来从state中获取上图中的几个数字,如下所示:

下面再看一下state变量在构造方法中是如何被赋值的:

public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
// 如果parties数超出了最大个数(2的16次方),抛异常
throw new IllegalArgumentException("Illegal number of parties");
// 初始化轮数为0
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
// 父节点的根节点就是自己的根节点
this.root = root;
// 父节点的evenQ就是自己的evenQ
this.evenQ = root.evenQ;
// 父节点的oddQ就是自己的oddQ
this.oddQ = root.oddQ;
// 如果参与者不是0,则向父节点注册自己
if (parties != 0)
phase = parent.doRegister(1);
}
else {
// 如果父节点为null,则自己就是root节点
this.root = this;
// 创建奇数节点
this.evenQ = new AtomicReference<QNode>();
// 创建偶数节点
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) | // 位或操作,赋值state。最高位
为0,表示同步未完成
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

当parties=0时,state被赋予一个EMPTY常量,常量为1;
当parties != 0时,把phase值左移32位;把parties左移16位;然后parties也作为最低的16位,3个 值做或操作,赋值给state。

阻塞与唤醒(Treiber Stack)

基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如下图所示,右边的主线程 会调用awaitAdvance()进行阻塞;左边的arrive()会对state进行CAS的累减操作,当未到达的线程数减到 0时,唤醒右边阻塞的主线程。

在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack 是一个无锁的栈,它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要 tail指针,如下的实现:

为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。当phase为奇数轮的时候,阻 塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。代码如下所示。

arrive()方法分析

下面看arrive()方法是如何对state变量进行操作,又是如何唤醒线程的。

arrive()和 arriveAndDeregister()内部调用的都是 doArrive(boolean)方法。
区别在于前者只是把“未达到线程数”减1;后者则把“未到达线程数”和“下一轮的总线程数”都减1。下 面看一下doArrive(boolean)方法的实现。

private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
// 获取未到达线程数
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
// 如果未到达线程数小于等于0,抛异常。
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
// CAS操作,将state的值减去adjust
if (STATE.compareAndSet(this, s, s-=adjust)) {
// 如果未到达线程数为1
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
STATE.compareAndSet(this, s, n);
releaseWaiters(phase);
}
// 如果下一轮的未到达线程数为0
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
STATE.compareAndSet(this, s, s | EMPTY);
}
else
// 否则调用父节点的doArrive方法,传递参数1,表示当前节点已完成
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}

关于上面的方法,有以下几点说明:

1、 定义了2个常量如下。
当 deregister=false 时,只最低的16位需要减 1,adj=ONE_ARRIVAL;当deregister=true 时,低32位中的2个16位都需要减1,adj=ONE_ARRIVAL|ONE_PARTY。

2、 把未到达线程数减1。减了之后,如果还未到0,什么都不做,直接返回。如果到0,会做2件事 情:第1,重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加 1;第2,唤醒队列中的线程。

下面看一下唤醒方法:

awaitAdvance()方法分析

下面的while循环中有4个分支:
初始的时候,node==null,进入第1个分支进行自旋,自旋次数满足之后,会新建一个QNode节 点;
之后执行第3、第4个分支,分别把该节点入栈并阻塞。

private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // 不可中断模式的自旋
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // 自旋结束,建一个节点,之后进入阻
塞
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
else
Thread.onSpinWait();
}
else if (node.isReleasable()) // 从阻塞唤醒,退出while循环
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node); // 节点入栈
}
else {
try {
ForkJoinPool.managedBlock(node); // 调用node.block()阻塞
} catch (InterruptedException cantHappen) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}

这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)方法,目的是把node对应的线 程阻塞。ManagerdBlocker是ForkJoinPool里面的一个接口,定义如下:

public static interface ManagedBlocker {
boolean block() throws InterruptedException;
boolean isReleasable();
}

QNode实现了该接口,实现原理还是park(),如下所示。之所以没有直接使用park()/unpark()来实 现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑。一方面是park()可 能被中断唤醒,另一方面是带超时时间的park(),把这二者都封装在一起。

static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
thread = null;
return true;
}
return false;
}
public boolean block() {
while (!isReleasable()) {
if (timed)
LockSupport.parkNanos(this, nanos);
else
LockSupport.park(this);
}
return true;
}
}

理解了arrive()和awaitAdvance(),arriveAndAwaitAdvance()就是二者的一个组合版本。

Atomic类

AtomicInteger和AtomicLong

如下面代码所示,对于一个整数的加减操作,要保证线程安全,需要加锁,也就是加synchronized 关键字。

public class MyClass {
private int count = 0;
public void synchronized increment() {
count++;
}
public void synchronized decrement() {
count--;
}
}

但有了Concurrent包的Atomic相关的类之后,synchronized关键字可以用AtomicInteger代替,其 性能更好,对应的代码变为:

public class MyClass {
private AtomicInteger count = new AtomicInteger(0);
public void add() {
count.getAndIncrement();
}
public long minus() {
count.getAndDecrement();
}
}

其对应的源码如下:

上图中的U是Unsafe的对象:

AtomicInteger的 getAndIncrement() 方法和 getAndDecrement() 方法都调用了一个方法: U.getAndAddInt(…) 方法,该方法基于CAS实现:

do-while循环直到判断条件返回true为止。该操作称为自旋。

getAndAddInt 方法具有volatile的语义,也就是对所有线程都是同时可见的。

而 weakCompareAndSetInt 方法的实现:

调用了 compareAndSetInt 方法,该方法的实现:

上图中的方法中,

  • 第一个参数表示要修改哪个对象的属性值;
  • 第二个参数是该对象属性在内存的偏移量;
  • 第三个参数表示期望值;
  • 第四个参数表示要设置为的目标值。

源码比较简单,重要的是其中的设计思想。

悲观锁与乐观锁

对于悲观锁,认为数据发生并发冲突的概率很大,读操作之前就上锁。synchronized关键字,后面 要讲的ReentrantLock都是悲观锁的典型。

对于乐观锁,认为数据发生并发冲突的概率比较小,读操作之前不上锁。等到写操作的时候,再判 断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程; 如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作, 也就是CAS ( Compare And Set )。

AtomicInteger的实现就是典型的乐观锁。

Unsafe 的CAS详解

Unsafe类是整个Concurrent包的基础,里面所有方法都是native的。具体到上面提到的 compareAndSetInt方法,即:

要特别说明一下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量 在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身。
第二个参数的值为AtomicInteger中的属性VALUE:

VALUE的值:

而Unsafe的 objectFieldOffset(...) 方法调用,就是为了找到AtomicInteger类中value属性所 在的内存偏移量。

objectFieldOffset 方法的实现:

其中objectFieldOffset1的实现为:

所有调用CAS的地方,都会先通过这个方法把成员变量转换成一个Offset。以AtomicInteger为例:

package java.util.concurrent.atomic;
public class AtomicInteger extends Number implements java.io.Serializable {
private static final jdk.internal.misc.Unsafe U =
jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE =
U.objectFieldOffset(AtomicInteger.class, "value");
}

从上面代码可以看到,无论是Unsafe还是VALUE,都是静态的,也就是类级别的,所有对象共用 的。
此处的VALUE就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作 VALUE。

自旋与阻塞

当一个线程拿不到锁的时候,有以下两种基本的等待策略:

  • 策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
  • 策略2:不放弃CPU,空转,不断重试,也就是所谓的“自旋”。

很显然,如果是单核的CPU,只能用策略1。因为如果不放弃CPU,那么其他线程无法运行,也就无 法释放锁。但对于多CPU或者多核,策略2就很有用了,因为没有线程切换的开销。

AtomicInteger的实现就用的是“自旋”策略,如果拿不到锁,就会一直重试。

注意:以上两种策略并不互斥,可以结合使用。如果获取不到锁,先自旋;如果自旋还拿不到锁, 再阻塞,synchronized关键字就是这样的实现策略。
除了AtomicInteger,AtomicLong也是同样的原理。

AtomicBoolean和AtomicReference

为什么需要AtomicBoolean

对于int或者long型变量,需要进行加减操作,所以要加锁;但对于一个boolean类型来说,true或 false的赋值和取值操作,加上volatile关键字就够了,为什么还需要AtomicBoolean呢?

这是因为往往要实现下面这种功能:

if (!flag) {
flag = true;
// ...
}
// 或者更清晰一点的:
if (flag == false) {
flag = true;
// ...
}

也就是要实现 compare和set两个操作合在一起的原子性,而这也正是CAS提供的功能。上面的代 码,就变成:

if (compareAndSet(false, true)) {
// ...
}

同样地,AtomicReference也需要同样的功能,对应的方法如下:

其中,expect是旧的引用,update为新的引用。

如何支持boolean和double类型

在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。如下所 示:

即,在jdk的实现中,这三种CAS操作都是由底层实现的,其他类型的CAS操作都要转换为这三种之 一进行操作。

其中的参数:

  1. 第一个参数是要修改的对象
  2. 第二个参数是对象的成员变量在内存中的位置(一个long型的整数)
  3. 第三个参数是该变量的旧值
  4. 第四个参数是该变量的新值。

AtomicBoolean类型如何支持?
对于用int型来代替的,在入参的时候,将boolean类型转换成int类型;在返回值的时候,将int类型 转换成boolean类型。如下所示:

如果是double类型,又如何支持呢?
这依赖double类型提供的一对double类型和long类型互转的方法:

Unsafe类中的方法实现:

AtomicStampedReference和AtomicMarkableReference

ABA问题与解决办法

到目前为止,CAS都是基于“值”来做比较的。但如果另外一个线程把变量的值从A改为B,再从B改回 到A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他 线程修改过,这就是所谓的ABA问题。
举例来说:
小张欠小李100块,约定今天还,给打到网银。
小李家的网银余额是0,打过来之后应该是100块。
小张今天还钱这个事小李知道,小李还告诉了自己媳妇。
小张还钱,小李媳妇看到了,就取出来花掉了。
小李恰好在他媳妇取出之后检查账户,一看余额还是0。
然后找小张,要账。

这其中,小李家的账户余额从0到100,再从100到0,小李一开始检查是0,第二次检查还是0,就 认为小张没还钱。
实际上小李媳妇花掉了。
ABA问题。
其实小李可以查看账户的收支记录。

要解决 ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是 AtomicStampedReference做的 事情,其对应的CAS方法如下:

之前的 CAS只有两个参数,这里的 CAS有四个参数,后两个参数就是版本号的旧值和新值。
当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过;
当expectedReference == 对象当前的reference时,再进一步比较expectedStamp是否等于对象当 前的版本号,以此判断数据是否被其他线程修改过。

为什么没有AtomicStampedInteger或AtomictStampedLong

要解决Integer或者Long型变量的ABA问题,为什么只有AtomicStampedReference,而没有 AtomicStampedInteger或者AtomictStampedLong呢?

因为这里要同时比较数据的“值”和“版本号”,而Integer型或者Long型的CAS没有办法同时比较两个 变量。
于是只能把值和版本号封装成一个对象,也就是这里面的Pair内部类,然后通过对象引用的CAS来 实现。代码如下所示

当使用的时候,在构造方法里面传入值和版本号两个参数,应用程序对版本号进行累加操作,然后 调用上面的CAS。如下所示:

AtomicMarkableReference

AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是 boolean类型的,而不是整型的累加变量,如下所示:

因为是boolean类型,只能有true、false 两个版本号,所以并不能完全避免ABA问题,只是降低了 ABA发生的概率。

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和 AtomicReferenceFieldUpdater

为什么需要AtomicXXXFieldUpdater

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型。但如果是一个已经 有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater。

通过AtomicIntegerFieldUpdater理解它们的实现原理。
AtomicIntegerFieldUpdater是一个抽象类。
首先,其构造方法是protected,不能直接构造其对象,必须通过它提供的一个静态方法来创建,如 下所示:

方法 newUpdater 用于创建AtomicIntegerFieldUpdater类对象:

newUpdater(...)静态方法传入的是要修改的类(不是对象)和对应的成员变量的名字,内部通过反 射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象。所以,这个对象表示的是 类的某个成员,而不是对象的成员变量。

若要修改某个对象的成员变量的值,再传入相应的对象,如下所示:

accecssCheck方法的作用是检查该obj是不是tclass类型,如果不是,则拒绝修改,抛出异常。
从代码可以看到,其 CAS 原理和 AtomictInteger 是一样的,底层都调用了 Unsafe 的 compareAndSetInt(...)方法。

限制条件

要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是 Integer包装类),该限制从其构造方法中可以看到:

至于 AtomicLongFieldUpdater、AtomicReferenceFieldUpdater,也有类似的限制条件。其底层 的CAS原理,也和AtomicLong、AtomicReference一样。

AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray

Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组 元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操 作而言。

使用方式

以AtomicIntegerArray为例,其使用方式如下:

相比于AtomicInteger的getAndIncrement()方法,这里只是多了一个传入参数:数组的下标i。

其他方法也与此类似,相比于 AtomicInteger 的各种加减方法,也都是多一个下标 i,如下所示。

实现原理

其底层的CAS方法直接调用VarHandle中native的getAndAdd方法。如下所示:

明白了AtomicIntegerArray的实现原理,另外两个数组的原子类实现原理与之类似。

Striped64与LongAdder

从JDK 8开始,针对Long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对 Double类型,Java提供了DoubleAdder、DoubleAccumulator。Striped64相关的类的继承层次如下图 所示。

LongAdder原理

AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时 对一个变量进行CAS操作,在高并发的场景下仍不够快,如果再要提高性能,该怎么做呢?

把一个变量拆成多份,变为多个变量,有些类似于 ConcurrentHashMap 的分段锁的例子。如下图 所示,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装了一个Long型变量。当多个线程并 发累加的时候,如果并发度低,就直接加到base变量上;如果并发度高,冲突大,平摊到这些Cell上。 在最后取值的时候,再把base和这些Cell求sum运算。

以LongAdder的sum()方法为例,如下所示。

由于无论是long,还是double,都是64位的。但因为没有double型的CAS操作,所以是通过把 double型转化成long型来实现的。所以,上面的base和cell[]变量,是位于基类Striped64当中的。英文 Striped意为“条带”,也就是分片。

abstract class Striped64 extends Number {
transient volatile Cell[] cells;
transient volatile long base;
@jdk.internal.vm.annotation.Contended static final class Cell {
// ...
volatile long value;
Cell(long x) { value = x; }
// ...
}
}

最终一致性

在sum求和方法中,并没有对cells[]数组加锁。也就是说,一边有线程对其执行求和操作,一边还 有线程修改数组里的值,也就是最终一致性,而不是强一致性。这也类似于ConcurrentHashMap 中的 clear()方法,一边执行清空操作,一边还有线程放入数据,clear()方法调用完毕后再读取,hash map里 面可能还有元素。因此,在LongAdder适合高并发的统计场景,而不适合要对某个 Long 型变量进行严 格同步的场景。

伪共享与缓存行填充

在Cell类的定义中,用了一个独特的注解@sun.misc.Contended,这是JDK 8之后才有的,背后涉及 一个很重要的优化原理:伪共享与缓存行填充。

每个 CPU 都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行)。在 64位x86架构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内 存的时候,最少要刷新64字节。

如下图所示,主内存中有变量X、Y、Z(假设每个变量都是一个Long型),被CPU1和CPU2分别读 入自己的缓存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,也就 是往总线上发消息,通知CPU 2对应的Cache Line失效。由于Cache Line是数据交换的基本单位,无法 只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。

虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修 改,本应该很好地被 CPU1 和 CPU2 共享,却没做到,这就是所谓的“伪共享问题”。

问题的原因是,Y、Z和X变量处在了同一行Cache Line里面。要解决这个问题,需要用到所谓的“缓 存行填充”,分别在X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓 存行中,如下图所示:

声明一个@jdk.internal.vm.annotation.Contended即可实现缓存行的填充。之所以这个地方要用 缓存行填充,是为了不让Cell[]数组中相邻的元素落到同一个缓存行里。

LongAdder核心实现

下面来看LongAdder最核心的累加方法add(long x),自增、自减操作都是通过调用该方法实现的。

当一个线程调用add(x)的时候,首先会尝试使用casBase把x加到base变量上。如果不成功,则再用 c.cas(...)方法尝试把 x 加到 Cell 数组的某个元素上。如果还不成功,最后再调用longAccumulate(...)方 法。

注意:Cell[]数组的大小始终是2的整数次方,在运行中会不断扩容,每次扩容都是增长2倍。上面代 码中的 cs[getProbe() & m] 其实就是对数组的大小取模。因为m=cs.length–1,getProbe()为该线程 生成一个随机数,用该随机数对数组的长度取模。因为数组长度是2的整数次方,所以可以用&操作来优 化取模运算。
对于一个线程来说,它并不在意到底是把x累加到base上面,还是累加到Cell[]数组上面,只要累加 成功就可以。因此,这里使用随机数来实现Cell的长度取模。

如果两次尝试都不成功,则调用 longAccumulate(...)方法,该方法在 Striped64 里面 LongAccumulator也会用到,如下所示。

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// true表示最后一个slot非空
boolean collide = false;
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
// 如果cells不是null,且cells长度大于0
if ((cs = cells) != null && (n = cs.length) > 0) {
// cells最大下标对随机数取模,得到新下标。
// 如果此新下标处的元素是null
if ((c = cs[(n - 1) & h]) == null) {
// 自旋锁标识,用于创建cells或扩容cells
if (cellsBusy == 0) { // 尝试添加新的Cell
Cell r = new Cell(x); // Optimistically create
// 如果cellsBusy为0,则CAS操作cellsBusy为1,获取锁
if (cellsBusy == 0 && casCellsBusy()) {
try { // 获取锁之后,再次检查
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 赋值成功,返回
rs[j] = r;
break done;
}
} finally {
// 重置标志位,释放锁
cellsBusy = 0;
}
continue; // 如果slot非空,则进入下一次循环
}
}
collide = false;
}
else if (!wasUncontended) // CAS操作失败
wasUncontended = true; // rehash之后继续
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // 扩容,每次都是上次的两倍长度
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 如果cells为null或者cells的长度为0,则需要初始化cells数组
// 此时需要加锁,进行CAS操作
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
// 实例化Cell数组,实例化Cell,保存x值
Cell[] rs = new Cell[2];
// h为随机数,对Cells数组取模,赋值新的Cell对象。
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
// 释放CAS锁
cellsBusy = 0;
}
}
// 如果CAS操作失败,最后回到对base的操作
// 判断fn是否为null,如果是null则执行加操作,否则执行fn提供的操作
// 如果操作失败,则重试for循环流程,成功就退出循环
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}

LongAccumulator

LongAccumulator的原理和LongAdder类似,只是功能更强大,下面为两者构造方法的对比:

LongAdder只能进行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操 作符,并且可以传入一个初始值。

操作符的左值,就是base变量或者Cells[]中元素的当前值;右值,就是add()方法传入的参数x。

下面是LongAccumulator的accumulate(x)方法,与LongAdder的add(x)方法类似,最后都是调用 的Striped64的LongAccumulate(...)方法。
唯一的差别就是LongAdder的add(x)方法调用的是casBase(b, b+x),这里调用的是casBase(b, r), 其中,r=function.applyAsLong(b=base, x)。

DoubleAdder与DoubleAccumulator

DoubleAdder 其实也是用 long 型实现的,因为没有 double 类型的 CAS 方法。下面是 DoubleAdder的add(x)方法,和LongAdder的add(x)方法基本一样,只是多了long和double类型的相互 转换。

其中的关键Double.doubleToRawLongBits(Double.longBitsToDouble(b) + x),在读出来的时候, 它把 long 类型转换成 double 类型,然后进行累加,累加的结果再转换成 long 类型,通过CAS写回 去。

DoubleAccumulate也是Striped64的成员方法,和longAccumulate类似,也是多了long类型和 double类型的互相转换。
DoubleAccumulator和DoubleAdder的关系,与LongAccumulator和LongAdder的关系类似,只 是多了一个二元操作符。