FutureTask类源码剖析

FutureTask类源码剖析

一、FutrueTask类的简介

FutureTask为Future提供了基础实现,如获取任务执行结果和取消任务等,如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消,除非使用runAndReset执行计算。FutureTask常用来封装Callable和Runnable(本质上其实还是Callable),也可以作为一个任务提交到线程池中执行。

FutureTask类当前设计中的同步控制依赖于通过CAS更新的状态字段来跟踪完成情况,以及一个简单的Treiber堆栈来保存等待线程。

image-20231107163623828

可以看到,FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,因此FutureTask既能当做一个Runnable对象直接被Thread执行,也能作为Future对象用来异步计算得到Callable的计算结果。

我们先来介绍涉及的两个接口,这会有助于我们后面的理解:

Callable接口

1
2
3
4
5
6
public interface Callable<V> {
/**
* 计算结果,如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

Callable是个泛型接口,泛型V就是要call()方法返回的类型。对比Runnable接口,Runnable接口的run方法不会返回数据也不能抛出异常。Executors类中包含一些实用的工具方法帮助我们从其他常见形式转换为Callable类如下面:

image-20231107182256974

Future接口

Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行完成并获取执行结果,同时还可以取消执行。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel():cancel方法用来取消异步任务的执行。

    • 如果任务已经执行完成或者已经被取消,或者由于某些原因不能取消,则会返回false。
    • 如果任务还没有被执行,则会返回true并且异步任务不会被执行。
    • 如果任务已经开始执行了但是还没有执行完成:
      • 若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true。
      • 若mayInterruptIfRunning为false,则不会中断任务执行线程并返回true。
  • isCanceled():判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。

  • isDone():判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。

  • get():获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。

    • 如果任务被取消则会抛出CancellationException异常。
    • 如果任务执行过程发生异常则会抛出ExecutionException异常。
    • 如果阻塞等待过程中被中断则会抛出InterruptedException异常。
  • get(long timeout,Timeunit unit):带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。

二、FutureTask类的重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 内部持有的Callable任务对象,运行完毕后置空
private Callable<V> callable;

// 从get()中返回的结果或抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes

// 运行Callable的线程
private volatile Thread runner;

// 使用Treiber栈保存等待线程
private volatile WaitNode waiters;

// 任务状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

其中需要注意的是state是volatile类型的,也就是说只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。7种状态具体表示:

  • NEW:表示是个新的任务或者还没被执行完的任务,这是初始状态

  • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态

  • NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终状态

  • EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终状态

  • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终状态

  • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态

  • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终状态

    有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

img

上图是FutureTask类的状态迁移图,其中蓝色为初始状态,表示任务尚未启动或者启动后还没有结束;粉色为中间状态,表示短暂的过渡情况;红色为最终状态,表示任务执行结束。

三、FutureTask类的构造方法

  • FutureTask(Callable<V> callable)
1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

这个构造函数会把传入的Callable变量保存在callable字段中,该字段定义为private Callable<V> callable;用来保存底层的任务调用,在被执行完成以后会指向null即置空,接着会初始化state字段为NEW。

  • FutureTask(Runnable runnable, V result)
1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。

顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:

1
2
3
4
5
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

可以看到这里采用的是适配器模式,调用RunnableAdapter<T>(task, result)方法来适配,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

这个适配器很简单,就是简单的实现了Callable接口,在call()实现中调用Runnable.run()方法,然后把传入的result作为任务的结果返回。

四、FutureTask类的重要方法

1.核心方法 - run()

在new了一个FutureTask对象之后,接下来就是在另一个线程中执行这个Task,无论是通过直接new一个Thread还是通过线程池,执行的都是run()方法,接下来就看看run()方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void run() {
// CAS更新runner为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 保存callable对象,防止运行期间cancel(false)把callable对象置空
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务并得到计算结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常结果
setException(ex);
}
if (ran)
// 设置执行结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 任务执行结束,运行线程为null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
//
int s = state;
if (s >= INTERRUPTING)
// 处理中断逻辑
handlePossibleCancellationInterrupt(s);
}
}

说明:

  • 运行任务,如果任务状态为NEW状态,则利用CAS修改为当前线程,执行完毕调用set(result)方法设置执行结果。
1
2
3
4
5
6
7
8
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 执行完毕,唤醒等待线程
finishCompletion();
}
}
  • 结果设置完毕后,调用finishCompletion()方法唤醒等待线程,因为可能有一些调用FutureTask的get方法阻塞等待执行结果的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 移除等待线程
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自旋遍历等待线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
// 遍历结束
if (next == null)
break;
// unlink to help gc
q.next = null;
q = next;
}
break;
}
}
//任务完成后调用函数,自定义扩展
done();
// to reduce footprint
callable = null;
}

再回到run方法,如果在运行期间被中断,此时需要调用handlePossibleCancellationInterrupt方法来处理中断逻辑,确保任何中断(例如cancel(true))只停留在当前run或runAndReset的任务中,源码如下:

1
2
3
4
5
6
private void handlePossibleCancellationInterrupt(int s) {
// 在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}

2.核心方法 - get()

1
2
3
4
5
6
7
// 获取执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

说明:FutureTask通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING),就调用awaitDone方法(后面单独讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。

1
2
3
4
5
6
7
8
9
10
11
12
// 返回执行结果或抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常运行结束会返回执行结果
if (s == NORMAL)
return (V)x;
// 任务已经被取消,抛出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 任务异常退出,抛出ExecutionException并把任务内部异常作为原因cause
throw new ExecutionException((Throwable)x);
}

3.核心方法 - awaitDone(boolean timed, long nanos)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待的截止日期
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 获取并清除当前等待线程的中断状态
if (Thread.interrupted()) {
// 移除等待WaitNode,并抛出InterruptedException
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任务已经结束,返回任务状态
if (s > COMPLETING) {
if (q != null)
// 置空等待节点的线程,表示不再等待
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
// 创建当前线程的等待链表节点
q = new WaitNode();
else if (!queued)
// 当前线程的等待链表节点插入FutureTask的等待链表头前
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 计算剩余的等待时间
nanos = deadline - System.nanoTime();
// 超时则移除等待节点,返回任务状态
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞当前线程
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞当前线程
LockSupport.park(this);
}
}

说明:awaitDone用于等待任务完成,或任务因为中断或超时而终止,返回任务的完成状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void removeWaiter(WaitNode node) {
if (node != null) {
// 首先置空线程
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// 依次遍历查找
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
// 后继节点
s = q.next;
// 前序节点
if (q.thread != null)
pred = q;
// 待删除的节点不是头节点,直接删除
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
// 待删除的节点是头节点,利用CAS更新头节点
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}
  • 如果当前状态为结束状态(state>COMPLETING),则根据需要置空等待节点的线程,并返回FutureTask状态;

  • 如果当前状态为正在完成(COMPLETING),说明此时FutureTask还不能做出超时动作,为任务让出CPU执行时间片;

  • 如果state为NEW,先新建一个WaitNode,然后CAS修改当前waiters;

  • 如果等待超时,则调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程;

  • 其他情况直接阻塞当前线程。

4.核心方法 - cancel(boolean mayInterruptIfRunning)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果当前Future状态为NEW,根据传入参数修改FutureTask状态为INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果任务尚未启动或者尚未执行结束,会执行下面的代码
if (mayInterruptIfRunning) {
try {
// 中断执行中的任务线程
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 移除并唤醒所有等待线程
finishCompletion();
}
return true;
}

说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。

5.核心方法 - isCancelled()/isDone()

1
2
3
4
5
6
7
public boolean isCancelled() {
return state >= CANCELLED;
}

public boolean isDone() {
return state != NEW;
}