Callable和Future
Callable
Callable与Runnable的功能大致相似,Callable中有一个call()函数, 但是call()函数有返回值 ,而Runnable的run()函数不能将结果返回给客户程序。Callable的声明如下 :
@FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
可以看到,这是一个泛型接口,call()函数返回的类型就是客户程序传递进来的V类型,且该接口往往与Future结合使用。 Future
package java.util.concurrent; public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
关于Future,源码中的注释如下:
Future表示异步计算的结果。 提供了检查计算是否完成、等待其完成以及检索计算结果的方法。 结果只能在计算完成后使用方法get检索,必要时阻塞,直到它准备好。 取消是通过cancel方法执行的。 提供了额外的方法来确定任务是正常完成还是被取消。 一旦计算完成,就不能取消计算。 如果您想使用Future来取消可取消性但不提供可用的结果,您可以声明Future<?>形式的类型并返回null作为底层任务的结果。不难猜测该类的作用是异步进程计算结果。下面我们来看看它是如何工作的,话不多说我们以源码的研究之旅。cancel(boolean)
尝试取消此任务的执行。 如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。 如果成功,并且在调用cancel时此任务尚未启动,则此任务不应该运行。 如果任务已经开始,则根据传入的参数确定是否应该中断执行该任务的线程以尝试停止该任务。
此方法返回后,对isDone的后续调用将始终返回true 。 如果此方法返回true ,则对isCancelled的后续调用将始终返回true 。isDone()
如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true 。get()
等待计算完成,获取结果。get(long timeout,TimeUnit unit)
在给定时间范围内等待计算完成,获取结果。超过时间还未计算完成将抛出timeout异常。
下面我们来来看看Future一个常用的实现类FutureTask。
FutureTask类实现RunnableFuture接口,RunnableFuture接口继承Runnable和Future接口。到这里我们嗅到了一丝线程的味道,接着往下看。
FutureTask对于任务运行的状态使用volatile关键字去修饰保证了线程的安全性,至于volatile关键字为何能保证线程安全后续文章解答。
同时定义了0、1、2、3、4、5、6七个连续大小的整型变量来表示任务的状态变化,具体代码如下:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
FutureTask成员变量
/** The underlying callable; nulled out after running */ private Callable callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
Callable callable
将要执行的任务
Object outcome
Object类型,表示通过get()方法获取到的结果数据或者异常信息。
volatile Thread runner
运行Callable的线程,运行期间会使用CAS保证线程安全,这里大家只需要知道CAS是Java保证线程安全的一种方式, 后续文章中会深度分析CAS如何保证线程安全。
volatile WaitNode waiters
WaitNode是一个静态内部类,表示等待线程的堆栈中的链表节点,在FutureTask的实现中,会通过CAS结合此堆栈交换任务的运行状 态。
WaitNode的定义如下:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
类中定义了一个Thread成员变量和指向下一个WaitNode节点的引用。其中 通过构造方法将thread变量设置为当前线程。
FutureTask构造方法
FutureTask(Callable callable)
接收一个Callable变量,同时设置任务状态为NEW
public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
FutureTask(Runnable runnable, V result)
接受一个Runnable线程变量以及返回结果类型,同时设置任务状态为NEW
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
run
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
如果任务状态state不等于NEW则直接return。如果state等于NEW,则调用UNSAFE.compareAndSwapObject方法执行CAS算法判断当前对象与当前线程是否一致,不一致则直接return。
如果当前任务不等于null且状态等于NEW,则执行call方法同时调用set方法设置返回结果。最后将当前运行线程runner设置为null,如果任务状态>=INTERRUPTING会执行handlePossibleCancellationInterrupt方法将状态为INTERRUPTING的任务线程设置为就绪状态,但该任务线程不会被CPU再次执行调度。因为state没有被重置。
handlePossibleCancellationInterrupt方法
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let"s spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
set(V v)
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
CAS算法计算NEW偏移一个变量是否等于COMPLETING(2),是则将返回结果复制给outcome成员变量,同时设置stateOffset=NORMAL(2)为正常状态。
最后调用finishCompletion()方法。
finishCompletion()
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
在finishCompletion()方法中,首先定义一个for循环,循环终止因子为waiters为null,在循环中,判断CAS操作是否成功,如果成功 进行if条件中的逻辑。首先,定义一个for自旋循环,在自旋循环体中,唤醒WaitNode堆栈中的线程,使其运行完成。当WaitNode 堆栈中的线程运行完成后,通过break退出外层for循环。接下来调用done()方法。
done()
默认实现什么也不做,子类可以覆盖此方法以调用完成回调。
get()
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
判断任务状态是否运行中,如果是调用awaitDone()方法等待任务完成。否则调用report方法返回结果。
report(int s)
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
如果任务状态等于NORMAL(3)则返回正常计算结果。如果任务状态大于等于CANCELLED(4)则返回异常结果。
看到这里我们大致对Callable和Future的源码有了大致的了解。但是对于它们二者是怎么与多线程关联的,我们在下一个章节讲解