Dubbo 线程池模型

前言

你们好,今天开始给你们分享 — Dubbo 专题之 Dubbo 线程池模型。在前面上个章节中咱们讨论了 Dubbo SPI,了解了 Dubbo SPI 其本质是从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制增强而来,同时解决了 Java 中 SPI 的一些缺陷。以及咱们使用 Dubbo SPI 实现自定义能力的拓展。那本章节咱们要讨论的 Dubbo 线程模型也是基于 SPI 实现,那什么是线程模型呢?以及其在咱们的项目中有什么做用呢?那么咱们在本章节中进行讨论。下面就让咱们快速开始吧!java

1. 线程模型简介

小伙伴若是对 Servlet 熟悉就知道,从 Servlet 3.x 开始支持异步非阻塞模式。至于什么异步非阻塞前面我在前面的章节中有讨论小伙伴能够自行学习以前的文章。咱们经过一个访问Web应用流程图简单说明:git

线程模型1

在上面的流程图中咱们能够看到第一个请求发起同步 Web 调用,而后 Web 再发起对第三方服务的调用,整个过程全链路是同步调用。第二个请求一样也是发起同步调用,可是在发起第三方调用的时候切换了线程(基于 Servlet 3.x 咱们不须要手动的建立线程来切换)。这么作的好处在于咱们能够用专门处理线程池去作业务处理或第三方服务的调用。那什么状况下咱们须要切换线程不使用主线程呢?若是事件处理的逻辑能迅速完成,而且不会发起新的 IO 请求,好比只是在内存中记个标识,则直接在 IO 线程上处理更快,由于减小了线程池调度。但若是事件处理逻辑较慢,或者须要发起新的 IO 请求,好比须要查询数据库或其它服务调用时,则必须派发到线程池,不然 IO 线程阻塞,将致使不能接收其它请求。spring

2. 使用方式

那在 Dubbo 中给咱们提供了经过不一样的派发策略和不一样的线程池配置的组合来应对不一样的场景。配置方式以下:数据库

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

下面咱们简单描述下dispatcherthreadpool的参数说明:apache

  1. Dispatcher
  • all 全部消息都派发到线程池,包括请求,响应,链接事件,断开事件,心跳等。(默认)
  • direct 全部消息都不派发到线程池,所有在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它链接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它链接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将链接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
  1. ThreadPool
  • fixed 固定大小线程池,启动时创建线程,不关闭,一直持有。(默认)
  • cached 缓存线程池,空闲一分钟自动删除,须要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增加不会收缩。只增加不收缩的目的是为了不收缩时忽然来了大流量引发的性能问题。
  • eager 优先建立Worker线程池。在任务数量大于corePoolSize可是小于maximumPoolSize时,优先建立Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

3. 使用场景

经过前面的介绍咱们应该明白咱们为何须要切换线程,遵循一个很简单的原则:若是咱们处理的任务须要操做新的 IO 或者处理任务须要很长的时间那么咱们就能够把这部分工做放到咱们的任务线程池去处理。那么咱们简单的总结下在工做常遇到的场景:编程

  1. 计算型服务:在我以前的工做中遇到这样的一个需求:咱们的车机实时上报数据给服务器,服务器记录数据而且实时计算和纠正导航数据。那么这里咱们须要一个计算型的微服务,主要的工做就是计算和修正实时数据,那么这个服务就是典型的计算型服务,全部咱们计算过程当中尽可能减小线程的切换并尽量的在一个线程内进行计算。这样减小线程切换的开销提供计算速度。
  2. 网关服务:首先咱们须要了解什么是网关,简单的理解就是全部的服务入口,对每一个服务的调用必须通过网关转发到对应服务上(相似 Nginx )。那这里网关主要工做就是服务转发(鉴权、限流等等),能够理解为发起请求。很明显发起请求就是开启新的 IO 全部咱们能够切换到线程池去处理。

4. 示例演示

下面咱们经过以获取图书列表为例进行演示。如下是项目的结构图:api

