netty源码分析之服务端启动全解析

background

netty 是一个异步事件驱动的网络通讯层框架,其官方文档的解释为html

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.java

咱们在新美大消息推送系统sailfish(日均推送消息量50亿),新美大移动端代理优化系统shark(日均吞吐量30亿)中,均选择了netty做为底层网络通讯框架。react

既然两大如此重要的系统底层都使用到了netty,因此必然要对netty的机制,甚至源码了若指掌,因而,便催生了netty源码系列文章。后面,我会经过一系列的主题把我从netty源码里所学到的毫无保留地介绍给你,源码基于4.1.6.Final编程

why netty

netty底层基于jdk的NIO,咱们为何不直接基于jdk的nio或者其余nio框架?下面是我总结出来的缘由bootstrap

1.使用jdk自带的nio须要了解太多的概念,编程复杂 2.netty底层IO模型随意切换,而这一切只须要作微小的改动 3.netty自带的拆包解包,异常检测等机制让你从nio的繁重细节中脱离出来,让你只须要关心业务逻辑 4.netty解决了jdk的不少包括空轮训在内的bug 5.netty底层对线程,selector作了不少细小的优化,精心设计的reactor线程作到很是高效的并发处理 6.自带各类协议栈让你处理任何一种通用协议都几乎不用亲自动手 7.netty社区活跃,遇到问题随时邮件列表或者issue 8.netty已经历各大rpc框架,消息中间件,分布式通讯中间件线上的普遍验证,健壮性无比强大promise

dive into netty

了解了这么多,今天咱们就从一个例子出来,开始咱们的netty源码之旅。服务器

本篇主要讲述的是netty是如何绑定端口,启动服务。启动服务的过程当中,你将会了解到netty各大核心组件,我先不会细讲这些组件,而是会告诉你各大组件是怎么串起来组成netty的核心微信

example

下面是一个很是简单的服务端启动代码网络

public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }
}
复制代码

简单的几行代码就能开启一个服务端,端口绑定在8888,使用nio模式,下面讲下每个步骤的处理细节并发

EventLoopGroup 已经在个人其余文章中详细剖析过,说白了,就是一个死循环,不停地检测IO事件,处理IO事件,执行任务

ServerBootstrap 是服务端的一个启动辅助类,经过给他设置一系列参数来绑定端口启动服务

group(bossGroup, workerGroup) 咱们须要两种类型的人干活,一个是老板,一个是工人,老板负责从外面接活,接到的活分配给工人干,放到这里,bossGroup的做用就是不断地accept到新的链接,将新的链接丢给workerGroup来处理

.channel(NioServerSocketChannel.class) 表示服务端启动的是nio相关的channel,channel在netty里面是一大核心概念,能够理解为一条channel就是一个链接或者一个服务端bind动做,后面会细说

.handler(new SimpleServerHandler() 表示服务器启动过程当中,须要通过哪些流程,这里SimpleServerHandler最终的顶层接口为ChannelHander,是netty的一大核心概念,表示数据流通过的处理器,能够理解为流水线上的每一道关卡

childHandler(new ChannelInitializer<SocketChannel>)...表示一条新的链接进来以后,该怎么处理,也就是上面所说的,老板如何给工人配活

ChannelFuture f = b.bind(8888).sync(); 这里就是真正的启动过程了,绑定8888端口,等待服务器启动完毕,才会进入下行代码

f.channel().closeFuture().sync(); 等待服务端关闭socket

bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); 关闭两组死循环

上述代码能够很轻松地再本地跑起来,最终控制台的输出为:

handlerAdded
channelRegistered
channelActive
复制代码

关于为何会顺序输出这些,深刻分析以后其实很easy

深刻细节

ServerBootstrap 一系列的参数配置其实没啥好讲的,无非就是使用method chaining的方式将启动服务器须要的参数保存到filed。咱们的重点落入到下面这段代码

b.bind(8888).sync();
复制代码

这里说一句:咱们刚开始看源码,对细节没那么清楚的状况下能够借助IDE的debug功能,step by step,one step one test或者二分test的方式,来肯定哪行代码是最终启动服务的入口,在这里,咱们已经肯定了bind方法是入口,咱们跟进去,分析

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 
复制代码

经过端口号建立一个 InetSocketAddress,而后继续bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
复制代码

validate() 验证服务启动须要的必要参数,而后调用doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
复制代码

这里,我去掉了细枝末节,让咱们专一于核心方法,其实就两大核心一个是 initAndRegister(),以及doBind0()

