[netty源码分析]--EventLoopGroup与EventLoop 分析netty的线程模型

netty核心类源码解析:分析netty的运行机制java

  1. EventLoopGroup与EventLoop解析:分析netty的线程模型

这一篇博文主要是从源码层次分析netty的线程模型。netty之因此是高性能NIO框架,其中主要贡献之一就是netty的线程模型的高性能,咱们都知道netty的线程模型是基于Reactor线程模型,下面咱们就来分析一下对于netty的reactor线程模型是如何实现的。react

1. NioEventLoopGroup

netty的程序的启动(在服务端通常是两个NioEventLoopGroup线程池,一个boss, 一个worker; 对于客户端通常是一个线程池)。咱们通常是使用NIO,因此本文也所有是基于NIO来分析的,固然netty也支持BIO,不过本文就不作分析了。对于这个类的分析,咱们首先从Reactor线程模型开始分析git

1.1 Reactor线程模型

Reactor线程模型有三种github

  • 单线程模型
  • 多线程模型
  • 主从Reactor线程模型

关于这三种线程模型的原型,能够参考我以前的一片博文,我这里就不重复列出了,传送门:web

Reactor线程模型以及在netty中的应用
http://blog.csdn.net/u010853261/article/details/55805216bootstrap

1.2 NioEventLoopGroup与Reactor线程模型的对应

前面介绍了Reactor线程模型的原型实现,那么NIOEventLoopGroup是怎么与Reactor关联在一块儿的呢? 其实NIOEventLoopGroup就是一个线程池实现,经过设置不一样的NIOEventLoopGroup方式就能够对应三种不一样的Reactor线程模型。数组

这里我只给出服务端的配置,对于客户端都是同样的。promise

单线程模型

下面直接给出配置的实例:服务器

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class);
//.....

上面实例化了一个NIOEventLoopGroup,构造参数是1表示是单线程的线程池。而后接着咱们调用 b.group(bossGroup) 设置了服务器端的 EventLoopGroup. 有些朋友可能会有疑惑: 我记得在启动服务器端的 Netty 程序时, 是须要设置 bossGroup 和 workerGroup 的, 为何这里就只有一个 bossGroup?
其实很简单, ServerBootstrap 重写了 group 方法:多线程

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

所以当传入一个 group 时, 那么 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup 了.

这时候呢, 由于 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup, 而且这个 NioEventLoopGroup 只有一个线程, 这样就会致使 Netty 中的 acceptor 和后续的全部客户端链接的 IO 操做都是在一个线程中处理的. 那么对应到 Reactor 的线程模型中, 咱们这样设置 NioEventLoopGroup 时, 就至关于 Reactor 单线程模型.

多线程模型

同理,先给出源码:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class);
//...

bossGroup 中只有一个线程, 在workerGroup线程池中我没有指定线程数量,因此默认是 CPU 核心数乘以2, 所以对应的到 Reactor 线程模型中, 咱们知道, 这样设置的 NioEventLoopGroup 其实就是 Reactor 多线程模型.

主从Reactor线程模型

实现方式以下:

EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class);
//...

Netty 的服务器端的 acceptor 阶段, 没有使用到多线程, 所以上面的 主从多线程模型 在 Netty 的服务器端是不存在的.

服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 所以在调用 Java NIO 的 Selector.select 处理客户端的链接请求时, 其实是在一个线程中的, 因此对只有一个服务的应用来讲, bossGroup 设置多个线程是没有什么做用的, 反而还会形成资源浪费。

经 Google, Netty 中的 bossGroup 为何使用线程池的缘由你们众所纷纭, 不过我在 stackoverflow 上找到一个比较靠谱的答案:

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don’t see the reason for it.

意思就是说:netty做者说:咱们在不一样的服务器引导之间共享NioEventLoopGroup,多个boss线程是有用的,但我没有看到它的缘由。

1.3 NioEventLoopGroup的实例化过程

下面来分析对NioEventLoopGroup类进行实例化的过程当中发生了什么。

NioEventLoopGroup 类层次结构

先给出类图:

这里写图片描述

对于NioEventLoopGroup核心的类继承关系就是:

NioEventLoopGroup –》MultithreadEventLoopGroup –》MultithreadEventExecutorGroup

下面从这三个类出发分析NioEventLoopGroup实例化过程。

NioEventLoopGroup实例化过程

