线程间通讯剖析

CountDownLatch

CountDownLatch适用场景

Java多线程编程中常常会碰到这样一种场景——某个线程须要等待一个或多个线程操做结束(或达到某种状态)才开始执行。好比开发一个并发测试工具时,主线程须要等到全部测试线程均执行完成再开始统计总共耗费的时间,此时能够经过CountDownLatch轻松实现。java

CountDownLatch实例

package com.test.thread;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
  public static void main(String[] args) throws InterruptedException {
    int totalThread = 3;
    long start = System.currentTimeMillis();
    CountDownLatch countDown = new CountDownLatch(totalThread);
    for(int i = 0; i < totalThread; i++) {
      final String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
        try {
          Thread.sleep(1000);
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        countDown.countDown();
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();;
    }
    countDown.await();
    long stop = System.currentTimeMillis();
    System.out.println(String.format("Total time : %sms", (stop - start)));
  }
}

执行结果编程

Sun Jun 19 20:34:31 CST 2016  Thread 1 started
Sun Jun 19 20:34:31 CST 2016  Thread 0 started
Sun Jun 19 20:34:31 CST 2016  Thread 2 started
Sun Jun 19 20:34:32 CST 2016  Thread 2 ended
Sun Jun 19 20:34:32 CST 2016  Thread 1 ended
Sun Jun 19 20:34:32 CST 2016  Thread 0 ended
Total time : 1072ms

能够看到,主线程等待全部3个线程都执行结束后才开始执行。安全

CountDownLatch主要接口分析

CountDownLatch工做原理相对简单,能够简单当作一个倒计时器,在构造方法中指定初始值,每次调用countDown()方法时讲计数器减1,而await()会等待计数器变为0。CountDownLatch关键接口以下多线程

  • countDown() 若是当前计数器的值大于1,则将其减1;若当前值为1,则将其置为0并唤醒全部经过await等待的线程;若当前值为0,则什么也不作直接返回。
  • await() 等待计数器的值为0,若计数器的值为0则该方法返回;若等待期间该线程被中断,则抛出InterruptedException并清除该线程的中断状态。
  • await(long timeout, TimeUnit unit) 在指定的时间内等待计数器的值为0,若在指定时间内计数器的值变为0,则该方法返回true;若指定时间内计数器的值仍未变为0,则返回false;若指定时间内计数器的值变为0以前当前线程被中断,则抛出InterruptedException并清除该线程的中断状态。
  • getCount() 读取当前计数器的值,通常用于调试或者测试。

CyclicBarrier

CyclicBarrier适用场景

在《当咱们说线程安全时,到底在说什么》一文中讲过内存屏障,它能保证屏障以前的代码必定在屏障以后的代码以前被执行。CyclicBarrier能够译为循环屏障,也有相似的功能。CyclicBarrier能够在构造时指定须要在屏障前执行await的个数,全部对await的调用都会等待,只到调用await的次数达到预约指,全部等待都会当即被唤醒。并发

从使用场景上来讲,CyclicBarrier是让多个线程互相等待某一事件的发生,而后同时被唤醒。而上文讲的CountDownLatch是让某一线程等待多个线程的状态,而后该线程被唤醒。ide

CyclicBarrier实例

package com.test.thread;

import java.util.Date;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

  public static void main(String[] args) {
    int totalThread = 5;
    CyclicBarrier barrier = new CyclicBarrier(totalThread);
    
    for(int i = 0; i < totalThread; i++) {
      String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
        try {
          barrier.await();
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();
    }
  }
}

执行结果以下工具

Sun Jun 19 21:04:49 CST 2016  Thread 1  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 0  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 3  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 2  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4 ended
Sun Jun 19 21:04:49 CST 2016  Thread 0 ended
Sun Jun 19 21:04:49 CST 2016  Thread 2 ended
Sun Jun 19 21:04:49 CST 2016  Thread 1 ended
Sun Jun 19 21:04:49 CST 2016  Thread 3 ended

从执行结果能够看到,每一个线程都不会在其它全部线程执行await()方法前继续执行,而等全部线程都执行await()方法后全部线程的等待都被唤醒从而继续执行。测试

CyclicBarrier主要接口分析