idea1

由于这里咱们主要是对服务提供端的配置,全部咱们主要看dubbo-provider-xml.xml配置内容:缓存

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- 指定分发策略为:all 线程池:fixed 固定大小为:100 -->
    <dubbo:protocol port="20880" name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

    <dubbo:application name="demo-provider" metadata-type="remote"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <bean id="bookFacade" class="com.muke.dubbocourse.test.provider.BookFacadeImpl"/>

    <!--暴露服务为Dubbo服务-->
    <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" ref="bookFacade" />

</beans>

上面的 XML 配置中dispatcher="all"指定事件的分发策略、threadpool="fixed" threads="100"指定线程池固定大小为100服务器

5. 原理分析

这里分发策略和线程池采用 Dubbo 中的 SPI 方式加载的小伙伴能够参考前面的 《Dubbo SPI》章节进行了解。下面咱们进入主题,首先看看在 Dubbo 中为咱们提供的5种事件分发策略:微信

idea2

咱们这里简单的分析 all分发策略其它的都是相似的小伙伴自行查阅源码分析。下面咱们看看org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler核心源码:

/***
 *@className AllChannelHandler
 *       
 *@description 全部处理分发到线程池去处理
 *       
 *@author <a href="http://youngitman.tech">青年IT男</a>
 *       
 *@date 12:50 2020-03-05
 *       
 *@JunitTest: {@link  }     
 *
 *@version v1.0.0
 *       
**/
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     *
     * 远程链接事件回调
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @exception 
     * @return void 
     **/
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //链接到远程事件放入线程池执行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    /**
     *
     * 端口远程链接
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @exception 
     * @return void 
     **/
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //断开链接处理事件放入线程池执行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    /**
     *
     * 接收到数据回调
     *
     * @author liyong 
     * @date 1:34 PM 2020/12/6 
     * @param channel 
     * @param message 
     * @exception 
     * @return void 
     **/
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            //接收到数据放入线程池处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    /**
     *
     * 发生异常回调
     *
     * @author liyong 
     * @date 1:35 PM 2020/12/6 
     * @param channel 
     * @param exception 
     * @exception 
     * @return void 
     **/
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            //发生异常放入线程池处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

从上面的代码注释中能够看到 all 这种处理策略就是全部消息都派发到线程池,包括请求、响应、链接事件、断开事件、心跳等。

接下来咱们看看线程池的处理策略主要支持4种:

idea3

咱们以fixed策略进行分析。咱们看到org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool核心源码:

/**
 * 建立固定大小线程池
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        //线程池名称
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        //线程池大小
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        //队列大小
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                //若是队列大小为0使用同步队列
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        //不然使用指定大小到阻塞队列
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

上面的源码中使用指定大小的队列建立线程池,若是队列大小为0使用同步队列。

6. 小结

在本小节中咱们主要学习了 Dubbo 中的线程池模型,在 Dubbo 中为咱们提供了两种策略调整线程池模型分别是:DispatcherThreadPool。其中Dispatcher提供了5种策略:alldirectmessageexecutionconnectionThreadPool提供了4种策略:fixedcachedlimitedeager。同时咱们分别从源码中学习了底层的实现逻辑。

本节课程的重点以下:

  1. 理解 Dubbo 中线程模型
  2. 了解什么是 Dispatcher模式
  3. 了解什么是 ThreadPool模式
  4. 了解线程模型实现原理

写在最后

本小节是 Dubbo 入门到精通系列 (《从零开始学习Dubbo》、《Dubbo高阶应用》、《Dubbo源码分析》) 中 《从零开始学习Dubbo》基础课程最后一小节,感谢你们长期的支持。因为本人时间精力有限后面课程的相关专题更新可能比较缓慢请多多包含,再次感谢小伙伴的关注。若是想得到最新的专题分享请关注个人微信公众号。

做者

我的从事金融行业,就任过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就任于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号: 青年IT男 获取最新技术文章推送!

博客地址: http://youngitman.tech

微信公众号: