Java 多线程线程池-ThreadPoolExecutor分析

0.合理使用线程池的好处

下降资源消耗--经过重复利用已建立的线程,下降线程的建立和销毁形成的消耗
提升响应速度-当任务到达时,任务能够不用等待线程建立就能当即执行

提升线程的可管理性-线程是稀缺资源,若是无限制的建立线程,不只会消耗系统的资源,还会下降系统的稳java

定性,使线程能够进行统一的分配、调优和监控数组

 

 

 

 

1.线程池状态

2.线程池ThreadPoolExecutor类主要成员变量的含义

成员变量 含义
private final BlockingQueue<Runnable> workQueue 任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); 线程池状态发生改变,都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); 用来存放工做集
private int largestPoolSize; 线程池中曾经存在过的最大线程数
private long completedTaskCount; 线程池已经完成的全部任务记录数
private volatile ThreadFactory threadFactory; 线程工厂,用来建立线程
private volatile RejectedExecutionHandler handler; 任务拒绝策略
private volatile long keepAliveTime;

当线程数大于corePoolSize时,缓存

线程池中线程的最大存活时间ide

private volatile int corePoolSize;

核心线程池大小,当线程池中的线程数量大于corePoolSize时this

会把任务放到任务缓存队列中去atom

private volatile int maximumPoolSize;

线程池最大容忍的线程大小,即当任务缓存队列已满来的新任务会建立新的线程去处理spa

private volatile boolean allowCoreThreadTimeOut; 是否容许核心线程设置存活时间

 

3.线程池的执行流程

 

4.线程池的任务缓存队列以及排队策略

类型 做用
ArrayBlockingQueue 基于数组的先进先出队列,在使用的时候必须指定大小
LinkedBlockingQueue 基于链表的先进先出队列,若是没有指定大小,默认Integer.MAX_VALUE
synchronousQueue 不会保存队列,直接建立新的线程去执行任务

 

 

 

 

 

5.线程池的拒绝策略

策略 说明
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,可是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy 调用线程处理该任务

 

6.线程池的关闭

方法 说明
shutdown()

不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务线程

shutdownNow() 当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务

 

注意:code

  1) Since1.6 经过方法 public void allowCoreThreadTimeOut(boolean value) 设置核心线程容许设置存货时间blog

  2) 经过方法 public void setRejectedExecutionHandler(RejectedExecutionHandler handler)设置拒绝策略

7.自定义线程池

package com.roger.threadpool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadPoolExecutor {

    private ThreadPoolExecutor threadPoolExecutor;


    /**
     * 线程池的初始化
     *   核心线程数 10
     *   最大线程数 30
     *   线程空闲时间 30分钟
     *   有界队列
     *   自定义线程工厂类
     *   自定义拒绝策略:
     *      1) 当提交的任务超过  maximumPoolSie + 队列的容量时
     *      2) 线程池中没有空闲线程
     *   此时:会把超过的任务交给这个类进行处理
     */
    public void init(){
        threadPoolExecutor = new ThreadPoolExecutor(
                10,
                30,
                30,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(10),
                new CustomThreadFacotry(),
                new CustomRejectedExceptionHandler()
        );
    }

    public ExecutorService getCustomThreadPoolExecutor(){
        return this.threadPoolExecutor;
    }

    public void destory(){
        if(threadPoolExecutor != null){
            //等待全部正在执行线程执行完,才会终止
            threadPoolExecutor.shutdown();
        }
    }

    private class CustomThreadFacotry implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + "-" + count.getAndIncrement();
            thread.setName(threadName);
            return thread;
        }
    }

    private class CustomRejectedExceptionHandler implements RejectedExecutionHandler{

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造点,由blockingqueue的offer改为put阻塞方法
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.roger.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolMain {

    public static void main(String[] args) {
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.init();


        ExecutorService executorService = customThreadPoolExecutor.getCustomThreadPoolExecutor();

        for (int i =0; i < 100; i ++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " Task running >>>>  ");
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}