Executor线程池的简单使用

  咱们都知道建立一个线程能够继承Thread类或者实现Runnable接口,实际Thread类就是实现了Runnable接口。html

  到今天才明白后端线程的做用:咱们能够开启线程去执行一些比较耗时的操做,相似于前台的ajax异步操做,好比说用户上传一个大的文件,咱们能够获取到文件以后开启一个线程去操做该文件,可是能够提早将结果返回去,若是同步处理有可能太耗时,影响系统可用性。java

一、new Thread的弊端

原生的开启线程执行异步任务的方式:ajax

new Thread(new Runnable() {

    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}).start();

弊端以下:后端

  • 线程生命周期的开销很是高。建立线程都会须要时间,延迟处理的请求,而且须要JVM和操做系统提供一些辅助操做。
  • 资源消耗。活跃的线程会消耗系统资源,尤为是内存。若是可运行的线程数量多于可用处理器的数量,那么有些线程将会闲置。大量空闲的线程会占用许多内存,给GC带来压力,并且大量线程在竞争CPU资源时会产生其余的性能开销。
  •  稳定性。在可建立线程的数量上存在一个限制,这个限制受多个因素的制约,包括JVM的启动参数、Thread构造函数中请求栈的大小以及底层操做系统的限制。若是破坏了这些限制,极可能抛出  outOfMemoryError异常。

  也就是说在必定的范围内增长线程的数量能够提升系统的吞吐率,可是若是超出了这个范围,再建立更多的线程只会下降程序的执行效率甚至致使系统的崩溃。缓存

 例如:并发

 使用线程池:能够了解线程池的用法以及线程池的正确的关闭方法:shutdown以后立刻调用awaitTermination阻塞等待实现同步关闭。框架

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo1 {
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

结果:less

14
2000异步

 

package cn.qlq.thread.twenty;

import java.util.concurrent.atomic.AtomicInteger;

public class Demo2 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

结果:ide

257
2000

  不使用线程池话费的时间比使用线程池长了好几倍,也看出了效率问题。

 

2.核心类结构以下:

 

一、Executor是一个顶级接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。

二、ExecutorService扩展了Executor。添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法

三、下面两个分支,AbstractExecutorService分支就是普通的线程池分支,ScheduledExecutorService是用来建立定时任务的。

3.Executor介绍

  线程池简化了线程的管理工做。在Java类库中,任务执行的主要抽象不是Thread,而是Executor,以下:

public interface Executor {
    void execute(Runnable command);
}

 

  Executor是个简单的接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视机制。

  Executor基于"生产者-消费者"模式,提交任务的操做至关于生产者,执行任务的则至关于消费者。

生命周期:

  Executor的实现一般会建立线程来执行任务。但JVM只有在全部非守护线程所有终止才会退出。所以,若是没法正确的关闭Executor,那么JVM将没法结束。ExecutorService扩展了Executor接口,添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  ExecutorService的生命周期有三种:运行、关闭和已终止。ExecutorService在建立时处于运行状态。shutdown方法将执行平缓的关闭过程:再也不接受新的任务,同时等待已经提交的任务执行完毕----包括还没开始的任务,这种属于正常关闭。shutdownNow方法将执行粗暴的关闭过程:它将取消全部运行中的任务,而且再也不启动队列中还没有开始的任务,这种属于强行关闭(关闭当前正在执行的任务,而后返回全部还没有启动的任务清单)。

  在ExecutorService关闭后提交的任务将由"拒绝执行处理器"来处理,它会抛弃任务,或者使得execute方法抛出一个RejectedExecutionException。等全部任务执行完成后,ExecutorService将转入终止状态。能够调用awaitTermination来等待ExecutorService到达终止状态,或者经过isTerminated来轮询ExecutorService是否已经终止。一般在调用shutdown以后会当即调用awaitTermination阻塞等待,从而产生同步地关闭ExecutorService的效果。

 4.线程池--ThreadPoolExecutor

  线程池,从字面意义上看,是指管理一组同构工做线程的资源池。线程池是与工做队列(work queue)密切相关的,其中在工做队列保存了全部等待执行的任务。工做者线程的任务很简单:从工做队列中获取一个任务并执行任务,而后返回线程池等待下一个任务。(线程池启动初期线程不会启动,有任务提交(调用execute或submit)才会启动,直到到达最大数量就再也不建立而是进入阻塞队列)。

  "在线程池中执行任务"比"为每个任务分配一个线程"优点更多。经过重用现有的线程而不是建立新线程,能够处理多个请求时分摊在建立线程和销毁过程当中产生的巨大开销。另一个额外的好处是,当请求到达时,工做线程一般已经存在,所以不会因为建立线程而延迟任务的执行,从而提升了性能。

  ThreadPoolExecutor为Executor提供了一些基本实现。ThreadPoolExecutor是一个灵活的、稳定的线程池,容许各类容许机制。ThreadPoolExecutor定义了不少构造函数,最多见的是下面这个:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

一、corePoolSize

  核心池的大小。在建立了线程池以后,默认状况下,线程池中没有任何线程,而是等待有任务到来才建立线程去执行任务。默认状况下,在建立了线程池以后,线程池钟的线程数为0,当有任务到来后就会建立一个线程去执行任务

二、maximumPoolSize

  池中容许的最大线程数,这个参数表示了线程池中最多能建立的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续建立线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多能够建立的线程数量

三、keepAliveTime

  只有当线程池中的线程数大于corePoolSize时,这个参数才会起做用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间.

四、unit

  keepAliveTime时间单位

五、workQueue

  存储还没来得及执行的任务

六、threadFactory

  执行程序建立新线程时使用的线程工厂

七、handler

  因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序(拒绝执行处理器)

 

拒绝执行处理器其实是定义了拒绝执行线程的行为:实际上也是一种饱和策略,当有界队列被填满后,饱和队列开始发挥做用。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

 

在类库中定义了四种实现:

1.  AbortPolicy-终止策略

  直接抛出一个RejectedExecutionException,也是JDK默认的拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2.CallerRunsPolicy-调运者运行策略

  若是线程池没有被关闭,就尝试执行任务。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 3.DiscardOldestPolicy-抛弃最旧的策略

  若是线程池没有关闭,就移除队列中最早进入的任务,而且尝试执行任务。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

4. DiscardPolicy-抛弃策略

  什么也不作,安静的丢弃任务

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

 

5.Executors

5.1ThreadFactory

  在将这个以前先介绍一下ThreadFactory。每当线程池须要一个线程时,都是经过线程工厂建立的线程。默认的线程工厂方法将建立一个新的、非守护的线程,而且不包含特殊的线程信息。固然能够经过线程工厂定制线程的信息。此工厂也有好多实现:

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

  其实现类:

 

5.2Executors

  能够经过Executors中的静态工厂方法之一建立一个线程池。Executors的静态工厂能够建立经常使用的四种线程池:

newFixedThreadPool(采用LinkedBlockingQueue队列--基于链表的阻塞队列)

  建立一个定长线程池,每当提交一个任务时就建立一个线程,直到线程池的最大数量,这时线程池的规模将再也不变化(若是因为某个线程因为发生了未预期的exception而结束,那么线程池会补充一个新的线程)。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newCachedThreadPool(使用SynchronousQueue同步队列)

  建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。线程池的规模不受限。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newScheduledThreadPool(使用DelayedWorkQueue延迟队列)

   建立一个定长线程池,支持定时及周期性任务执行。相似于Timer。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

 

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

 

newSingleThreadExecutor(采用LinkedBlockingQueue队列--基于链表的阻塞队列)

  建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。若是这个线程异常结束会建立一个新的线程来替代。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   

  newFixedThreadPool和newCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例能够直接用来构造专门用途的execotor。另外上面建立的时候都有一个能够指定线程工厂的方法:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }  

  关于workqueue的选择: DelayQueue 能够实现有序加延迟的效果。 SynchronousQueue 同步队列,实际上它不是一个真正的队列,由于它不会维护队列中元素的存储空间,与其余队列不一样的是,它维护一组线程,这些线程在等待把元素加入或移除队列。LinkedBlockingQueue 相似于LinkedList,基于链表的阻塞队列。此队列若是不指定容量大小,默认采用Integer.MAX_VALUE(能够理解为无限队列)。

  关于队列的使用参考:http://www.noobyard.com/article/p-rpchycvy-a.html

6.Java线程池的使用

下面全部的测试都是基于Myrunnale进行测试

package cn.qlq.thread.twenty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MyRunnable.class);

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            log.info("threadName -> {},i->{} ", Thread.currentThread().getName(), i);
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

1.FixedThreadPool的用法

  建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。在建立的时候并不会立刻建立2个线程,而是在提交任务的时候才建立线程。

建立方法:

    /**
     * 参数是初始化线程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

 

查看源码:(使用了阻塞队列,超过池子容量的线程会在队列中等待)

 

测试代码:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo3 {
    /**
     * 参数是初始化线程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

结果:(执行完线程并无销毁)

 

 解释:

  池子容量大小是2,因此前两个先被执行,第三个runable只是暂时的加到等待队列,前两个执行完成以后线程 pool-1-thread-1空闲以后从等待队列获取runnable进行执行。

  定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

 

而且上面程序执行完毕以后JVM并无结束,所以线程池建立的线程默认是非守护线程:

 

2.CachedThreadPool

  建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。

建立方法:

private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

 

查看源码:(使用了同步队列)

 

测试代码:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo4 {
    /**
     * 参数是初始化线程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

结果:

 

执行完成执行线程并无结束

 

3.SingleThreadExecutor用法

   建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。相似于单线程执行的效果同样。

建立方法:

    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

 

查看源码;使用的阻塞队列

 

测试代码:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo5 {
    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

结果:

 

只有一个线程在执行任务:

 

 4.ScheduledThreadPool用法------能够实现任务调度功能

   建立一个定长线程池(会指定容量初始化大小),支持定时及周期性任务执行。能够实现一次性的执行延迟任务,也能够实现周期性的执行任务。

建立方法:

    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

 

查看源码:(使用了延迟队列)

 

 

 测试代码:

package cn.qlq.thread.twenty;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo6 {
    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // 第一次执行是在3s后执行(延迟任务)
            batchTaskPool.schedule(new MyRunnable(), 3, TimeUnit.SECONDS);
            // 第一个参数是须要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位
            batchTaskPool.scheduleAtFixedRate(new MyRunnable(), 3, 7, TimeUnit.SECONDS);
            // 第一个参数是须要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位
            batchTaskPool.scheduleWithFixedDelay(new MyRunnable(), 3, 5, TimeUnit.SECONDS);
        }
    }
}
schedule是一次性的任务,能够指定延迟的时间。
scheduleAtFixedRate已固定的频率来执行某项计划(任务)
scheduleWithFixedDelay相对固定的延迟后,执行某项计划 (这个就是第一个任务执行完5s后再次执行,通常用这个方法任务调度)
  若是延迟时间传入的是负数会当即执行,不会报非法参数错误。

 关于两者的区别:

  scheduleAtFixedRate :这个是按照固定的时间来执行,简单来讲:到点执行
  scheduleWithFixedDelay:这个呢,是等上一个任务结束后,在等固定的时间,而后执行。简单来讲:执行完上一个任务后再执行

举例子

  scheduledThreadPool.scheduleAtFixedRate(new TaskTest("执行调度任务3"),0, 1, TimeUnit.SECONDS);  //这个就是每隔1秒,开启一个新线程
  scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四个"),0, 3, TimeUnit.SECONDS); //这个就是上一个任务执行完,3秒后开启一个新线程

补充:好比想要实如今某一个时钟定时晚上11点执行任务,而且天天都执行

        long curDateSecneds = 0;
        try {
            String time = "21:00:00";
            DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
            DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
            Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
            curDateSecneds = curDate.getTime();
        } catch (ParseException ignored) {
            // ignored
        }

        // 单位是s
        long initialDelay = (curDateSecneds - System.currentTimeMillis()) / 1000;
        int periodOneDaySeconds = 1 * 24 * 60 * 60;
        batchTaskPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("111222");
            }
        }, initialDelay, periodOneDaySeconds, TimeUnit.SECONDS);

注意: 上面的单位也就是四个参数是延迟时间和间隔的单位,也就是说第四个参数决定第二个和第三个参数。

 

 固然实现任务调度还能够采用quartz框架来实现,更加的灵活。参考:http://www.noobyard.com/article/p-shmixlrq-gn.html

 

5.测试建立线程池的时候传入一个线程工厂建立的线程是守护线程且自定义线程的name

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo7 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger();
    /**
     * 参数是初始化线程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("t" + atomicInteger.incrementAndGet());
            thread.setDaemon(true);// 设置为守护线程
            return thread;
        }
    });

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
        // 必须休眠。不然建立的是守护线程会直接关闭进程
        Thread.sleep(20 * 1000);
    }
}

结果:

 

补充:关于线程池中的线程销毁问题

  线程池中的线程若是不调用shutdown()方法线程是不会销毁的,即便是方法内部的局部变量线程池也不会销毁;调用shutdown方法以后在全部线程执行完后会线程线程池中的线程。因此在使用线程池的时候若是是方法内部使用必定要shutdown销毁线程,若是是全局使用的静态线程池能够不shutdown。

例如:不调用shutdown方法不会销毁线程。调用shutdown会销毁线程。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 闭锁
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "进入run");
                        Thread.sleep(5 * 10000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 闭锁产生同步效果
            System.out.println("三个都执行完毕");
            // batchTaskPool.shutdown();// 调用此方法等待执行完毕会销毁线程,若是不调用此方法即便方法退出也不会销毁线程
            System.out.println(batchTaskPool.isTerminated());
            System.out.println(batchTaskPool.isShutdown());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

结果:

 

 将上面的shutdown方法的注释去掉再次测试,结果以下:  调用shutdown以后线程会销毁

 

 

补充:若是想要判断线程池中的线程是否执行完毕,或者在多个线程在线程池中执行完毕以后处理某些事情能够结合闭锁来实现,参考:http://www.noobyard.com/article/p-mkwlywdy-n.html

 (1)闭锁实现 (建议使用这种)

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 闭锁
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "进入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 闭锁产生同步效果
            System.out.println("三个都执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

结果:

pool-1-thread-1进入run
pool-1-thread-2进入run
pool-1-thread-3进入run
pool-1-thread-1退出run
pool-1-thread-3退出run
pool-1-thread-2退出run
三个都执行完毕
main end

 

(2)线程池自身携带的方法实现:  shuwdown后当即调用awaitTermination 实现

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "进入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            batchTaskPool.shutdown();
            batchTaskPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("三个都执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

结果:

pool-1-thread-2进入run
pool-1-thread-3进入run
pool-1-thread-1进入run
pool-1-thread-3退出run
pool-1-thread-2退出run
pool-1-thread-1退出run
三个都执行完毕
main end

 

补充:例如我系统中使用的一个ExcutorService的例子:

/**
 * 同步钉钉组织结构和人员的Action
 * 
 * @author Administrator
 *
 */
@Namespace("/sync")
public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport {

    /**
     * serialID
     */
    private static final long serialVersionUID = 3526083465788431949L;
    
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class);

    @Autowired
    private GroupAndUserService groupService;

    @Autowired
    private BaseInfoService baseInfoService;

    /**
     * 同步基本信息的数据
     * 
     * @return
     */
    @Action(value = "syncGroupAndUser")
    public String syncGroupAndUser() {
        long startTime = System.currentTimeMillis();
        logger.info("manual sync groups and users!");

        String accessToken = FetchDataUtils.getAccessToken();
        if (StringUtils.isBlank(accessToken)) {
            setPreJs("accessToken is null!");
            return "js";
        }

        String groupStr = FetchDataUtils.getGroupStr(accessToken);
        if (StringUtils.isBlank(groupStr)) {
            setPreJs("groupStr is null");
            return "js";
        }
        
        
        Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 钉钉同步回来的组
        //新开一个线程去获取钉钉用户和组织
        batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken);
        

        Map<String,Object> response = new HashMap<String,Object>();
        response.put("success", true);
        response.put("message", "success sync datas!");
        setPreJs(APIUtils.getJsonResultFromMap(response));
        
        long endTime = System.currentTimeMillis();
        logger.info("同步钉钉组织结构和用户完成-----用时:{}ms",(endTime-startTime));
        return "js";
    }

    private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                groupService.batchDisposeGroups(groupStr, dingGroupIds);
                groupService.fetchAndDisposeUsers(accessToken, dingGroupIds);                
            }
        };
        batchTaskPool.execute(run);
    }
    
}

注意:

  batchDisposeDingGroupAndUser()方法的形参必须声明为final,不然编译错误。


补充:阿里规约有一条

【强制】线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端以下:
1) FixedThreadPool 和 SingleThreadPool:
  容许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而致使 OOM。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

 

阻塞队列默认是Integer.MAX_VALUE(能够理解为无限队列)

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

 

2) CachedThreadPool 和 ScheduledThreadPool:
  容许的建立线程数量为 Integer.MAX_VALUE, 可能会建立大量的线程,从而致使 OOM。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }