今でもあなたは私の光丶

并发编程(6)ForkJoinPool

ForkJoinPool用法

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合 并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的 Map/Reduce,多个线程并行计算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余 4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。
利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而 实现任务计算的负载均衡。

例子1:快排

快排有2个步骤:

  1. 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组 里面的元素比该元素大;
  2. 对左右的两个子数组分别排序。

左右两个子数组相互独立可以并行计算。利用ForkJoinPool,代码如下:

package com.lagou.concurrent.demo;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo01 {
static class SortTask extends RecursiveAction {
final long[] array;
final int lo;
final int hi;
public SortTask(long[] array) {
this.array = array;
this.lo = 0;
this.hi = array.length - 1;
}
public SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
private int partition(long[] array, int lo, int hi) {
long x = array[hi];
int i = lo - 1;
for (int j = lo; j < hi; j++) {
if (array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i + 1, hi);
return i + 1;
}
private void swap(long[] array, int i, int j) {
if (i != j) {
long temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
@Override
protected void compute() {
if (lo < hi) {
// 找到分区的元素下标
int pivot = partition(array, lo, hi);
// 将数组分为两部分
SortTask left = new SortTask(array, lo, pivot - 1);
SortTask right = new SortTask(array, pivot + 1, hi);
left.fork();
right.fork();
left.join();
right.join();
}
}
}
public static void main(String[] args) throws InterruptedException {
long[] array = {5, 3, 7, 9, 2, 4, 1, 8, 10};
// 一个任务
ForkJoinTask sort = new SortTask(array);
// 一个pool
ForkJoinPool pool = new ForkJoinPool();
// ForkJoinPool开启多个线程,同时执行上面的子任务
pool.submit(sort);
// 结束ForkJoinPool
pool.shutdown();
// 等待结束Pool
pool.awaitTermination(10, TimeUnit.SECONDS);
System.out.println(Arrays.toString(array));
}
}

例子2:求1到n个数的和

package com.lagou.concurrent.demo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo02 {
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10;
private long start;
private long end;
public SumTask(long n) {
this(1, n);
}
public SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
// 如果计算的范围在threshold之内,则直接进行计算
if ((end - start) <= THRESHOLD) {
for (long l = start; l <= end; l++) {
sum += l;
}
} else {
// 否则找出起始和结束的中间值,分割任务
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
// 收集子任务计算结果
sum = left.join() + right.join();
}
return sum;
}
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
SumTask sum = new SumTask(100);
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> future = pool.submit(sum);
Long aLong = future.get();
System.out.println(aLong);
pool.shutdown();
}
}

上面的代码用到了 RecursiveAction 和 RecursiveTask 两个类,它们都继承自抽象类 ForkJoinTask,用到了其中关键的接口 fork()、join()。二者的区别是一个有返回值,一个没有返回值。

RecursiveAction/RecursiveTask类继承关系:

在ForkJoinPool中,对应的接口如下:

核心数据结构

与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队 列。

核心数据结构如下所示:

public class ForkJoinPool extends AbstractExecutorService {
// 状态变量,类似于ThreadPoolExecutor中的ctl变量。
volatile long ctl;
// 工作线程队列
WorkQueue[] workQueues;
// 工作线程工厂
final ForkJoinWorkerThreadFactory factory;
// 下一个worker的下标
int indexSeed;
static final class WorkQueue {
volatile int source; // source queue id, or sentinel
int id; // 在ForkJoinPool的workQueues数组中的下标
int base; // 队列尾部指针
int top; // 队列头指针
volatile int phase; // versioned, negative: queued, 1: locked
int stackPred; // pool stack (ctl) predecessor link
int nsteals; // number of steals
ForkJoinTask<?>[] array; // 工作线程的局部队列
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 该工作队列的所有者线程,null表示共享
的
}
}
public class ForkJoinWorkerThread extends Thread {
// 当前工作线程所在的线程池,反向引用
final ForkJoinPool pool;
// 工作队列
final ForkJoinPool.WorkQueue workQueue;
}

下面看一下这些核心数据结构的构造过程。

public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode,
int corePoolSize,
int maximumPoolSize,
int minimumRunnable,
Predicate<? super ForkJoinPool> saturate,
long keepAliveTime,
TimeUnit unit) {
// check, encode, pack parameters
if (parallelism <= 0 || parallelism > MAX_CAP ||
maximumPoolSize < parallelism || keepAliveTime <= 0L)
throw new IllegalArgumentException();
if (factory == null)
throw new NullPointerException();
long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
(((long)(-parallelism) << RC_SHIFT) & RC_MASK));
int m = parallelism | (asyncMode ? FIFO : 0);
int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
//
int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
n = (n + 1) << 1; // power of two, including space for submission queues
// 工作线程名称前缀
this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
// 初始化工作线程数组为n,2的幂次方
this.workQueues = new WorkQueue[n];
// worker线程工厂,有默认值
this.factory = factory;
this.ueh = handler;
this.saturate = saturate;
this.keepAlive = ms;
this.bounds = b;
this.mode = m;
// ForkJoinPool的状态
this.ctl = c;
checkPermission();
}

工作窃取队列

关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的 数组得以实现。

这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。在 ForkJoinPool开篇的注 释中,Doug Lea 特别提到了工作窃取队列的实现,其陈述来自如下两篇论文:"Dynamic Circular Work-Stealing Deque" by Chase and Lev,SPAA 2005与"Idempotent work stealing" by Michael,Saraswat,and Vechev,PPoPP 2009。读者可以在网上查阅相应论文。

所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队 列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃 取队列。

这个队列只有如下几个操作:

  1. Worker线程自己,在队列头部,通过对top指针执行加、减操作,实现入队或出队,这是单线 程的。
  2. 其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多 线程的,需要通过CAS操作。

这个队列,在Dynamic Circular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所 以这样命名,是因为有两个关键点:

  1. 整个队列是环形的,也就是一个数组实现的RingBuffer。并且base会一直累加,不会减小; top会累加、减小。最后,base、top的值都会大于整个数组的长度,只是计算数组下标的时 候,会取top&(queue.length-1),base&(queue.length-1)。因为queue.length是2的整 数次方,这里也就是对queue.length进行取模操作。
    当top-base=queue.length-1 的时候,队列为满,此时需要扩容;
    当top=base的时候,队列为空,Worker线程即将进入阻塞状态。
  2. 当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在 读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?

通过分析工作窃取队列的特性,我们会发现:在 base 一端,是多线程访问的,但它们只会使base 变大,也就是使队列中的元素变少。所以队列为满,一定发生在top一端,对top进行累加的时候,这一 端却是单线程的!队列的扩容恰好利用了这个单线程的特性!即在扩容过程中,不可能有其他线程对top 进行修改,只有线程对base进行修改!

下图为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但top、base的值是不变 的!通过top、base对新的数组长度取模,仍然可以定位到元素在新数组中的位置。

下面结合WorkQueue扩容的代码进一步分析。

final void growArray(boolean locked) {
ForkJoinTask<?>[] newA = null;
try {
ForkJoinTask<?>[] oldA; int oldSize, newSize;
// 当旧的array不是null,旧的array包含元素
// 并且新的数组长度小于队列最大长度,并且新的长度大于0
if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
(newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
newSize > 0) {
try {
// 创建新数组
newA = new ForkJoinTask<?>[newSize];
} catch (OutOfMemoryError ex) {
}
if (newA != null) { // poll from old array, push to new
int oldMask = oldSize - 1, newMask = newSize - 1;
for (int s = top - 1, k = oldMask; k >= 0; --k) {
// 逐个复制
ForkJoinTask<?> x = (ForkJoinTask<?>)
// 获取旧的值,将原来的设置为null
QA.getAndSet(oldA, s & oldMask, null);
if (x != null)
newA[s-- & newMask] = x;
else
break;
}
array = newA;
VarHandle.releaseFence();
}
}
} finally {
if (locked)
phase = 0;
}
if (newA == null)
throw new RejectedExecutionException("Queue capacity exceeded");
}

ForkJoinPool状态控制

状态变量ctl解析

类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个生命 周期和相关的各种状态。不过ctl变量更加复杂,是一个long型变量,代码如下所示。

