Java多线程:线程池

1、 背景

线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,合理的使用线程池能够对线程进行统一的分配、调优和监控,并有如下好处:
    第一:下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。
    第二:提升响应速度。当任务到达时,任务能够不须要等到线程建立就能当即执行。
    第三:提升线程的可管理性。
 

2、线程池的架构

3、Executors

    用于建立线程池, 包含五种建立线程池的方法

newFixedThreadPool(固定大小线程池)

   
    初始化一个定长线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene做为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程,超出的线程会在队列中等待
 
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
 

newCachedThreadPool(无界线程池,能够进行自动线程回收)

    一、初始化一个能够缓存线程的线程池,默认缓存60s,使用SynchronousQueue做为阻塞队列;
    二、newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,若是没有空闲线程,则建立新线程执行任务,会致使必定的系统开销;
因此,该方法返回的线程池是没有线程上限的,在使用时必定要小心,由于没有办法控制整体的线程数量,而每一个线程都是消耗内存的,这可能会致使过多的内存被占用。建议尽可能不要用这个方法返回的线程池,而要使用有上限的线程池
 
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 
 
 

newSingleThreadExecutor(单个后台线程)

   
    初始化的线程池中只有一个线程,若是该线程异常结束,会从新建立一个新的线程继续执行任务,惟一的线程能够保证所提交任务的顺序执行,内部使用LinkedBlockingQueue做为阻塞队列。
 
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 
 
 

newScheduledThreadPool

  建立一个固定长度的线程池,并且以延迟或定时的方式来执行任务。
 
经过如上配置的线程池的建立方法源代码,咱们能够发现:
   1. 除了CachedThreadPool使用的是直接提交策略的缓冲队列之外,其他两个采用的都是无界缓冲队列
   2. 三个线程池采用的ThreadPoolExecutor构造方法都是同一个,使用的都是默认的ThreadFactory和handler:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
 

4、ExecutorService任务周期管理接口

    Executor的实现一般都会建立线程来执行任务,可是使用异步方式来执行任务时,因为以前提交任务的状态不是当即可见的,因此若是要关闭应用程序时,就须要将受影响的任务状态反馈给应用程序。
    为了解决执行服务的生命周期问题,Executor扩展了EecutorService接口,添加了一些用于生命周期管理的方法。以下:
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 省略部分方法 
}

submit() 与execute()区别数据库

 

一、接收的参数不同 缓存

  submit()能够接受runnable无返回值和callable有返回值 
  execute()接受runnable 无返回值多线程

 

二、submit有返回值,而execute没有架构

 

  Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.app

 

  用到返回值的例子,好比说我有不少个作validation的task,我但愿全部的task执行完,而后每一个task告诉我它的执行结果,是成功仍是失败,若是是失败,缘由是什么。dom

 

三、submit方便Exception处理异步

 

  There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.ide

 

  意思就是若是你在你的task里会抛出checked或者unchecked exception,而你又但愿外面的调用者可以感知这些exception并作出及时的处理,那么就须要用到submit,经过捕获Future.get抛出的异常。this

public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List<Future<String>> resultList = new ArrayList<Future<String>>();  

        // 建立10个任务并执行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中  
            Future<String> future = executorService.submit(new TaskWithResult(i));  
            // 将任务执行结果存储到List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  

        // 遍历任务的结果  
        for (Future<String> fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打印各个线程(任务)执行的结果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {                    
                e.printStackTrace();    
            }  finally {
        executorService.shutdownNow();
       }
        }  
    }  
}  

class TaskWithResult implements Callable<String> {  
    private int id;  

    public TaskWithResult(int id) {  
        this.id = id;  
    }  

    /** 
     * 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行。 
     *  
     * @return 
     * @throws Exception 
     */  
    public String call() throws Exception {  
        System.out.println("call()方法被自动调用,干活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一个模拟耗时的操做  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方法被自动调用,任务的结果是:" + id + "    " + Thread.currentThread().getName();  
    }  
}  

class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}

 

 

5、ThreadPoolExecutor提交任务流程

线程池的主要工做流程以下图:
 
 
 
类中定义的重要变量,以下:
 
  1. private final BlockingQueue<Runnable> workQueue;              // 阻塞队列 
  2. private final ReentrantLock mainLock = new ReentrantLock();   // 互斥锁 
  3. private final HashSet<Worker> workers = new HashSet<Worker>();// 线程集合.一个Worker对应一个线程 
  4. private final Condition termination = mainLock.newCondition();// 终止条件 
  5. private int largestPoolSize;           // 线程池中线程数量曾经达到过的最大值。 
  6. private long completedTaskCount;       // 已完成任务数量 
  7. private volatile ThreadFactory threadFactory;     // ThreadFactory对象,用于建立线程。 
  8. private volatile RejectedExecutionHandler handler;// 拒绝策略的处理句柄 
  9. private volatile long keepAliveTime;   // 线程池维护线程所容许的空闲时间 
  10. private volatile boolean allowCoreThreadTimeOut; 11. private volatile int corePoolSize;     // 线程池维护线程的最小数量,哪怕是空闲的 
  12. private volatile int maximumPoolSize;  // 线程池维护的最大线程数量

 