CyclicBarrier提供的关键方法以下spa

  • await() 等待其它参与方的到来(调用await())。若是当前调用是最后一个调用,则唤醒全部其它的线程的等待而且若是在构造CyclicBarrier时指定了action,当前线程会去执行该action,而后该方法返回该线程调用await的次序(getParties()-1说明该线程是第一个调用await的,0说明该线程是最后一个执行await的),接着该线程继续执行await后的代码;若是该调用不是最后一个调用,则阻塞等待;若是等待过程当中,当前线程被中断,则抛出InterruptedException;若是等待过程当中,其它等待的线程被中断,或者其它线程等待超时,或者该barrier被reset,或者当前线程在执行barrier构造时注册的action时由于抛出异常而失败,则抛出BrokenBarrierException
  • await(long timeout, TimeUnit unit) 与await()惟一的不一样点在于设置了等待超时时间,等待超时时会抛出TimeoutException
  • reset() 该方法会将该barrier重置为它的初始状态,并使得全部对该barrier的await调用抛出BrokenBarrierException

Phaser

Phaser适用场景

CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。线程

Phaser顾名思义,与阶段相关。Phaser比较适合这样一种场景,一种任务能够分为多个阶段,现但愿多个线程去处理该批任务,对于每一个阶段,多个线程能够并发进行,可是但愿保证只有前面一个阶段的任务完成以后才能开始后面的任务。这种场景可使用多个CyclicBarrier来实现,每一个CyclicBarrier负责等待一个阶段的任务所有完成。可是使用CyclicBarrier的缺点在于,须要明确知道总共有多少个阶段,同时并行的任务数须要提早预约义好,且没法动态修改。而Phaser可同时解决这两个问题。

Phaser实例

public class PhaserDemo {

  public static void main(String[] args) throws IOException {
    int parties = 3;
    int phases = 4;
    final Phaser phaser = new Phaser(parties) {
      @Override  
      protected boolean onAdvance(int phase, int registeredParties) {  
          System.out.println("====== Phase : " + phase + " ======");  
          return registeredParties == 0;  
      }  
    };
    
    for(int i = 0; i < parties; i++) {
      int threadId = i;
      Thread thread = new Thread(() -> {
        for(int phase = 0; phase < phases; phase++) {
          System.out.println(String.format("Thread %s, phase %s", threadId, phase));
          phaser.arriveAndAwaitAdvance();
        }
      });
      thread.start();
    }
  }
}

执行结果以下

Thread 0, phase 0
Thread 1, phase 0
Thread 2, phase 0
====== Phase : 0 ======
Thread 2, phase 1
Thread 0, phase 1
Thread 1, phase 1
====== Phase : 1 ======
Thread 1, phase 2
Thread 2, phase 2
Thread 0, phase 2
====== Phase : 2 ======
Thread 0, phase 3
Thread 1, phase 3
Thread 2, phase 3
====== Phase : 3 ======

从上面的结果能够看到,多个线程必须等到其它线程的同一阶段的任务所有完成才能进行到下一个阶段,而且每当完成某一阶段任务时,Phaser都会执行其onAdvance方法。

Phaser主要接口分析

Phaser主要接口以下

  • arriveAndAwaitAdvance() 当前线程当前阶段执行完毕,等待其它线程完成当前阶段。若是当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从0开始),同时其它线程的该方法也返回下一个阶段的序号。
  • arriveAndDeregister() 该方法当即返回下一阶段的序号,而且其它线程须要等待的个数减一,而且把当前线程从以后须要等待的成员中移除。若是该Phaser是另一个Phaser的子Phaser(层次化Phaser会在后文中讲到),而且该操做致使当前Phaser的成员数为0,则该操做也会将当前Phaser从其父Phaser中移除。
  • arrive() 该方法不做任何等待,直接返回下一阶段的序号。
  • awaitAdvance(int phase) 该方法等待某一阶段执行完毕。若是当前阶段不等于指定的阶段或者该Phaser已经被终止,则当即返回。该阶段数通常由arrive()方法或者arriveAndDeregister()方法返回。返回下一阶段的序号,或者返回参数指定的值(若是该参数为负数),或者直接返回当前阶段序号(若是当前Phaser已经被终止)。
  • awaitAdvanceInterruptibly(int phase) 效果与awaitAdvance(int phase)至关,惟一的不一样在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果与awaitAdvanceInterruptibly(int phase)至关,区别在于若是超时则抛出TimeoutException
  • bulkRegister(int parties) 注册多个party。若是当前phaser已经被终止,则该方法无效,并返回负数。若是调用该方法时,onAdvance方法正在执行,则该方法等待其执行完毕。若是该Phaser有父Phaser则指定的party数大于0,且以前该Phaser的party数为0,那么该Phaser会被注册到其父Phaser中。
  • forceTermination() 强制让该Phaser进入终止状态。已经注册的party数不受影响。若是该Phaser有子Phaser,则其全部的子Phaser均进入终止状态。若是该Phaser已经处于终止状态,该方法调用不形成任何影响。