public class ForkJoinPool extends AbstractExecutorService {
// ...
// 线程池状态变量
volatile long ctl;
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
private static final int RC_SHIFT = 48;
private static final long RC_UNIT = 0x0001L << RC_SHIFT;
private static final long RC_MASK = 0xffffL << RC_SHIFT;
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //
sign
// ...
}

ctl变量的64个比特位被分成五部分:

  1. AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进 去的参数;
  2. TC:次高的16个比特位,表示Total线程数-parallelism;
  3. ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭;
  4. EC:15个比特位,表示阻塞栈的栈顶线程的wait count(关于什么是wait count,接下来解 释)
  5. ID:16个比特位,表示阻塞栈的栈顶线程对应的id。

阻塞栈Treiber Stack

什么叫阻塞栈呢?
要实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻 塞队列,把所有阻塞的线程串在一起。

在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈。把所有空闲的Worker线程放在一个栈 里面,这个栈同样通过链表来实现,名为Treiber Stack。前面讲解Phaser的实现原理的时候,也用过这 个数据结构。

下图为所有阻塞的Worker线程组成的Treiber Stack。

首先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于 每个WorkQueue或ForkJoinWorkerThread对象的地址;

其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个 stackPred变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack。

最后,ctl变量的最低16位,记录了栈的栈顶线程的id;中间的15位,记录了栈顶线程被阻塞的次 数,也称为wait count。

ctl变量的初始值

构造方法中,有如下的代码:

因为在初始的时候,ForkJoinPool 中的线程个数为 0,所以 AC=0-parallelism,TC=0- parallelism。这意味着只有高32位的AC、TC 两个部分填充了值,低32位都是0填充。

ForkJoinWorkerThread状态与个数分析

在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数, 而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么, ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?

要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:

  1. 空闲状态(放在Treiber Stack里面)。
  2. 活跃状态(正在执行某个ForkJoinTask,未阻塞)。
  3. 阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果 返回)。

ctl变量很好地反映出了三种状态:
高32位:u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位;
低32位:c=(int) ctl。

  1. c>0,说明Treiber Stack不为空,有空闲线程;c=0,说明没有空闲线程;
  2. ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;
  3. tc>0,说明总线程数 >parallelism。

在提交任务的时候:

在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程,则开启新线程:

Worker线程的阻塞-唤醒机制

ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了park/unpark 原语,并自行实现了Treiber Stack。
下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别是如何入栈的。

阻塞–入栈

当一个线程窃取不到任何任务,也就是处于空闲状态时就会阻塞入栈。

final void runWorker(WorkQueue w) {
// 随机数
int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
// 初始化任务数组
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
for (;;) {
int phase;
// 扫描是否有需要执行的一个或多个顶级任务
// 其中包含了窃取的任务执行,以及线程局部队列中任务的执行
// 如果发现了就执行,返回true
// 如果获取不到任务,就需要将该线程入队列,阻塞
if (scan(w, r)) {
// 随机数
r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
}
// 如果是已经入队列阻塞的,因为phase大于0表示加锁
else if ((phase = w.phase) >= 0) { // enqueue, then rescan
long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
long c, nc;
do {
w.stackPred = (int)(c = ctl);
// ForkJoinPool中status表示运行中的线程的,数字减一,因为入队列了。
nc = ((c - RC_UNIT) & UC_MASK) | np;
// CAS操作,自旋,直到操作成功
} while (!CTL.weakCompareAndSet(this, c, nc));
}
else { // already queued
int pred = w.stackPred;
Thread.interrupted(); // clear before park
w.source = DORMANT; // enable signal
long c = ctl;
int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
// 如果ForkJoinPool停止,则break,跳出循环
if (md < 0)
break;
// 优雅关闭
else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
tryTerminate(false, false))
break;
else if (rc <= 0 && pred != 0 && phase == (int)c) {
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
long d = keepAlive + System.currentTimeMillis();
// 线程阻塞,计时等待
LockSupport.parkUntil(this, d);
//
if (ctl == c && // drop on timeout if all idle
d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
CTL.compareAndSet(this, c, nc)) {
// 不再扫描,需要入队列
w.phase = QUIET;
break;
}
}
// phase为1,表示加锁,phase为负数表示入队列
else if (w.phase < 0)
// 如果phase小于0,表示阻塞,排队中
LockSupport.park(this);
w.source = 0;
}
}
}
// 从一个队列中扫描一个或多个顶级任务,如果有,就执行
// 对于非空队列,执行任务,返回true
private boolean scan(WorkQueue w, int r) {
WorkQueue[] ws; int n;
// 如果workQueues不是null,并且workQueue的长度大于0,并且w非空,w是线程的
workQueue
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
// m是ws长度减一,获取ws顶部workQueue
for (int m = n - 1, j = r & m;;) {
WorkQueue q; int b;
// 随机获取workQueue,如果该workQueue的顶指针和底指针不相等,表示有需要执
行的任务
if ((q = ws[j]) != null && q.top != (b = q.base)) {
int qid = q.id;
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
// 如果workQueue的任务队列不是null,并且元素非空
if ((a = q.array) != null && (cap = a.length) > 0) {
// 获取队列顶部任务
t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) &
b);
// 如果q的base值没有被别的线程修改过,t不是null,并且将t从数组中
移除成功
// 即可在当前工作线程执行该任务
if (q.base == b++ && t != null &&
QA.compareAndSet(a, k, t, null)) {
// base+1
q.base = b;
// 更改source为当前id
w.source = qid;
// 如果还有任务需要执行,通知其他闲置的线程执行
if (q.top - b > 0)
signalWork();
// 让workQueue中的工作线程来执行不管是窃取来的,还是本地的任
务,还是从queue中获取的其他任务
// 公平起见,添加一个随机的边界;剩下的让别的线程来执行。
w.topLevelExec(t, q, // random fairness bound
r & ((n << TOP_BOUND_SHIFT) - 1));
}
}
return true;
}
else if (--n > 0)
j = (j + 1) & m;
else
break;
}
}
return false;
}