这里首先盗用一下网上的一张示意图(画图实在是太耗时间了,毕竟我懒):

这里写图片描述

下面大体解释一下这个实例化过程作了什么:

(1)在NioEventLoopGroup构造器调用:
若是对于不指定线程数参数的构造器,默认设置为0(可是在后面的构造器中会判断,若是设置为0,就会初始化为2*CPU数量);

public NioEventLoopGroup() {
    this(0);
}

而后调用:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

这里设置了NioEventLoopGroup线程池中每一个线程执行器默认是null(这里设置为null, 在后面的构造器中会判断,若是为null就实例化一个线程执行器)。

再调用:

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

这里就存在于JDK的NIO的交互了,这里设置了线程池的SelectorProvider, 经过SelectorProvider.provider() 返回。

而后调用:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

在这个重载的构造器中又传入了默认的选择策略工厂DefaultSelectStrategyFactory;

最后调用:

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

这里就是调用父类MultithreadEventLoopGroup的构造器了, 这里还添加了线程的拒绝执行策略。

(2)在MultithreadEventLoopGroup构造器调用:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

构造器被定义成protect,表示只能在NioEventLoopGroup中被调用,必定层度上的保护做用。这里就是对线程数进行了判断,当nThreads为0 的时候就设置成 DEFAULT_EVENT_LOOP_THREADS 这个常量。这个常量的定义以下:其实就是在MultithreadEventLoopGroup的静态代码段,其实就是将DEFAULT_EVENT_LOOP_THREADS赋值为CPU核心数*2;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

接下来就是调用的基类MultithreadEventExecutorGroup的构造器:

(3)MultithreadEventExecutorGroup的构造器:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

这个构造器里面多传入了一个参数 DefaultEventExecutorChooserFactory.INSTANCE , 经过这个EventLoop选择器工厂能够实例化GenericEventExecutorChooser这个类, 这个类是EventLoopGroup线程池里面的EventLoop的选择器,调用GenericEventExecutorChooser.next() 方法能够从线程池中选择出一个合适的EventLoop线程。

而后就是重载调用MultithreadEventExecutorGroup类的构造器:

构造器调用到了这里其实也就是不断向上传递调用的终点了,因为构造器代码比较长,我就删除一些校验和不重要的代码,只保留核心代码:

/** * 最终的建立实例构造器 * * @param nThreads 该实例将使用的线程数 * @param executor 将要使用的executor, 默认为null * @param chooserFactory 将要使用的EventExecutorChooserFactory * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    /** 1.初始化线程池 */
    //参数校验nThread合法性,
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    //executor校验非空, 若是为空就建立ThreadPerTaskExecutor, 该类实现了 Executor接口
    // 这个executor 是用来执行线程池中的全部的线程,也就是全部的NioEventLoop,其实从
    //NioEventLoop构造器中也能够知道,NioEventLoop构造器中都传入了executor这个参数。
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    //这里的children数组, 其实就是线程池的核心实现,线程池中就是经过指定的线程数组来实现线程池;
    //数组中每一个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
    children = new EventExecutor[nThreads];

    //for循环实例化children数组,NioEventLoop对象
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //newChild(executor, args) 函数在NioEventLoopGroup类中实现了, 
            // 实质就是就是存入了一个 NIOEventLoop类实例
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            //若是构造失败, 就清理资源
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }//end foreach

    /** 2.实例化线程工厂执行器选择器: 根据children获取选择器 */
    chooser = chooserFactory.newChooser(children);

    /** 3.为每一个EventLoop线程添加 线程终止监听器*/
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    /** 4. 将children 添加到对应的set集合中去重, 表示只可读。*/
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

总结一下上面的初始化步骤(3)中的一些重点:

1)NIOEventLoopGroup的线程池实现其实就是一个NIOEventLoop数组,一个NIOEventLoop能够理解成就是一个线程。


2)全部的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个NIOEventLoopGroup的。 这一点从 newChild(executor, args); 方法就能够看出:newChild()的实现是在NIOEventLoopGroup中实现的。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

3)当有IO事件来时,须要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的, 并调用该类的next() 方法获取到下一个 NioEventLoop.

到了这里线程池的初始化就已经结束了, 基本这部分就只涉及 netty线程池的内容,不涉及到channel 与 channelPipeline和ChannelHandler等内容。

下面的内容就是分析 NioEventLoop的构造器实现了。

2. NioEventLoop

NioEventLoop 继承于 SingleThreadEventLoop;
SingleThreadEventLoop 又继承于 SingleThreadEventExecutor;

SingleThreadEventExecutor 是 netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 所以咱们能够认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 而且在其生命周期内, 绑定的线程都不会再改变。

2.1 NioEventLoop 类层次结构

这里写图片描述

NioEventLoop 的类层次结构图仍是比较复杂的, 不过咱们只须要关注几个重要的点便可. 首先 NioEventLoop 的继承链以下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即咱们能够经过调用一个 NioEventLoop 实例的 schedule()方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 经过它, 咱们能够调用一个 NioEventLoop 实例的 execute() 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

一般来讲, NioEventLoop 肩负着两种任务,:
1)第一个是做为 IO 线程, 执行与 Channel 相关的 IO 操做, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;

2)而第二个任务是做为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

2.2 NioEventLoop 的实例化过程

这里仍是盗用一下网上的一张示意图(画图实在是太耗时间了,毕竟我懒):

这里写图片描述

相同的套路,想来分析实例化过程当中发生了什么:

对于NioEventLoop的实例化,基本就是在NioEventLoopGroup.newChild() 中调用的,下面先给出源码:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

从上面函数里面 new NioEventLoop()出发分析实例化过程:

(1)最早调用NioEventLoop 里面的构造函数:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {

    //调用父类构造器
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();//new 一个selector实例, 具体的类与平台和底层有关
    selectStrategy = strategy;
}

须要注意的是:构造器里面传入了 NioEventLoopGroup、Executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler。从这里能够看出,一个NioEventLoop属于某一个NioEventLoopGroup, 且处于同一个NioEventLoopGroup下的全部NioEventLoop 公用Executor、SelectorProvider、SelectStrategyFactory和RejectedExecutionHandler。

还有一点须要注意的是,这里的SelectorProvider构造参数传入的是经过在NioEventLoopGroup里面的构造器里面的
SelectorProvider.provider(); 方式获取的, 而这个方法返回的是一个单例的SelectorProvider, 因此全部的NioEventLoop公用同一个单例SelectorProvider。

(2)核心的东西说完了,就是调用父类SingleThreadEventLoop的构造器:

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {

    //父类构造器
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

    tailTasks = newTaskQueue(maxPendingTasks);
}

这里除了调用父类SingleThreadEventExecutor的构造器之外, 就是实例化了 tailTasks 这个变量;
对于tailTasks在SingleThreadEventLoop属性的定义以下:

private final Queue<Runnable> tailTasks;// 尾部任务队列

队列的数量maxPendingTasks参数默认是SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASK,其实就是Integer.MAX_VALUE; 对于new的这个队列, 其实就是一个LinkedBlockingQueue 无界队列。

(3)再看调用的父类SingleThreadEventExecutor的构造器:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
    super(parent);// 设置EventLoop所属于的EventLoopGroup
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);//默认是Integer.MAX_VALUE
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);//建立EventLoop的任务队列, 默认是 LinkedBlockingQueue
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

自此,NioEventLoop的实例化过程已经分析完毕。

前面已经分析完了EventLoopGroup和EventLoop,那么有一个问题,咱们知道一个EventLoop其实是对应于一个线程,那么这个EventLoop是何时启动的呢?

2.3 NioEventLoop 的启动顺序 以及 将EventLoop 与 Channel 的关联

在前面咱们已经知道了, NioEventLoop 自己就是一个 SingleThreadEventExecutor, 所以 NioEventLoop 的启动, 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动。 在NioEventLoop的构造器初始化分析过程当中,咱们知道,直到SingleThreadEventExecutor咱们传入了一个线程执行器 Executor,我想线程的启动就是经过这个线程执行器启动的。

在SingleThreadEventExecutor类中,有一个很是重要的属性 thread,这个线程也就是与NioEventLoop 所绑定的本地 Java 线程。咱们看看这个线程是在何时初始化和启动的。

经过定位 thread 变量,发如今 doStartThread() 函数中,有一行代码:

thread = Thread.currentThread();

除了这个地方,发现没有其他的地方对thread进行显示的实例化。并且这行代码在

executor.execute(new Runnable() {
}

也就是说在executor执行的一个线程里面, 含义也就是当executor 第一次执行提交的任务建立的线程 赋值给 thread对象。因而可知,thread的启动其实也就是在这里。

经过不断的分析 doStartThread() 函数 的父调用关系,最顶层的调用就是 SingleThreadEventExecutor.execute(Runnable task)

也就是说哪里第一次调用了execute()函数,也就是启动了该NioEventLoop。下面就来分析一下最初是哪里调用了execute()函数,也就是NioEventLoop的启动流程。

在分析NioEventLoop的启动流程以前先来看看EventLoop 与 Channel 怎样关联的?

EventLoop 与 Channel 的关联

Netty 中, 每一个 Channel 都有且仅有一个 EventLoop 与之关联, 它们的关联过程以下:
这里写图片描述

从上图中咱们能够看到, 当调用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的关联. register 实现以下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 删除条件检查.
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

在 AbstractChannel#AbstractUnsafe.register 中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程.

EventLoop 的启动

根据以前的分析,咱们如今的任务就是寻找 在哪里第一次调用了 SingleThreadEventExecutor.execute() 方法。

留心的读者可能已经注意到了, 咱们在 EventLoop 与 Channel 的关联 这一小节时, 有提到到在注册 channel 的过程当中, 会在 AbstractChannel#AbstractUnsafe.register 中调用 eventLoop.execute 方法, 在 EventLoop 中进行 Channel 注册代码的执行, AbstractChannel#AbstractUnsafe.register 部分代码以下:

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        //第一次调用eventLoop.execute()
        eventLoop.execute(new OneTimeTask() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        ...
    }
}

很显然, 一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 所以上面的 eventLoop.inEventLoop() 就为 false, 因而进入到 else 分支, 在这个分支中调用了 eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 所以调用的是 SingleThreadEventExecutor.execute()方法:

@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

咱们已经分析过了, inEventLoop == false, 所以执行到 else 分支, 在这里就调用了 startThread() 方法来启动 SingleThreadEventExecutor 内部关联的 Java 本地线程了。

总结一句话, 当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而致使了 EventLoop 所对应的 Java 线程的启动。

咱们将 EventLoop 与 Channel 的关联 小节中的时序图补全后, 就获得了 EventLoop 启动过程的时序图:
这里写图片描述

2.4 NioEventLoop 启动以后是怎么实现事件循环的?

通过上面的分析咱们已经找到了启动NIoEventLoop线程的入口,这里来分析一下NioEventLoop启动以后是如何实现所谓的事件循环机制的呢?

也就是从SingleThreadEventExecutor.execute(Runnable task); 从源码开始分析,对于一些不重要的源码我就直接删除了,只保留核心源码

public void execute(Runnable task) {
    /** * 判断Thread.currentThread()当前线程是否是与NioEventLoop绑定的本地线程; * 若是Thread.currentThread()== this.thread, 那么只用将execute()方法中的task添加到任务队列中就好; * 若是Thread.currentThread()== this.thread 返回false, 那就先调用startThread()方法启动本地线程,而后再将task添加到任务队列中. */
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();//启动 NioEventLoop
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

上面的addTask(task); 函数实际上也就是将task 任务入taskQueue 队列中。而后就是看startThread(); 是怎么启动 NioEventLoop.

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        //设置thread 的状态 this.state是 ST_STARTED 已启动
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            //真正启动线程的函数, 其做用相似于 thread.start()
            doStartThread();
        }
    }
}

再看doStartThread() 是怎么实现的, 这个函数实现依旧比较复杂,我只取出核心的业务逻辑:

private void doStartThread() {
    //启动线程以前,必须保证thread 是null,其实也就是thread尚未启动。
    assert thread == null;
    /** 经过executor启动一个新的task, 在task里面启动this.thread线程。*/
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 1. 将当前thread 赋值给 this.thread 也就是将启动的线程赋值给本地绑定线程thread
            thread = Thread.currentThread();
            // 2. 其实是调用NioEventLoop.run() 方法实现 事件循环机制
            try {
                SingleThreadEventExecutor.this.run();//最核心所在,调用run()方法
                success = true;
            } catch (Throwable t) {//........
            } finally {
                //确保事件循环结束以后,关闭线程,清理资源。
                //...........
            }
        }//end run()
    });// end run() 这个task
}// end doStartThread method

thread的run() 函数

从上面可知,核心就是调用SingleThreadEventExecutor.this.run(); 这是一个抽象函数,在NioEventLoop实现了这个函数,下面依然只给出能说明核心思想的核心源码,不重要的源码被我删除了。

protected void run() {
    /** 死循环:NioEventLoop 事件循环的核心就是这里! */
    for (;;) {
        try {
            // 1.经过 select/selectNow 调用查询当前是否有就绪的 IO 事件
            // 当 selectStrategy.calculateStrategy() 返回的是 CONTINUE, 就结束此轮循环,进入下一轮循环;
            // 当返回的是 SELECT, 就表示任务队列为空,就调用select(Boolean);
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }// end switch

            //2. 当有IO事件就绪时, 就会处理这些IO事件
            cancelledKeys = 0;
            needsToSelectAgain = false;

            //ioRatio表示:此线程分配给IO操做所占的时间比(即运行processSelectedKeys耗时在整个循环中所占用的时间).
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    //查询就绪的 IO 事件, 而后处理它;
                    processSelectedKeys();
                } finally {
                    //运行 taskQueue 中的任务.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    //查询就绪的 IO 事件, 而后处理它;
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //

    }
} //end of run()函数


上面函数中的一个死循环 for(;;) 就是NioEventLoop事件循环执行机制。死循环中的业务简单点就是:
(1)经过调用 select/selectNow 函数,等待 IO 事件;


(2)当有IO事件就绪时, 获取事件类型,分别处理这些IO事件,处理IO事件函数调用就是 processSelectedKeys();

下面对上面过程进行详解,分两步:IO事件轮询、IO事件的处理。

IO事件轮询

首先, 在 run() 方法中, 第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务:

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

这个方法很简单, 仅仅是检查了一下 taskQueue 是否为空. 至于 taskQueue 是什么呢, 其实它就是存放一系列的须要由此 EventLoop 所执行的任务列表. 关于 taskQueue, 咱们这里暂时不表, 等到后面再来详细分析它.

1)当 taskQueue 不为空时, hasTasks() 就会返回TRUE,那么selectStrategy.calculateStrategy() 的实现里面就会执行selectSupplier.get() 而get()方法里面会调用 selectNow(); 执行当即返回当前就绪的IO事件的个数,若是存在IO事件,那么在switch 语句中就会直接执行 default, 直接跳出switch语句,若是不存在,就是返回0, 对应于continue,忽略这次循环。

2)当taskQueue为空时,就会selectStrategy.calculateStrategy() 就会返回SelectStrategy.SELECT, 对用于switch case语句就是执行select()函数,阻塞等待IO事件就绪。

IO事件处理

在 NioEventLoop.run() 方法中, 第一步是经过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步天然就是处理这些 IO 事件啦.首先让咱们来看一下 NioEventLoop.run 中循环的剩余部分(核心部分):

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    processSelectedKeys();
    runAllTasks();
} else {
    final long ioStartTime = System.nanoTime();

    processSelectedKeys();

    final long ioTime = System.nanoTime() - ioStartTime;
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

上面列出的代码中, 有两个关键的调用, 第一个是 processSelectedKeys() 调用, 根据字面意思, 咱们能够猜出这个方法确定是查询就绪的 IO 事件, 而后处理它; 第二个调用是 runAllTasks(), 这个方法咱们也能够一眼就看出来它的功能就是运行 taskQueue 中的任务.

这里的代码还有一个十分有意思的地方, 即 ioRatio. 那什么是 ioRatio呢? 它表示的是此线程分配给 IO 操做所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间). 例如 ioRatio 默认是 50, 则表示 IO 操做和执行 task 的所占用的线程执行时间比是 1 : 1. 当知道了 IO 操做耗时和它所占用的时间比, 那么执行 task 的时间就能够很方便的计算出来了。

当咱们设置 ioRate = 70 时, 则表示 IO 运行耗时占比为70%, 即假设某次循环一共耗时为 100ms, 那么根据公式, 咱们知道 processSelectedKeys() 方法调用所耗时大概为70ms(即 IO 耗时), 而 runAllTasks() 耗时大概为 30ms(即执行 task 耗时).

当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()、runAllTasks(); 而当 ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操做的耗时), 而后根据公式, 计算出执行 task 所占用的时间, 而后以此为参数, 调用 runAllTasks().

咱们这里先分析一下 processSelectedKeys() 方法调用,源码以下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

这个方法中, 会根据 selectedKeys 字段是否为空, 而分别调用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在调用 openSelector() 方法时, 根据 JVM 平台的不一样, 而有设置不一样的值, 在我所调试这个值是不为 null 的. 其实 processSelectedKeysOptimized 方法 processSelectedKeysPlain 没有太大的区别, 为了简单起见, 咱们以 processSelectedKeysOptimized 为例分析一下源码的工做流程吧.

private void processSelectedKeysOptimized() {
    //1. 迭代 selectedKeys 获取就绪的 IO 事件, 而后为每一个事件都调用 processSelectedKey 来处理它.
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            //2. 为事件调用processSelectedKey方法来处理 对应的事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

其实你别看它代码挺多的, 可是关键的点就两个: 迭代 selectedKeys 获取就绪的 IO 事件, 而后为每一个事件都调用 processSelectedKey 来处理它。

还有一点须要注意的是, 咱们能够调用 selectionKey.attach(object) 给一个 selectionKey 设置一个附加的字段, 而后能够经过 Object attachedObj = selectionKey.attachment() 获取它. 上面代代码正是经过了 k.attachment() 来获取一个附加在 selectionKey 中的对象, 那么这个对象是什么呢? 它又是在哪里设置的呢? 咱们再来回忆一下 SocketChannel 是如何注册到 Selector 中的:
在客户端的 Channel 注册过程当中, 会有以下调用链:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

最后的 AbstractNioChannel.doRegister 方法会调用 SocketChannel.register 方法注册一个 SocketChannel 到指定的 Selector:

@Override
protected void doRegister() throws Exception {
    // 省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

特别注意一下 register 的第三个参数, 这个参数是设置 selectionKey 的附加对象的, 和调用 selectionKey.attach(object) 的效果同样. 而调用 register 所传递的第三个参数是 this, 它其实就是一个 NioSocketChannel 的实例. 那么这里就很清楚了, 咱们在将 SocketChannel 注册到 Selector 中时, 将 SocketChannel 所对应的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 当咱们获取到附加的对象后, 咱们就调用 processSelectedKey 来处理这个 IO 事件:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

processSelectedKey 方法源码以下:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //......
    //......
    try {
        int readyOps = k.readyOps();
        // 链接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        //可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        //可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 这里就是核心中的核心了. 事件循环器读到了字节后, 就会将数据传递到pipeline
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

这个代码是否是很熟悉啊? 彻底是 Java NIO 的 Selector 的那一套处理流程嘛!
processSelectedKey 中处理了三个事件, 分别是:

OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.
OP_WRITE, 可写事件, 即上层能够向 Channel 写入数据.
OP_CONNECT, 链接创建事件, 即 TCP 链接已经创建, Channel 处于 active 状态.
下面咱们分别根据这三个事件来看一下 Netty 是怎么处理的吧.

OP_READ 处理

当就绪的 IO 事件是 OP_READ, 代码会调用 unsafe.read() 方法, 即:

// 可读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}

unsafe 这个字段, 咱们已经和它打了太多的交道了, 在分析Bootstrap 时咱们已经对它进行过浓墨重彩地分析了, 最后咱们肯定了它是一个 NioSocketChannelUnsafe 实例, 负责的是 Channel 的底层 IO 操做.
咱们能够利用 Intellij IDEA 提供的 Go To Implementations 功能, 寻找到这个方法的实现. 最后咱们发现这个方法没有在 NioSocketChannelUnsafe 中实现, 而是在它的父类 AbstractNioByteChannel 实现的, 它的实现源码以下:

@Override
public final void read() {
    ...
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);

            // 检查读取结果.
            ...

            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            ...

            totalReadAmount += localReadAmount;

            // 检查是不是配置了自动读取, 若是不是, 则当即退出循环.
            ...
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
    }
}

read() 源码比较长, 我为了篇幅起见, 删除了部分代码, 只留下了主干. 不过我建议读者朋友们本身必定要看一下 read() 源码, 这对理解 Netty 的 EventLoop 十分有帮助.
上面 read 方法其实概括起来, 能够认为作了以下工做:

  • 分配 ByteBuf
  • 从 SocketChannel 中读取数据;
  • 调用 pipeline.fireChannelRead 发送一个inbound 事件.

前面两点没什么好说的, 第三点 pipeline.fireChannelRead 读者朋友们看到了有没有会心一笑地感受呢? 反正我看到这里时是有的。 pipeline.fireChannelRead 正好就是 inbound 事件起点. 当调用了 pipeline.fireIN_EVT() 后, 那么就产生了一个 inbound 事件, 此事件会以 head -> customContext -> tail 的方向依次流经 ChannelPipeline 中的各个 handler.

调用了 pipeline.fireChannelRead 后, 就是 ChannelPipeline 中所须要作的工做了。

OP_WRITE 处理

OP_WRITE 可写事件代码以下. 这里代码比较简单, 没有详细分析的必要了.

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

OP_CONNECT 处理

最后一个事件是 OP_CONNECT, 即 TCP 链接已创建事件.

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

OP_CONNECT 事件的处理中, 只作了两件事情:

  • 正如代码中的注释所言, 咱们须要将 OP_CONNECT 从就绪事件集中清除, 否则会一直有 OP_CONNECT 事件.
  • 调用 unsafe.finishConnect() 通知上层链接已创建 .

unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(), 产生一个 inbound 事件, 通知 pipeline 中的各个 handler TCP 通道已创建(即 ChannelInboundHandler.channelActive 方法会被调用)

到了这里, 咱们整个 NioEventLoop 的 IO 操做部分已经了解完了, 接下来的一节咱们要重点分析一下 Netty 的任务队列机制.

2.5 netty的任务队列机制

咱们已经提到过, 在Netty 中, 一个 NioEventLoop 一般须要肩负起两种任务, 第一个是做为 IO 线程, 处理 IO 操做; 第二个就是做为任务线程, 处理 taskQueue 中的任务. 这一节的重点就是分析一下 NioEventLoop 的任务队列机制的.

任务的添加

普通 Runnable 任务

NioEventLoop 继承于 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 中有一个 Queue taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每一个 Task 都使用一个实现了 Runnable 接口的实例来表示.

例如当咱们须要将一个 Runnable 添加到 taskQueue 中时, 咱们能够进行以下操做:

EventLoop eventLoop = channel.eventLoop();
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello, Netty!");
    }
});

当调用 execute 后, 其实是调用到了 SingleThreadEventExecutor.execute() 方法, 它的实现以下:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

而添加任务的 addTask 方法的源码以下:

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (isShutdown()) {
        reject();
    }
    taskQueue.add(task);
}

所以实际上, taskQueue 是存放着待执行的任务的队列。

schedule 任务

除了经过 execute 添加普通的 Runnable 任务外, 咱们还能够经过调用 eventLoop.scheduleXXX 之类的方法来添加一个定时任务.

EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的.

在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一个队列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 咱们很容易猜到, 它是对 Schedule 任务的一个抽象.

咱们来看一下 AbstractScheduledEventExecutor 所实现的 schedule 方法吧:

@Override
public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        throw new IllegalArgumentException(
                String.format("delay: %d (expected: >= 0)", delay));
    }
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

这是其中一个重载的 schedule, 当一个 Runnable 传递进来后, 会被封装为一个 ScheduledFutureTask 对象, 这个对象会记录下这个 Runnable 在什么时候运行、已何种频率运行等信息.
当构建了 ScheduledFutureTask 后, 会继续调用 另外一个重载的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new OneTimeTask() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在这个方法中, ScheduledFutureTask 对象就会被添加到 scheduledTaskQueue 中了。

任务的执行

当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?

让咱们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用 processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理. processSelectedKeys() 方法咱们已经分析过了, 下面咱们来看一下 runAllTasks() 中到底有什么名堂吧。

runAllTasks 方法有两个重载的方法, 一个是无参数的, 另外一个有一个参数的. 首先来看一下无参数的 runAllTasks:

protected boolean runAllTasks() {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            return true;
        }
    }
}

咱们前面已经提到过, EventLoop 能够经过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中, 也能够经过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中. 在此方法的一开始调用的

fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经能够执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 做为可执行的 task 等待被调度执行,源码以下:

private void fetchFromScheduledTaskQueue() {
    if (hasScheduledTasks()) {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                break;
            }
            taskQueue.add(scheduledTask);
        }
    }
}

接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 而后调用它的 run() 方法来运行此 task.

注意, 由于 EventLoop 既须要执行 IO 操做, 又须要执行 task, 所以咱们在调用 EventLoop.execute 方法提交任务时, 不要提交耗时任务, 更不能提交一些会形成阻塞的任务, 否则会致使咱们的 IO 线程得不到调度, 影响整个程序的并发量。

这里也是为何用咱们本身的线程池隔离一些可能阻塞的业务。