其实,从方法名上面咱们已经能够略窥一二,init->初始化,register->注册,那么到底要注册到什么呢?联系到nio里面轮询器的注册,多是把某个东西初始化好了以后注册到selector上面去,最后bind,像是在本地绑定端口号,带着这些猜想,咱们深刻下去

initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // ...
    channel = channelFactory.newChannel();
    //...
    init(channel);
    //...
    ChannelFuture regFuture = config().group().register(channel);
    //...
    return regFuture;
}
复制代码

咱们仍是专一于核心代码,抛开边角料,咱们看到 initAndRegister() 作了几件事情 1.new一个channel 2.init这个channel 3.将这个channel register到某个对象

咱们逐步分析这三件事情

1.new一个channel

咱们首先要搞懂channel的定义,netty官方对channel的描述以下

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

这里的channel,因为是在服务启动的时候建立,咱们能够和普通Socket编程中的ServerSocket对应上,表示服务端绑定的时候通过的一条流水线

咱们发现这条channel是经过一个 channelFactory new出来的,channelFactory 的接口很简单

public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
    /** * Creates a new channel. */
    @Override
    T newChannel();
}
复制代码

就一个方法,咱们查看channelFactory被赋值的地方

AbstractBootstrap.java

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;
}
复制代码

在这里被赋值,咱们层层回溯,查看该函数被调用的地方,发现最终是在这个函数中,ChannelFactory被new出

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
复制代码

这里,咱们的demo程序调用channel(channelClass)方法的时候,将channelClass做为ReflectiveChannelFactory的构造函数建立出一个ReflectiveChannelFactory

demo端的代码以下:

.channel(NioServerSocketChannel.class);
复制代码

而后回到本节最开始

channelFactory.newChannel();
复制代码

咱们就能够推断出,最终是调用到 ReflectiveChannelFactory.newChannel() 方法,跟进

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}
复制代码

看到clazz.newInstance();,咱们明白了,原来是经过反射的方式来建立一个对象,而这个class就是咱们在ServerBootstrap中传入的NioServerSocketChannel.class

结果,绕了一圈,最终建立channel至关于调用默认构造函数new出一个 NioServerSocketChannel对象

这里提一下,读源码细节,有两种读的方式,一种是回溯,好比用到某个对象的时候能够逐层追溯,必定会找到该对象的最开始被建立的代码区块,还有一种方式就是自顶向下,逐层分析,通常用在分析某个具体的方法,庖丁解牛,最后拼接出完整的流程

接下来咱们就能够将重心放到 NioServerSocketChannel的默认构造函数

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
复制代码
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    //...
    return provider.openServerSocketChannel();
}
复制代码

经过SelectorProvider.openServerSocketChannel()建立一条server端channel,而后进入到如下方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

这里第一行代码就跑到父类里面去了,第二行,new出来一个 NioServerSocketChannelConfig,其顶层接口为 ChannelConfig,netty官方的描述以下

A set of configuration properties of a Channel.

基本能够断定,ChannelConfig 也是netty里面的一大核心模块,初次看源码,看到这里,咱们大可没必要深挖这个对象,而是在用到的时候再回来深究,只要记住,这个对象在建立NioServerSocketChannel对象的时候被建立便可

咱们继续追踪到 NioServerSocketChannel 的父类

AbstractNioMessageChannel.java

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
复制代码

继续往上追

AbstractNioChannel.java

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    //...
    ch.configureBlocking(false);
    //...
}
复制代码

这里,简单地将前面 provider.openServerSocketChannel(); 建立出来的 ServerSocketChannel 保存到成员变量,而后调用ch.configureBlocking(false);设置该channel为非阻塞模式,标准的jdk nio编程的玩法

这里的 readInterestOp 即前面层层传入的 SelectionKey.OP_ACCEPT,接下来重点分析 super(parent);(这里的parent实际上是null,由前面写死传入)

AbstractChannel.java

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
复制代码

到了这里,又new出来三大组件,赋值到成员变量,分别为

id = newId();
protected ChannelId newId() {
    return DefaultChannelId.newInstance();
}
复制代码

id是netty中每条channel的惟一标识,这里不细展开,接着

unsafe = newUnsafe();
protected abstract AbstractUnsafe newUnsafe();
复制代码

查看Unsafe的定义

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

成功捕捉netty的又一大组件,咱们能够先不用管TA是干吗的,只须要知道这里的 newUnsafe方法最终属于类NioServerSocketChannel

最后

