Welcome everyone

FutureTask源码阅读

java 汪明鑫 1208浏览 0评论

FutureTask实现了RunnableFuture,RunnableFuture继承了Runnable和Future

可以理解FutureTask是一个异步的执行任务

包含任务本身,还有任务执行的结果,执行结果会存在FutureTask里,什么时候需要再去拿

先看一个FutureTask的小demo,作为任务传递又可以从中获取结果

public class FutureTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        FutureTask futureTask = new FutureTask(new CallableTask());
        new Thread(futureTask).start();

        System.out.println(futureTask.get(3500, TimeUnit.MILLISECONDS));
    }

    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return 1;
        }
    }
}








看一个类,先看成员变量

核心任务状态

注释中也描述了任务状态的流转
Runnable 执行入口是run, 那FutureTask的执行入口也是run,我们先来看下run 方法
public void run() {
         // run是futureTask执行入口
         // 状态必须是new, cas修改runner为当前执行的线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;

            // 这里在 double check 下任务状态
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 异常情况
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // 设置结果到outcome
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()

            // 干掉runner
            runner = null;

            // 是否被中断
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

我们看到异常情况会setException

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;

            // 这段代码不会重排序 但不保证可见性
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

其实就是把异常信息放到outcome

finishCompletion主要是完成任务,通过LockSupport唤醒所有等待线程

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {

            // 如果有别的线程在等待当前任务完成,那么将他们唤醒
            // CAS保证可见性
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;

                        // 唤醒
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;

                    // 断链
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // 钩子函数
        done();

        // 任务完成,置空
        callable = null;        

正常执行完走的代码也一样

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

其他线程可以调用cancel取消任务执行

public boolean cancel(boolean mayInterruptIfRunning) {

        // NEW 没有执行完成
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception

            // 前面的CAS修改状态,保证只有一个线程会走到这里,且任务是初始状态或者没执行完
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        // 中断线程
                        t.interrupt();
                } finally { // final state
                    // 修改状态 INTERRUPTING -> INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

最后来看下get

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

awaitDone 主要是任务没执行完的话当然没结果,那就通过LockSupport阻塞线程

获取结果

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

FutureTask源码就先看到这里啦 =-=

转载请注明:汪明鑫的个人博客 » FutureTask源码阅读

喜欢 (1)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz