Java中的多线程与线程池——线程池篇

线程池

线程池是什么?

简单来讲,线程池是指提早建立若干个线程,当有任务须要处理时,线程池里的线程就会处理任务,处理完成后的线程并不会被销毁,而是继续等待下一个任务。因为建立和销毁线程都是消耗系统资源的,因此,当某个业务须要频繁进行线程的建立和销毁时,就能够考虑使用线程池来提升系统的性能啦。java

线程池能够作什么?

借由《Java并发编程的艺术》,使用线程池可以帮助 :git

  • 下降资源消耗。经过重复利用已经建立的线程,可以下降线程建立和销毁形成的消耗。
  • 提升响应速度。当任务到达时,任务能够不须要等待线程的建立就能当即执行。
  • 提升线程的可管理性。线程是稀缺资源,若是无限制地建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。

如何建立一个线程池

首先建立一个 Runnable 接口实现类。编程

package demo;

import java.util.Date;

/** * @author yuanyiwen * @create 2020-02-28 16:05 * @description */
public class DemoThread implements Runnable {

    private String command;

    public DemoThread(String command) {
        this.command = command;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 开始时间 : " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " 结束时间 : " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "DemoThread{" +
                "command='" + command + '\'' +
                '}';
    }
}
复制代码

这里让咱们使用 ThreadPoolExecutor 来建立一个线程池进行测试:缓存

package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** * @author yuanyiwen * @create 2020-02-28 16:19 * @description */
public class DemoThreadPoolExecutor {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE  = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {
        // 使用线程池来建立线程
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                // 核心线程数为 :5
                CORE_POOL_SIZE,
                // 最大线程数 :10
                MAX_POOL_SIZE,
                // 等待时间 :1L
                KEEP_ALIVE_TIME,
                // 等待时间的单位 :秒
                TimeUnit.SECONDS,
                // 任务队列为 ArrayBlockingQueue,且容量为 100
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                // 饱和策略为 CallerRunsPolicy
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for(int i = 0; i < 15; i++) {
            // 建立WorkerThread对象,该对象须要实现Runnable接口
            Runnable worker = new DemoThread("任务" + i);
            // 经过线程池执行Runnable
            threadPoolExecutor.execute(worker);
        }
        // 终止线程池
        threadPoolExecutor.shutdown();
        while (!threadPoolExecutor.isTerminated()) {

        }
        System.out.println("所有线程已终止");
    }
}
复制代码

最后让咱们来看一下运行结果 :多线程

能够看到,当核心线程数为 5 时,即便总共要运行的线程有 15 个,每次也只会同时执行 5 个任务,剩下的任务则会被放入等待队列,等待核心线程空闲后执行。总的来讲步骤以下 :并发

Executor框架

Executor 框架是 Java5 以后引进的。在 Java5 以后,经过 Executor 来启动线程比使用 Thread 的 start 方法更好。除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点 :有助于避免 this 逃逸问题。框架

this 逃逸

this 逃逸是指在构造函数返回以前其余线程就持有该对象的引用,调用还没有构造彻底的对象的方法时可能引起奇怪的错误。ide

引起 this 逃逸一般须要知足两个条件 :一个是在构造函数中建立内部类,另外一个就是在构造函数中将这个内部类发布了出去。函数

因为发布出去的内部类对象自带对外部类 this 的访问权限,这就致使在经过内部类对象访问外部类 this 时,外部类可能并未构造完成,从而致使一些意想不到的问题。性能

典型的 this 逃逸情景以下 :

public class DemoThisEscape {

    private int a = 10;

    public DemoThisEscape() {
        // 在外部类的构造函数中调用内部类
        new Thread(new InnerClass()).start();
    }

    private class InnerClass implements Runnable {
        @Override
        public void run() {
            // 在这里经过 DemoThisEscape.this 引用还没有构造完毕的对象,好比这样 :
            System.out.println(DemoThisEscape.this.a);
        }
    }
}
复制代码

经过使用线程池进行统一的线程调度,省去了在程序中手动启动线程的步骤,从而避免了在构造器中启动一个线程的状况,所以可以有效规避 this 逃逸。

ThreadPoolExecutor经常使用参数

1. corePoolSize :核心线程线程数

定义了最小能够同时运行的线程数量。

2. maximumPoolSize :最大线程数

当队列中存放的任务达到队列容量时,当前能够同时运行的线程数量会扩大到最大线程数。

3. keepAliveTime :等待时间

当线程数大于核心线程数时,多余的空闲线程存活的最长时间。

4. unit :时间单位。

keepAliveTime 参数的时间单位,包括 TimeUnit.SECONDSTimeUnit.MINUTESTimeUnit.HOURSTimeUnit.DAYS 等等。

5. workQueue :任务队列

任务队列,用来储存等待执行任务的队列。

6. threadFactory :线程工厂

线程工厂,用来建立线程,通常默认便可。

7. handler :拒绝策略

也称饱和策略;当提交的任务过多而不能及时处理时,能够经过定制策略来处理任务。

ThreadPoolExecutor 饱和策略 : 指当前同时运行的线程数量达到最大线程数量而且队列也已经被放满时,ThreadPoolTaskExecutor 所执行的策略。

经常使用的拒绝策略包括 :

  • ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException 来拒绝新任务的处理,是 Spring 中使用的默认拒绝策略。
  • ThreadPoolExecutor.CallerRunsPolicy: 线程调用运行该任务的 execute 自己,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,若是执行程序已关闭,则会丢弃该任务。此策略提供简单的反馈控制机制,可以减缓新任务的提交速度,但可能形成延迟。若应用程序能够承受此延迟且不能丢弃任何一个任务请求,能够选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最先的未处理的任务请求。

为何推荐使用 ThreadPoolExecutor 来建立线程?

规约一 :线程资源必须经过线程池提供,不容许在应用中自行显示建立线程。

使用线程池的好处是减小在建立和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。若是不使用线程池,有可能会形成系统建立大量同类线程而致使消耗完内存或者“过分切换”的问题。

规约二 :强制线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 返回线程池对象的弊端以下:

FixedThreadPoolSingleThreadExecutor : 容许请求的队列长度为 Integer.MAX_VALUE,可能会堆积大量请求,从而致使 OOM。

CachedThreadPoolScheduledThreadPool : 容许建立的线程数量为 Integer.MAX_VALUE,可能会建立大量线程,从而致使 OOM。

几种常见的线程池

FixThreadPool 固定线程池

FixThreadPool :可重用固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }
复制代码

执行机制 :

  • 若当前运行的线程数小于 corePoolSize,来新任务时,就建立新的线程来执行任务;
  • 当前运行的线程数等于 corePoolSize 后,若是再来新任务的话,会将任务加到 LinkedBlockingQueue
  • 线程池中的线程执行完手头的工做后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行。

FixThreadPool 使用的是无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE),而它会给线程池带来以下影响 :

  • 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,所以线程池中的线程数不会超过 corePoolSize
  • 因为使用的是一个无界队列,因此 maximumPoolSize 将是一个无效参数,由于不可能存在任务队列满的状况,因此 FixedThreadPool 的 corePoolSizemaximumPoolSize 被设置为同一个值,且 keepAliveTime 将是一个无效参数;
  • 运行中的 FixedThreadPool(指未执行 shutdown()shutdownNow() 的)不会拒绝任务,所以在任务较多的时候可能会致使 OOM。

SingleThreadExecutor 单一线程池

SingleThreadExecutor 是只有一个线程的线程池。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(
                    1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}
复制代码

除了池中只有一个线程外,其余和 FixThreadPool 是基本一致的。

CachedThreadPool 缓存线程池

CachedThreadPool 是一个会根据须要建立新线程的线程池,但会在先前构建的线程可用时重用它。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
}
复制代码

corePoolSize 被设置为 0,maximumPoolSize 被设置为 Integer.MAX.VALUE,也就是无界的。虽然是无界,但因为该线程池还存在一个销毁机制,即若是一个线程 60 秒内未被使用过,则该线程就会被销毁,这样就节省了不少资源。

可是,若是主线程提交任务的速度高于 maximunPool 中线程处理任务的速度,CachedThreadPool 将会源源不断地建立新的线程,从而依然可能致使 CPU 耗尽或内存溢出。

执行机制 :

  • 首先执行 offer 操做,提交任务到任务队列。若当前 maximumPool 中有空闲线程正在执行 poll 操做,且主线程的 offer 与空闲线程的 poll 配对成功时,主线程将把任务交给空闲线程执行,此时视做 execute() 方法执行完成;不然,将执行下面的步骤。
  • 当初始 maximum 为空,或 maximumPool 中没有空闲线程时,将没有线程执行 poll 操做。此时,CachedThreadPool 会建立新线程执行任务,execute() 方法执行完成。

如何拟定线程池的大小?

上下文切换

多线程变编程中通常线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用。为了让这些线程都能获得有效执行,CPU 采起的策略是为每一个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会从新处于就绪状态让给其余线程使用,这个过程就属于一次上下文切换。

归纳来讲就是,当前任务在执行完 CPU 时间片切换到另外一个任务以前,会先保存本身的状态,以便下次再切换回这个任务时,能够直接加载到上次的状态。任务从保存到再加载的过程就是一次上下文切换。

上下文切换一般是计算密集型的。也就是说,它须要至关可观的处理器时间,在每秒几十上百次的切换中,每次切换都须要纳秒量级的时间。因此,上下文切换对系统来讲意味着消耗大量的 CPU 时间,事实上,多是操做系统中时间消耗最大的操做。

Linux 相比与其余操做系统(包括其余类 Unix 系统)有许多,其中有一项就是,其上下文切换和模式切换的时间消耗很是少。

简单的拟定判断

CPU 密集型任务(N+1):

这种任务消耗的主要是 CPU 资源,能够将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它缘由致使的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种状况下多出来的一个线程就能够充分利用 CPU 的空闲时间。

I/O 密集型任务(2N):

这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就能够将 CPU 交出给其它线程使用。所以在 I/O 密集型任务的应用中,咱们能够多配置一些线程,具体的计算方法是 2N。


参考文章 :JavaGuide