pipeline = newChannelPipeline();

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
}

复制代码

初次看这段代码,可能并不知道 DefaultChannelPipeline 是干吗用的,咱们仍然使用上面的方式,查看顶层接口ChannelPipeline的定义

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel

从该类的文档中能够看出,该接口基本上又是netty的一大核心模块

到了这里,咱们总算把一个服务端channel建立完毕了,将这些细节串起来的时候,咱们顺带提取出netty的几大基本组件,先总结以下

  • Channel
  • ChannelConfig
  • ChannelId
  • Unsafe
  • Pipeline
  • ChannelHander

初次看代码的时候,咱们的目标是跟到服务器启动的那一行代码,咱们先把以上这几个组件记下来,等代码跟完,咱们就能够自顶向下,逐层分析,我会放到后面源码系列中去深刻到每一个组件

总结一下,用户调用方法 Bootstrap.bind(port) 第一步就是经过反射的方式new一个NioServerSocketChannel对象,而且在new的过程当中建立了一系列的核心组件,仅此而已,并没有他,真正的启动咱们还须要继续跟

2.init这个channel

到了这里,你最好跳到文章最开始的地方回忆一下,第一步newChannel完毕,这里就对这个channel作init,init方法具体干啥,咱们深刻

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

复制代码

初次看到这个方法,可能会以为,哇塞,老长了,这可这么看?还记得咱们前面所说的吗,庖丁解牛,逐步拆解,最后归一,下面是个人拆解步骤

1.设置option和attr

final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
复制代码

经过这里咱们能够看到,这里先调用options0()以及attrs0(),而后将获得的options和attrs注入到channelConfig或者channel中,关于option和attr是干吗用的,其实你如今不用了解得那么深刻,只须要查看最顶层接口ChannelOption以及查看一下channel的具体继承关系,就能够了解,我把这两个也放到后面的源码分析系列再讲

2.设置新接入channel的option和attr

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
复制代码

这里,和上面相似,只不过不是设置当前channel的这两个属性,而是对应到新进来链接对应的channel,因为咱们这篇文章只关心到server如何启动,接入链接放到下一篇文章中详细剖析

3.加入新链接处理器

p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
复制代码

到了最后一步,p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,从名字上就能够看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器,咱们先不作过多分析

来,咱们总结一下,咱们发现其实init也没有启动服务,只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新链接,咱们还得继续往下跟

3.将这个channel register到某个对象

这一步,咱们是分析以下方法

ChannelFuture regFuture = config().group().register(channel);
复制代码

调用到 NioEventLoop 中的register

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
复制代码
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
复制代码

好了,到了这一步,还记得这里的unsafe()返回的应该是什么对象吗?不记得的话能够看下前面关于unsafe的描述,或者最快的方式就是debug到这边,跟到register方法里面,看看是哪一种类型的unsafe

咱们跟进去以后发现是

AbstractUnsafe.java

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
    AbstractChannel.this.eventLoop = eventLoop;
    // ...
    register0(promise);
}
复制代码

这里咱们依然只须要focus重点,先将EventLoop事件循环器绑定到该NioServerSocketChannel上,而后调用 register0()

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
复制代码

这一段其实也很清晰,先调用 doRegister();,具体干啥待会再讲,而后调用invokeHandlerAddedIfNeeded(), 因而乎,控制台第一行打印出来的就是

handlerAdded
复制代码

关于最终是如何调用到的,咱们后面详细剖析pipeline的时候再讲

而后调用 pipeline.fireChannelRegistered(); 调用以后,控制台的显示为

handlerAdded
channelRegistered
复制代码

继续往下跟

if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}
复制代码

读到这,你可能会想固然地觉得,控制台最后一行

pipeline.fireChannelActive();
复制代码

由这行代码输出,咱们不妨先看一下 isActive() 方法

@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}
复制代码

最终调用到jdk中

ServerSocket.java

/** * Returns the binding state of the ServerSocket. * * @return true if the ServerSocket succesfuly bound to an address * @since 1.4 */
    public boolean isBound() {
        // Before 1.3 ServerSockets were always bound during creation
        return bound || oldImpl;
    }
复制代码

这里isBound()返回false,可是从目前咱们跟下来的流程看,咱们并无将一个ServerSocket绑定到一个address,因此 isActive() 返回false,咱们没有成功进入到pipeline.fireChannelActive();方法,那么最后一行究竟是谁输出的呢,咱们有点抓狂,其实,只要熟练运用IDE,要定位函数调用栈,无比简单

下面是我用intellij定位函数调用的具体方法

Intellij函数调用定位

咱们先在最终输出文字的这一行代码处打一个断点,而后debug,运行到这一行,intellij自动给咱们拉起了调用栈,咱们惟一要作的事,就是移动方向键,就能看到函数的完整的调用链

若是你看到方法的最近的发起端是一个线程Runnable的run方法,那么就在提交Runnable对象方法的地方打一个断点,去掉其余断点,从新debug,好比咱们首次debug发现调用栈中的最近的一个Runnable以下

if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}
复制代码

咱们停在了这一行pipeline.fireChannelActive();, 咱们想看最初始的调用,就得跳出来,断点打到 if (!wasActive && isActive()),由于netty里面不少任务执行都是异步线程即reactor线程调用的(具体能够看reactor线程三部曲中的最后一曲),若是咱们要查看最早发起的方法调用,咱们必须得查看Runnable被提交的地方,逐次递归下去,就能找到那行"消失的代码"

最终,经过这种方式,终于找到了 pipeline.fireChannelActive(); 的发起调用的代码,不巧,恰好就是下面的doBind0()方法

doBind0()

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
复制代码

咱们发现,在调用doBind0(...)方法的时候,是经过包装一个Runnable进行异步化的,关于异步化task,能够看下我前面的文章,netty源码分析之揭开reactor线程的面纱(三)

好,接下来咱们进入到channel.bind()方法

AbstractChannel.java

@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}
复制代码

发现是调用pipeline的bind方法

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}
复制代码

相信你对tail是什么不是很了解,能够翻到最开始,tail在建立pipeline的时候出现过,关于pipeline和tail对应的类,我后面源码系列会详细解说,这里,你要想知道接下来代码的走向,惟一一个比较好的方式就是debug 单步进入,篇幅缘由,我就不详细展开

最后,咱们来到了以下区域

HeadContext.java

@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}
复制代码

这里的unsafe就是前面提到的 AbstractUnsafe, 准确点,应该是 NioMessageUnsafe

咱们进入到它的bind方法

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    boolean wasActive = isActive();
    // ...
    doBind(localAddress);

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
复制代码

显然按照正常流程,咱们前面已经分析到 isActive(); 方法返回false,进入到 doBind()以后,若是channel被激活了,就发起pipeline.fireChannelActive();调用,最终调用到用户方法,在控制台打印出了最后一行,因此到了这里,你应该清楚为何最终会在控制台按顺序打印出那三行字了吧

doBind()方法也很简单

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //noinspection Since15
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
复制代码

最终调到了jdk里面的bind方法,这行代码事后,正常状况下,就真正进行了端口的绑定。

另外,经过自顶向下的方式分析,在调用pipeline.fireChannelActive();方法的时候,会调用到以下方法

HeadContext.java

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}
复制代码

进入 readIfIsAutoRead

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
复制代码

分析isAutoRead方法

private volatile int autoRead = 1;
public boolean isAutoRead() {
    return autoRead == 1;
}
复制代码

因而可知,isAutoRead方法默认返回true,因而进入到如下方法

public Channel read() {
    pipeline.read();
    return this;
}
复制代码

最终调用到

AbstractNioUnsafe.java

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
复制代码

这里的this.selectionKey就是咱们在前面register步骤返回的对象,前面咱们在register的时候,注册测ops是0

回忆一下注册

AbstractNioChannel

selectionKey = javaChannel().register(eventLoop().selector, 0, this)
复制代码

这里至关于把注册过的ops取出来,经过了if条件,而后调用

selectionKey.interestOps(interestOps | readInterestOp);
复制代码

而这里的 readInterestOp 就是前面newChannel的时候传入的SelectionKey.OP_ACCEPT,又是标准的jdk nio的玩法,到此,你须要了解的细节基本已经差很少了,就这样结束吧!

summary

最后,咱们来作下总结,netty启动一个服务所通过的流程 1.设置启动类参数,最重要的就是设置channel 2.建立server对应的channel,建立各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等 3.初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件 4.调用到jdk底层作端口绑定,并触发active事件,active触发的时候,真正作服务端口绑定

另外,文章中阅读源码的思路详细或许也能够给你带来一些帮助。

若是你想系统地学Netty,个人小册《Netty 入门与实战:仿写微信 IM 即时通信系统》能够帮助你,若是你想系统学习Netty原理,那么你必定不要错过个人Netty源码分析系列视频:coding.imooc.com/class/230.h…