不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!

  • 你有一个思想,我有一个思想,咱们交换后,一我的就有两个思想安全

  • If you can NOT explain it simply, you do NOT understand it well enough多线程

现陆续将Demo代码和技术文章整理在一块儿 Github实践精选 ,方便你们阅读查看,本文一样收录在此,以为不错,还请Star并发


前言

建立线程有几种方式?这个问题的答案应该是能够脱口而出的吧ide

  • 继承 Thread 类函数

  • 实现 Runnable 接口高并发

但这两种方式建立的线程是属于”三无产品“:工具

  • 没有参数源码分析

  • 没有返回值优化

  • 没办法抛出异常this

class MyThread implements Runnable{
   @Override
   public void run() {
      log.info("my thread");
   }
}

Runnable 接口是 JDK1.0 的核心产物

 /**
 * @since   JDK1.0
 */
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

用着 “三无产品” 老是有一些弊端,其中没办法拿到返回值是最让人不能忍的,因而 Callable 就诞生了

Callable

又是 Doug Lea 大师,又是 Java 1.5 这个神奇的版本

 /**
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    
    V call() throws Exception;
}

Callable 是一个泛型接口,里面只有一个 call() 方法,该方法能够返回泛型值 V ,使用起来就像这样:

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};

两者都是函数式接口,里面都仅有一个方法,使用上又是如此类似,除了有无返回值,Runnable 与 Callable 就点差异吗?

Runnable VS Callable

两个接口都是用于多线程执行任务的,但他们仍是有很明显的差异的

执行机制

先从执行机制上来看,Runnable 你太清楚了,它既能够用在 Thread 类中,也能够用在 ExecutorService 类中配合线程池的使用;Bu~~~~t,  Callable 只能在 ExecutorService 中使用,你翻遍 Thread 类,也找不到Callable 的身影

异常处理

Runnable 接口中的 run 方法签名上没有 throws ,天然也就没办法向上传播受检异常;而 Callable 的 call() 方法签名却有 throws,因此它能够处理受检异常;

因此概括起来看主要有这几处不一样点:

总体差异虽然不大,可是这点差异,却具备重大意义

返回值和处理异常很好理解,另外,在实际工做中,咱们一般要使用线程池来管理线程(缘由已经在 为何要使用线程池? 中明确说明),因此咱们就来看看 ExecutorService 中是如何使用两者的

ExecutorService

先来看一下 ExecutorService 类图

我将上图标记的方法单独放在此处

void execute(Runnable command);

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

能够看到,使用ExecutorService 的 execute() 方法依旧得不到返回值,而 submit() 方法清一色的返回 Future 类型的返回值

细心的朋友可能已经发现, submit() 方法已经在 CountDownLatch 和 CyclicBarrier 傻傻的分不清楚?文章中屡次使用了,只不过咱们没有获取其返回值罢了,那么

  • Future 究竟是什么呢?

  • 怎么经过它获取返回值呢?

咱们带着这些疑问一点点来看

Future

Future 又是一个接口,里面只有五个方法:

从方法名称上相信你已经能看出这些方法的做用

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);

// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;

// 获取任务执行结果,带有超时时间限制
V get(long timeout, TimeUnit unit) throws InterruptedException,                             ExecutionException,  TimeoutException;

// 判断任务是否已经取消
boolean isCancelled();

// 判断任务是否已经结束
boolean isDone();

铺垫了这么多,看到这你也许有些乱了,我们赶忙看一个例子,演示一下几个方法的做用

@Slf4j
public class FutureAndCallableExample {

   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = Executors.newSingleThreadExecutor();

      // 使用 Callable ,能够获取返回值
      Callable<String> callable = () -> {
         log.info("进入 Callable 的 call 方法");
         // 模拟子线程任务,在此睡眠 2s,
         // 小细节:因为 call 方法会抛出 Exception,这里不用像使用 Runnable 的run 方法那样 try/catch 了
         Thread.sleep(5000);
         return "Hello from Callable";
      };

      log.info("提交 Callable 到线程池");
      Future<String> future = executorService.submit(callable);

      log.info("主线程继续执行");

      log.info("主线程等待获取 Future 结果");
      // Future.get() blocks until the result is available
      String result = future.get();
      log.info("主线程获取到 Future 结果: {}", result);

      executorService.shutdown();
   }
}

程序运行结果以下:

若是你运行上述示例代码,主线程调用 future.get() 方法会阻塞本身,直到子任务完成。咱们也可使用 Future 方法提供的 isDone 方法,它能够用来检查 task 是否已经完成了,咱们将上面程序作点小修改:

// 若是子线程没有结束,则睡眠 1s 从新检查
while(!future.isDone()) {
   System.out.println("Task is still not done...");
   Thread.sleep(1000);
}

来看运行结果:

若是子程序运行时间过长,或者其余缘由,咱们想 cancel 子程序的运行,则咱们可使用 Future 提供的 cancel 方法,继续对程序作一些修改

while(!future.isDone()) {
   System.out.println("子线程任务尚未结束...");
   Thread.sleep(1000);

   double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;

   // 若是程序运行时间大于 1s,则取消子线程的运行
   if(elapsedTimeInSec > 1) {
      future.cancel(true);
   }
}

来看运行结果:

为何调用 cancel 方法程序会出现 CancellationException 呢?是由于调用 get() 方法时,明确说明了:

调用 get() 方法时,若是计算结果被取消了,则抛出 CancellationException (具体缘由,你会在下面的源码分析中看到)

有异常不处理是很是不专业的,因此咱们须要进一步修改程序,以更友好的方式处理异常

// 经过 isCancelled 方法判断程序是否被取消,若是被取消,则打印日志,若是没被取消,则正常调用 get() 方法
if (!future.isCancelled()){
   log.info("子线程任务已完成");
   String result = future.get();
   log.info("主线程获取到 Future 结果: {}", result);
}else {
   log.warn("子线程任务被取消");
}

查看程序运行结果:

相信到这里你已经对 Future 的几个方法有了基本的使用印象,但 Future 是接口,其实使用 ExecutorService.submit() 方法返回的一直都是 Future 的实现类 FutureTask

接下来咱们就进入这个核心实现类一探究竟

FutureTask

一样先来看类结构

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

很神奇的一个接口,FutureTask 实现了 RunnableFuture 接口,而  RunnableFuture  接口又分别实现了 RunnableFuture 接口,因此能够推断出 FutureTask 具备这两种接口的特性:

  • Runnable 特性,因此能够用在 ExecutorService 中配合线程池使用

  • Future 特性,因此能够从中获取到执行结果

FutureTask源码分析

若是你完整的看过 AQS 相关分析的文章,你也许会发现,阅读 Java 并发工具类源码,咱们无非就是要关注如下这三点:

- 状态 (代码逻辑的主要控制)
- 队列 (等待排队队列)
- CAS (安全的set 值)

脑海中牢记这三点,我们开始看 FutureTask 源码,看一下它是如何围绕这三点实现相应的逻辑的

文章开头已经提到,实现 Runnable 接口形式建立的线程并不能获取到返回值,而实现 Callable 的才能够,因此 FutureTask 想要获取返回值,一定是和 Callable 有联系的,这个推断一点都没错,从构造方法中就能够看出来:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

即使在 FutureTask 构造方法中传入的是 Runnable 形式的线程,该构造方法也会经过 Executors.callable 工厂方法将其转换为 Callable 类型:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

可是 FutureTask 实现的是 Runnable 接口,也就是只能重写 run() 方法,run() 方法又没有返回值,那问题来了:

  • FutureTask 是怎样在 run() 方法中获取返回值的?

  • 它将返回值放到哪里了?

  • get() 方法又是怎样拿到这个返回值的呢?

咱们来看一下 run() 方法(关键代码都已标记注释)

public void run() {
   // 若是状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
   // 若是状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),若是赋值失败,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
       // 获取构造函数传入的 Callable 值
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
               // 正常调用 Callable 的 call 方法就能够获取到返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
               // 保存 call 方法抛出的异常
                setException(ex);
            }
            if (ran)
               // 保存 call 方法的执行结果
                set(result);
        }
    } finally {        
        runner = null;       
        int s = state;
       // 若是任务被中断,则执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

run() 方法没有返回值,至于 run() 方法是如何将 call() 方法的返回结果和异常都保存起来的呢?其实很是简单, 就是经过 set(result) 保存正常程序运行结果,或经过 setException(ex) 保存程序异常信息

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

// 保存异常结果
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

// 保存正常结果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}

setExceptionset 方法很是类似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,因此他们要经过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的缘由所在。

保存正常结果值(set方法)与保存异常结果值(setException方法)两个方法代码逻辑,惟一的不一样就是 CAS 传入的 state 不一样。咱们上面提到,state 多数用于控制代码逻辑,FutureTask 也是这样,因此要搞清代码逻辑,咱们须要先对 state 的状态变化有所了解

 /*
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL  //执行过程顺利完成
 * NEW -> COMPLETING -> EXCEPTIONAL //执行过程出现异常
 * NEW -> CANCELLED // 执行过程当中被取消
 * NEW -> INTERRUPTING -> INTERRUPTED //执行过程当中,线程被中断
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

7种状态,千万别慌,整个状态流转其实只有四种线路

FutureTask 对象被建立出来,state 的状态就是 NEW 状态,从上面的构造函数中你应该已经发现了,四个最终状态 NORMAL ,EXCEPTIONAL , CANCELLED , INTERRUPTED 也都很好理解,两个中间状态稍稍有点让人困惑:

  • COMPLETING: outcome 正在被set 值的时候

  • INTERRUPTING:经过 cancel(true) 方法正在中断线程的时候

总的来讲,这两个中间状态都表示一种瞬时状态,咱们将几种状态图形化展现一下:

咱们知道了 run() 方法是如何保存结果的,以及知道了将正常结果/异常结果保存到了 outcome 变量里,那就须要看一下 FutureTask 是如何经过 get() 方法获取结果的:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
   // 若是 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞本身
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
   // 返回结果
    return report(s);
}

awaitDone 方法是 FutureTask 最核心的一个方法

// get 方法支持超时限制,若是没有传入超时时间,则接受的参数是 false 和 0L
// 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是能够被中断的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
   // 计算等待截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
       // 若是当前线程被中断,若是是,则在等待对立中删除该节点,并抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
       // 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
       // 把 thread 只为空,并返回结果
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
       // 若是是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其余线程优先执行(只是发出这个信号,至因而否别的线程执行必定会执行但是不必定的)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
       // 等待节点为空
        else if (q == null)
           // 将当前线程构造节点
            q = new WaitNode();
       // 若是尚未入队列,则把当前节点加入waiters首节点并替换原来waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
       // 若是设置超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
           // 时间到,则再也不等待结果
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
           // 阻塞等待特定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
           // 挂起当前线程,知道被其余线程唤醒
            LockSupport.park(this);
    }
}

总的来讲,进入这个方法,一般会经历三轮循环

  1. 第一轮for循环,执行的逻辑是 q == null, 这时候会新建一个节点 q, 第一轮循环结束。

  2. 第二轮for循环,执行的逻辑是 !queue,这个时候会把第一轮循环中生成的节点的 next 指针指向waiters,而后CAS的把节点q 替换waiters, 也就是把新生成的节点添加到waiters 中的首节点。若是替换成功,queued=true。第二轮循环结束。

  3. 第三轮for循环,进行阻塞等待。要么阻塞特定时间,要么一直阻塞知道被其余线程唤醒。

对于第二轮循环,你们可能稍稍有点迷糊,咱们前面说过,有阻塞,就会排队,有排队天然就有队列,FutureTask 内部一样维护了一个队列

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

说是等待队列,其实就是一个 Treiber 类型 stack,既然是 stack, 那就像手枪的弹夹同样(脑补一会儿弹放入弹夹的情形),后进先出,因此刚刚说的第二轮循环,会把新生成的节点添加到 waiters stack 的首节点

若是程序运行正常,一般调用 get() 方法,会将当前线程挂起,那谁来唤醒呢?天然是 run() 方法运行完会唤醒,设置返回结果(set方法)/异常的方法(setException方法) 两个方法中都会调用 finishCompletion 方法,该方法就会唤醒等待队列中的线程

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                   // 唤醒等待队列中的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

将一个任务的状态设置成终止态只有三种方法:

  • set

  • setException

  • cancel

前两种方法已经分析完,接下来咱们就看一下 cancel 方法

查看 Future cancel(),该方法注释上明确说明三种 cancel 操做必定失败的情形

  1. 任务已经执行完成了

  2. 任务已经被取消过了

  3. 任务由于某种缘由不能被取消

其它状况下,cancel操做将返回true。值得注意的是,cancel操做返回 true 并不表明任务真的就是被取消, 这取决于发动cancel状态时,任务所处的状态

  • 若是发起cancel时任务尚未开始运行,则随后任务就不会被执行;

  • 若是发起cancel时任务已经在运行了,则这时就须要看 mayInterruptIfRunning 参数了:

    • 若是mayInterruptIfRunning 为true, 则当前在执行的任务会被中断

    • 若是mayInterruptIfRunning 为false, 则能够容许正在执行的任务继续运行,直到它执行完

有了这些铺垫,看一下 cancel 代码的逻辑就秒懂了

public boolean cancel(boolean mayInterruptIfRunning) {
  
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
       // 须要中断任务执行线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
               // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
               // 修改成最终状态 INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
       // 唤醒等待中的线程
        finishCompletion();
    }
    return true;
}

核心方法终于分析完了,到这我们喝口茶休息一下吧

我是想说,使用 FutureTask 来演练烧水泡茶经典程序

如上图:

  • 洗水壶 1 分钟

  • 烧开水 15 分钟

  • 洗茶壶 1 分钟

  • 洗茶杯 1 分钟

  • 拿茶叶 2 分钟

最终泡茶

让我心算一下,若是串行总共须要 20 分钟,但很显然在烧开水期间,咱们能够洗茶壶/洗茶杯/拿茶叶

这样总共须要 16 分钟,节约了 4分钟时间,烧水泡茶尚且如此,在如今高并发的时代,4分钟能够作的事太多了,学会使用 Future 优化程序是必然(其实优化程序就是寻找关键路径,关键路径找到了,非关键路径的任务一般就能够和关键路径的内容并行执行了

@Slf4j
public class MakeTeaExample {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      // 建立线程1的FutureTask
      FutureTask<String> ft1 = new FutureTask<String>(new T1Task());
      // 建立线程2的FutureTask
      FutureTask<String> ft2 = new FutureTask<String>(new T2Task());

      executorService.submit(ft1);
      executorService.submit(ft2);

      log.info(ft1.get() + ft2.get());
      log.info("开始泡茶");

      executorService.shutdown();
   }

   static class T1Task implements Callable<String> {

      @Override
      public String call() throws Exception {
         log.info("T1:洗水壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T1:烧开水...");
         TimeUnit.SECONDS.sleep(15);

         return "T1:开水已备好";
      }
   }

   static class T2Task implements Callable<String> {
      @Override
      public String call() throws Exception {
         log.info("T2:洗茶壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T2:洗茶杯...");
         TimeUnit.SECONDS.sleep(2);

         log.info("T2:拿茶叶...");
         TimeUnit.SECONDS.sleep(1);
         return "T2:福鼎白茶拿到了";
      }
   }
}

上面的程序是主线程等待两个 FutureTask 的执行结果,线程1 烧开水时间更长,线程1但愿在水烧开的那一刹那就能够拿到茶叶直接泡茶,怎么半呢?

那只须要在线程 1 的FutureTask 中获取 线程 2 FutureTask 的返回结果就能够了,咱们稍稍修改一下程序:

@Slf4j
public class MakeTeaExample1 {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      // 建立线程2的FutureTask
      FutureTask<String> ft2 = new FutureTask<String>(new T2Task());
      // 建立线程1的FutureTask
      FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2));
      
      executorService.submit(ft1);
      executorService.submit(ft2);

      executorService.shutdown();
   }

   static class T1Task implements Callable<String> {

      private FutureTask<String> ft2;
      public T1Task(FutureTask<String> ft2) {
         this.ft2 = ft2;
      }

      @Override
      public String call() throws Exception {
         log.info("T1:洗水壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T1:烧开水...");
         TimeUnit.SECONDS.sleep(15);

         String t2Result = ft2.get();
         log.info("T1 拿到T2的 {}, 开始泡茶", t2Result);
         return "T1: 上茶!!!";
      }
   }

   static class T2Task implements Callable<String> {
      @Override
      public String call() throws Exception {
         log.info("T2:洗茶壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T2:洗茶杯...");
         TimeUnit.SECONDS.sleep(2);

         log.info("T2:拿茶叶...");
         TimeUnit.SECONDS.sleep(1);
         return "福鼎白茶";
      }
   }
}

来看程序运行结果:

知道这个变化后咱们再回头看 ExecutorService 的三个 submit 方法:

<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);

第一种方法,逐层代码查看到这里:

你会发现,和咱们改造烧水泡茶的程序思惟是类似的,能够传进去一个 result,result 至关于主线程和子线程之间的桥梁,经过它主子线程能够共享数据

第二个方法参数是 Runnable 类型参数,即使调用 get() 方法也是返回 null,因此仅是能够用来断言任务已经结束了,相似 Thread.join()

第三个方法参数是 Callable 类型参数,经过get() 方法能够明确获取 call() 方法的返回值

到这里,关于 Future 的整块讲解就结束了,文字比较多仍是须要你去消化一下的。

往期推荐:

原来这就是Java代码生成器的原理啊,太简单了

2020-07-09

Spring注解@Import实现多模块中Bean的导入

2020-07-08

Spring Boot读取配置属性的经常使用方式

2020-07-06

欢迎 点赞 再看和转发 ↓↓