唤醒–出栈

在新的任务到来之后,空闲的线程被唤醒,其核心逻辑在signalWork方法里面。

final void signalWork() {
for (;;) {
long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
if ((c = ctl) >= 0L) // 足够的worker线程
break;
else if ((sp = (int)c) == 0) { // 没有闲置的worker线程
if ((c & ADD_WORKER) != 0L) // worker线程太少
tryAddWorker(c); // 尝试添加新的worker线程
break;
}
else if ((ws = workQueues) == null)
break; // 线程池没有启动或已经停止了
else if (ws.length <= (i = sp & SMASK))
break; // 线程池停止了
else if ((v = ws[i]) == null)
break; // 线程池正在停止中
else {
int np = sp & ~UNSIGNALLED;
int vp = v.phase;
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
Thread vt = v.owner;
if (sp == vp && CTL.compareAndSet(this, c, nc)) {
v.phase = np;
// 如果栈顶元素存在,并且
if (vt != null && v.source < 0)
// 唤醒线程vt
LockSupport.unpark(vt);
break;
}
}
}
}

任务的提交过程分析

在明白了工作窃取队列、ctl变量的各种状态、Worker的各种状态,以及线程阻塞—唤醒机制之后, 接下来综合这些知识,详细分析任务的提交和执行过程。

关于任务的提交,ForkJoinPool最外层的接口如下所示。

/**
* 将一个可能是外部任务的子任务入队列
*/
private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
// 任务为null,抛异常
if (task == null)
throw new NullPointerException();
// 如果当前线程是ForkJoinWorkerThread类型的,并且该线程的pool就是当前对象
// 并且当前pool的workQueue不是null,则将当前任务入队列。
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this &&
(q = w.workQueue) != null)
// 当前任务入队局部队列
q.push(task);
else
// 否则该任务不是当前线程的子任务,调用外部入队方法,加入全局队列
externalPush(task);
return task;
}

如何区分一个任务是内部任务,还是外部任务呢?
可以通过调用该方法的线程类型判断。
如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该任务 放入该线程的局部队列;
否则,是外部线程在调用该方法,则将该任务加入全局队列。

内部提交任务push

内部提交任务,即上面的q.push(task),会放入该线程的工作窃取队列中,代码如下所示。

由于工作窃取队列的特性,操作是单线程的,所以此处不需要执行CAS操作。

外部提交任务

final void externalPush(ForkJoinTask<?> task) {
int r;
// 生成随机数
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue q;
int md = mode, n;
WorkQueue[] ws = workQueues;
// 如果ForkJoinPool关闭,或者任务队列是null,或者ws的长度小于等于0,拒收任务
if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
throw new RejectedExecutionException();
// 如果随机数计算的workQueues索引处的元素为null,则添加队列
// 即提交任务的时候,是随机向workQueue中添加workQueue,负载均衡的考虑。
else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
// 计算新workQueue对象的id值
int qid = (r | QUIET) & ~(FIFO | OWNED);
// worker线程名称前缀
Object lock = workerNamePrefix;
// 创建任务数组
ForkJoinTask<?>[] qa =
new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
// 创建WorkQueue,将当前线程作为
q = new WorkQueue(this, null);
// 将任务数组赋值给workQueue
q.array = qa;
// 设置workQueue的id值
q.id = qid;
// 由于是通过客户端线程添加的workQueue,没有前置workQueue
// 内部提交任务有源workQueue,表示子任务
q.source = QUIET;
if (lock != null) { // unless disabled, lock pool to install
synchronized (lock) {
WorkQueue[] vs; int i, vn;
// 如果workQueues数组不是null,其中有元素,
// 并且qid对应的workQueues中的元素为null,则赋值
// 因为有可能其他线程将qid对应的workQueues处的元素设置了,
// 所以需要加锁,并判断元素是否为null
if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
vs[i = qid & (vn - 1) & SQMASK] == null)
//
vs[i] = q;
}
}
}
// CAS操作,使用随机数
else if (!q.tryLockPhase()) // move if busy
r = ThreadLocalRandom.advanceProbe(r);
else {
// 如果任务添加成功,通知线程池调度,执行。
if (q.lockedPush(task))
signalWork();
return;
}
}
}

lockedPush(task)方法的实现:

外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同。最 后,调用signalWork(),通知一个空闲线程来取。

工作窃取算法:任务的执行过程分析

全局队列有任务,局部队列也有任务,每一个Worker线程都会不间断地扫描这些队列,窃取任务来 执行。下面从Worker线程的run方法开始分析:

run()方法调用的是所在ForkJoinPool的runWorker方法,如下所示。

final void runWorker(WorkQueue w) {
int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
for (;;) {
int phase;
if (scan(w, r)) { // scan until apparently empty
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
}
else if ((phase = w.phase) >= 0) { // enqueue, then rescan
long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
long c, nc;
do {
w.stackPred = (int)(c = ctl);
nc = ((c - RC_UNIT) & UC_MASK) | np;
} while (!CTL.weakCompareAndSet(this, c, nc));
}
else { // already queued
int pred = w.stackPred;
Thread.interrupted(); // clear before park
w.source = DORMANT; // enable signal
long c = ctl;
int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
if (md < 0) // terminating
break;
else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
tryTerminate(false, false))
break; // quiescent shutdown
else if (rc <= 0 && pred != 0 && phase == (int)c) {
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
long d = keepAlive + System.currentTimeMillis();
LockSupport.parkUntil(this, d);
if (ctl == c && // drop on timeout if all idle
d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
CTL.compareAndSet(this, c, nc)) {
w.phase = QUIET;
break;
}
}
else if (w.phase < 0)
LockSupport.park(this); // OK if spuriously woken
w.source = 0; // disable signal
}
}
}

下面详细看扫描过程scan(w, a)。

private boolean scan(WorkQueue w, int r) {
WorkQueue[] ws; int n;
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
for (int m = n - 1, j = r & m;;) {
WorkQueue q; int b;
if ((q = ws[j]) != null && q.top != (b = q.base)) {
int qid = q.id;
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
if ((a = q.array) != null && (cap = a.length) > 0) {
t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) &
b);
if (q.base == b++ && t != null &&
QA.compareAndSet(a, k, t, null)) {
q.base = b;
w.source = qid;
if (q.top - b > 0)
signalWork();
w.topLevelExec(t, q, // random fairness bound
r & ((n << TOP_BOUND_SHIFT) - 1));
}
}
return true;
}
else if (--n > 0)
j = (j + 1) & m;
else
break;
}
}
return false;
}

ForkJoinTask的fork/join

如果局部队列、全局中的任务全部是相互独立的,就很简单了。但问题是,对于分治算法来说,分 解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。

这种依赖关系是通过ForkJoinTask中的join()来体现的。且看前面的代码:

protected void compute() {
if (lo < hi) {
// 分区
int pivot = partition(array, lo, hi);
SortTask left = new SortTask(array, lo, pivot - 1);
SortTask right = new SortTask(array, pivot + 1, hi);
left.fork();
right.fork();
left.join();
right.join();
}
}

线程在执行当前ForkJoinTask的时候,产生了left、right 两个子Task。

fork是指把这两个子Task放入队列里面;
join则是要等待2个子Task完成。

而子Task在执行过程中,会再次产生两个子Task。如此层层嵌套,类似于递归调用,直到最底层的 Task计算完成,再一级级返回。

fork

fork()的代码很简单,就是把自己放入当前线程所在的局部队列中。
如果是外部线程调用fork方法,则直接将任务添加到共享队列中。

join的嵌套

1、 join的层层嵌套阻塞原理

join会导致线程的层层嵌套阻塞,如图所示:

线程1在执行 ForkJoinTask1,在执行过程中调用了 forkJoinTask2.join(),所以要等ForkJoinTask2 完成,线程1才能返回;
线程2在执行ForkJoinTask2,但由于调用了forkJoinTask3.join(),只有等ForkJoinTask3完成后,线 程2才能返回;
线程3在执行ForkJoinTask3。

结果是:线程3首先执行完,然后线程2才能执行完,最后线程1再执行完。所有的任务其实组成一 个有向无环图DAG。如果线程3调用了forkJoinTask1.join(),那么会形成环,造成死锁。

那么,这种层次依赖、层次通知的 DAG,在 ForkJoinTask 内部是如何实现的呢?站在 ForkJoinTask的角度来看,每个ForkJoinTask,都可能有多个线程在等待它完成,有1个线程在执行它。 所以每个ForkJoinTask就是一个同步对象,线程在调用join()的时候,阻塞在这个同步对象上面,执行完 成之后,再通过这个同步对象通知所有等待的线程。

利用synchronized关键字和Java原生的wait()/notify()机制,实现了线程的等待-唤醒机制。调用 join()的这些线程,内部其实是调用ForkJoinTask这个对象的wait();执行该任务的Worker线程,在任务 执行完毕之后,顺便调用notifyAll()。

2、ForkJoinTask的状态解析

要实现fork()/join()的这种线程间的同步,对应的ForkJoinTask一定是有各种状态的,这个状态变量 是实现fork/join的基础。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// ...
// 由ForkJoinPool和workers直接调用
// 需要是volatile的
volatile int status;
private static final int DONE = 1 << 31; // 负值
private static final int ABNORMAL = 1 << 18; // 设置DONE的时候自动设置
private static final int THROWN = 1 << 17; // 设置ABNORMAL的时候自动设置
private static final int SIGNAL = 1 << 16; // 如果在调用join的线程正在等
待,则为true
private static final int SMASK = 0xffff; // short bits for tags
// ...
}

初始时,status=0。共有五种状态,可以分为两大类:

  1. 未完成:status>=0。
  2. 已完成:status<0。

所以,通过判断是status>=0,还是status<0,就可知道任务是否完成,进而决定调用join()的线 程是否需要被阻塞。

3、 join的详细实现

下面看一下代码的详细实现。

getRawResult()是ForkJoinTask中的一个模板方法,分别被RecursiveAction和RecursiveTask实 现,前者没有返回值,所以返回null,后者返回一个类型为V的result变量。

阻塞主要发生在上面的doJoin()方法里面。在dojoin()里调用t.join()的线程会阻塞,然后等待任务t执 行完成,再唤醒该阻塞线程,doJoin()返回。

注意:当 doJoin()返回的时候,就是该任务执行完成的时候,doJoin()的返回值就是任务的完成状 态,也就是上面的几种状态。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// ...
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 如果status < 0表示任务已经完成,不用阻塞,直接返回。
return (s = status) < 0 ? s :
// 否则判断线程是否是工作线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
// ...
}

上面的返回值可读性比较差,变形之后:

// 如果status < 0,直接返回s值
if ((s = status) < 0) {
return s;
} else {
// 如果线程是工作线程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
// 将任务能够从局部队列弹出,并调用doExec()方法执行成功
if (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) &&
(s = doExec()) < 0) {
// 返回s值
return s;
} else {
// 否则等待,线程阻塞
wt.pool.awaitJoin(w, this, 0L)
}
} else {
// 如果线程不是工作线程,则外部等待任务完成,线程阻塞
externalAwaitDone();
}
}

先看一下externalAwaitDone(),即外部线程的阻塞过程,相对简单。

private int externalAwaitDone() {
int s = tryExternalHelp();
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
boolean interrupted = false;
synchronized (this) {
for (;;) {
if ((s = status) >= 0) {
try {
// 如果任务还没有完成,阻塞
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else {
// 唤醒线程,开始执行
notifyAll();
break;
}
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}

内部Worker线程的阻塞,即上面的wt.pool.awaitJoin(w, this, 0L),相比外部线程的阻塞要做更多工 作。它现不在ForkJoinTask里面,而是在ForkJoinWorkerThread里面。

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
int seed = ThreadLocalRandom.nextSecondarySeed();
if (w != null && task != null &&
(!(task instanceof CountedCompleter) ||
(s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
// 尝试执行该任务
w.tryRemoveAndExec(task);
int src = w.source, id = w.id;
int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
s = task.status;
while (s >= 0) {
WorkQueue[] ws;
int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
while (n > 0) {
WorkQueue q; int b;
if ((q = ws[r & m]) != null && q.source == id &&
q.top != (b = q.base)) {
ForkJoinTask<?>[] a; int cap, k;
int qid = q.id;
if ((a = q.array) != null && (cap = a.length) > 0) {
ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.getAcquire(a, k = (cap - 1) & b);
if (q.source == id && q.base == b++ &&
t != null && QA.compareAndSet(a, k, t, null)) {
q.base = b;
w.source = qid;
// 执行该任务
t.doExec();
w.source = src;
}
}
break;
}
else {
r += step;
--n;
}
}
// 如果任务的status < 0,任务执行完成,则退出循环,返回s的值
if ((s = task.status) < 0)
break;
else if (n == 0) { // empty scan
long ms, ns; int block;
if (deadline == 0L)
ms = 0L; // untimed
else if ((ns = deadline - System.nanoTime()) <= 0L)
break; // timeout
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L; // avoid 0 for timed wait
if ((block = tryCompensate(w)) != 0) {
task.internalWait(ms);
CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
}
s = task.status;
}
}
}
return s;
}

上面的方法有个关键点:for里面是死循环,并且只有一个返回点,即只有在task.status<0,任务 完成之后才可能返回。否则会不断自旋;若自旋之后还不行,就会调用task.internalWait(ms);阻塞。

task.internalWait(ms);的代码如下。

4、 join的唤醒

调用t.join()之后,线程会被阻塞。接下来看另外一个线程在任务t执行完毕后如何唤醒阻塞的线程。

任务的执行发生在doExec()方法里面,任务执行完成后,调用一个setDone()通知所有等待的线程。 这里也做了两件事:

  1. 把status置为完成状态。
  2. 如果s != 0,即 s = SIGNAL,说明有线程正在等待这个任务执行完成。调用Java原生的 notifyAll()通知所有线程。如果s = 0,说明没有线程等待这个任务,不需要通知。

ForkJoinPool的优雅关闭

同ThreadPoolExecutor一样,ForkJoinPool的关闭也不可能是“瞬时的”,而是需要一个平滑的过渡 过程。

工作线程的退出

对于一个Worker线程来说,它会在一个for循环里面不断轮询队列中的任务,如果有任务,则执 行,处在活跃状态;如果没有任务,则进入空闲等待状态
这个线程如何退出呢?

**
* 工作线程的顶级循环,通过ForkJoinWorkerThread.run调用
*/
final void runWorker(WorkQueue w) {
int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // 初始化任务数组。
for (;;) {
int phase;
if (scan(w, r)) { // scan until apparently empty
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
}
else if ((phase = w.phase) >= 0) { // enqueue, then rescan
long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
long c, nc;
do {
w.stackPred = (int)(c = ctl);
nc = ((c - RC_UNIT) & UC_MASK) | np;
} while (!CTL.weakCompareAndSet(this, c, nc));
}
else { // already queued
int pred = w.stackPred;
Thread.interrupted(); // clear before park
w.source = DORMANT; // enable signal
long c = ctl;
int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
if (md < 0) // terminating
break;
// 优雅退出
else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
tryTerminate(false, false))
break;
else if (rc <= 0 && pred != 0 && phase == (int)c) {
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
long d = keepAlive + System.currentTimeMillis();
LockSupport.parkUntil(this, d);
if (ctl == c && // drop on timeout if all idle
d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
CTL.compareAndSet(this, c, nc)) {
w.phase = QUIET;
break;
}
}
else if (w.phase < 0)
LockSupport.park(this); // OK if spuriously woken
w.source = 0; // disable signal
}
}
}

(int) (c = ctl) < 0,即低32位的最高位为1,说明线程池已经进入了关闭状态。但线程池进入关闭状 态,不代表所有的线程都会立马关闭。

shutdown()与shutdownNow()的区别

public void shutdown() {
checkPermission();
tryTerminate(false, true);
}
public List<Runnable> shutdownNow() {
checkPermission();
tryTerminate(true, true);
return Collections.emptyList();
}

二者的代码基本相同,都是调用tryTerminate(boolean, boolean)方法,其中一个传入的是false, 另一个传入的是true。tryTerminate意为试图关闭ForkJoinPool,并不保证一定可以关闭成功:

private boolean tryTerminate(boolean now, boolean enable) {
int md; // 三个阶段:尝试设置为SHUTDOWN,之后STOP,最后TERMINATED
while (((md = mode) & SHUTDOWN) == 0) {
if (!enable || this == common) // cannot shutdown
return false;
else
// 将mode变量CAS操作设置为SHUTDOWN
MODE.compareAndSet(this, md, md | SHUTDOWN);
}
while (((md = mode) & STOP) == 0) { // try to initiate termination
if (!now) { // check if quiescent & empty
for (long oldSum = 0L;;) { // repeat until stable
boolean running = false;
long checkSum = ctl;
WorkQueue[] ws = workQueues;
if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
// 还有正在运行的线程
running = true;
else if (ws != null) {
WorkQueue w;
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
int s = w.source, p = w.phase;
int d = w.id, b = w.base;
if (b != w.top ||
((d & 1) == 1 && (s >= 0 || p >= 0))) {
running = true;
// 还正在运行
break;
}
checkSum += (((long)s << 48) + ((long)p << 32) +
((long)b << 16) + (long)d);
}
}
}
if (((md = mode) & STOP) != 0)
break; // already triggered
else if (running)
return false;
else if (workQueues == ws && oldSum == (oldSum = checkSum))
break;
}
}
if ((md & STOP) == 0)
// 如果需要立即停止,同时md没有设置为STOP,则设置为STOP
MODE.compareAndSet(this, md, md | STOP);
}
// 如果mode还没有设置为TERMINATED,则进行循环
while (((md = mode) & TERMINATED) == 0) { // help terminate others
for (long oldSum = 0L;;) { // repeat until stable
WorkQueue[] ws; WorkQueue w;
long checkSum = ctl;
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
ForkJoinWorkerThread wt = w.owner;
// 清空任务队列
w.cancelAll();
if (wt != null) {
try {
// 中断join或park的线程
wt.interrupt();
} catch (Throwable ignore) {
}
}
checkSum += ((long)w.phase << 32) + w.base;
}
}
}
// 如果已经设置了TERMINATED,则跳出for循环,while循环条件为false,整个方
法返回true,停止
if (((md = mode) & TERMINATED) != 0 ||
(workQueues == ws && oldSum == (oldSum = checkSum)))
break;
}
if ((md & TERMINATED) != 0)
break;
else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
break;
else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
synchronized (this) {
// 通知调用awaitTermination的线程,关闭ForkJoinPool了
notifyAll();
}
break;
}
}
return true;
}

总结:shutdown()只拒绝新提交的任务;shutdownNow()会取消现有的全局队列和局部队列中的 任务,同时唤醒所有空闲的线程,让这些线程自动退出。