其中有几个重要的规则须要说明一下:

一、 corePoolSize与maximumPoolSize  

   线程池将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute() 中提交时:
  • 若是当前线程池中的线程数目<corePoolSize,则每来一个任务,就会建立一个线程去执行这个任务;
  • 若是当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;当队列满时才建立新线程去处理请求;
  • 若是当前线程池中的线程数目达到maximumPoolSize,即队列已经满了,则经过handler所指定的任务拒绝策略来处理新请求;
  • 若是线程池中的线程数量大于corePoolSize时,而且某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;
 
也就是说,处理任务的优先级为: 
  • 1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,若是三者都满了,使用handler处理被拒绝的任务。
  • 2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,若是无请求可处理就自行销毁。
 

二、 workQueue 

线程池所使用的缓冲队列,该缓冲队列的长度决定了可以缓冲的最大数量,缓冲队列有三种通用策略:
   1) 直接提交。SynchronousQueue,它将任务直接提交给线程执行而不保存它们。在此,若是不存在可用于当即运行任务的线程,则试图把任务加入队列将失败,所以会构造一个新的线程。此策略能够避免在处理可能具备内部依赖性的请求集时出现锁。直接提交一般要求无界 maximumPoolSizes 以免拒绝新提交的任务。
   2) 无界队列。使用无界队列将致使在全部 corePoolSize 线程都忙时新任务在队列中等待。这样,建立的线程就不会超过 corePoolSize。(所以,maximumPoolSize 的值也就无效了。)当每一个任务彻底独立于其余任务,即任务执行互不影响时,适合于使用无界队列;
   3) 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,可是可能较难调整和控制。队列大小和最大池大小可能须要相互折衷:使用大型队列和小型池能够最大限度地下降 CPU 使用率、操做系统资源和上下文切换开销,可是可能致使人工下降吞吐量。若是任务频繁阻塞(例如,若是它们是 I/O 边界),则系统可能超过您许可的更多线程安排时间。使用小型队列一般要求较大的池大小,CPU 使用率较高,可是可能遇到不可接受的调度开销,这样也会下降吞吐量.

三、ThreadFactory  

    使用 ThreadFactory 建立新线程。经过提供不一样的 ThreadFactory,能够改变线程的名称、线程组、优先级、守护进程状态等等。若是从 newThread 返回 null 时 ThreadFactory 未能建立线程,则执行程序将继续运行,但不能执行任何任务。
public interface ThreadFactory {  
    Thread newThread(Runnable r);  
}
  
而构造方法中的threadFactory对象,是经过 Executors.defaultThreadFactory()返回的。 

四、RejectedExecutionHandler   

    当Executor已经关闭(即执行了executorService.shutdown()方法后),而且Executor将有限边界用于最大线程和工做队列容量,且已经饱和时,在方法execute()中提交的新任务将被拒绝.
   在以上述状况下,execute 方法将调用RejectedExecutionHandler.rejectedExecution() 方法。
下面提供了四种预约义的处理程序策略:
       1) AbortPolicy            直接抛出异常 RejectedExecutionException;
    2) CallerRunsPolicy       用调用者所在的线程来执行任务
    3) DiscardPolicy          不能执行的任务将被删除;
    4) DiscardOldestPolicy    若是执行程序还没有关闭,则位于工做队列头部的任务将被删除,而后重试执行程序(若是再次失败,则重复此过程)。

五、keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认状况下,该参数只在线程数大于corePoolSize时才有用;

6、线程池的关闭

    经过调用线程池的shutdown或shutdownNow方法来关闭线程池,可是它们的实现原理不一样:
    shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。
    shutdownNow是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。shutdownNow会首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表。
    只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于咱们应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown来关闭线程池,若是任务不必定须要执行完,则能够调用shutdownNow。
 

7、线程池的配置

能够从如下几个角度来进行分析:
    1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
    2. 任务的优先级:高,中和低。
    3. 任务的执行时间:长,中和短。
    4. 任务的依赖性:是否依赖其余系统资源,如数据库链接。
    CPU密集型任务:配置尽量少的线程数量,如配置Ncpu+1个线程的线程池。
    IO密集型任务:因为须要等待IO操做,线程并非一直在执行任务,则配置尽量多的线程,如2*Ncpu。
    混合型的任务:若是能够拆分,则将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,若是这两个任务执行时间相差太大,则不必进行分解。
    咱们能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。
    优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
    执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。
    依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
    建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点