FutureTask实现了RunnableFuture,RunnableFuture继承了Runnable和Future
可以理解FutureTask是一个异步的执行任务
包含任务本身,还有任务执行的结果,执行结果会存在FutureTask里,什么时候需要再去拿
先看一个FutureTask的小demo,作为任务传递又可以从中获取结果
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
FutureTask futureTask = new FutureTask(new CallableTask());
new Thread(futureTask).start();
System.out.println(futureTask.get(3500, TimeUnit.MILLISECONDS));
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 1;
}
}
}
看一个类,先看成员变量
核心任务状态
注释中也描述了任务状态的流转 Runnable 执行入口是run, 那FutureTask的执行入口也是run,我们先来看下run 方法
public void run() {
// run是futureTask执行入口
// 状态必须是new, cas修改runner为当前执行的线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 这里在 double check 下任务状态
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
// 异常情况
result = null;
ran = false;
setException(ex);
}
if (ran)
// 设置结果到outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 干掉runner
runner = null;
// 是否被中断
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们看到异常情况会setException
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 这段代码不会重排序 但不保证可见性
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
其实就是把异常信息放到outcome
finishCompletion主要是完成任务,通过LockSupport唤醒所有等待线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 如果有别的线程在等待当前任务完成,那么将他们唤醒
// CAS保证可见性
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 断链
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 钩子函数
done();
// 任务完成,置空
callable = null;
正常执行完走的代码也一样
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
其他线程可以调用cancel取消任务执行
public boolean cancel(boolean mayInterruptIfRunning) {
// NEW 没有执行完成
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 前面的CAS修改状态,保证只有一个线程会走到这里,且任务是初始状态或者没执行完
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断线程
t.interrupt();
} finally { // final state
// 修改状态 INTERRUPTING -> INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
最后来看下get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone 主要是任务没执行完的话当然没结果,那就通过LockSupport阻塞线程
获取结果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
FutureTask源码就先看到这里啦 =-=
转载请注明:汪明鑫的个人博客 » FutureTask源码阅读
说点什么
您将是第一位评论人!