netty源码分析之新链接接入全解析

本文收获

通读本文,你会了解到html

  1. netty如何接受新的请求
  2. netty如何给新请求分配reactor线程
  3. netty如何给每一个新链接增长ChannelHandler

其实,远不止这些~java

前序背景

读这篇文章以前,最好掌握一些前序知识,包括netty中的reactor线程,以及服务端启动过程 下面我带你简单地回顾一下react

1.netty中的reactor线程

netty中最核心的东西莫过于两种类型的reactor线程,能够看做netty中两种类型的发动机,驱动着netty整个框架的运转编程

一种类型的reactor线程是boos线程组,专门用来接受新的链接,而后封装成channel对象扔给worker线程组;还有一种类型的reactor线程是worker线程组,专门用来处理链接的读写promise

不论是boos线程仍是worker线程,所作的事情均分为如下三个步骤微信

  1. 轮询注册在selector上的IO事件
  2. 处理IO事件
  3. 执行异步task

对于boos线程来讲,第一步轮询出来的基本都是 accept 事件,表示有新的链接,而worker线程轮询出来的基本都是read/write事件,表示网络的读写事件网络

2.服务端启动

服务端启动过程是在用户线程中开启,第一次添加异步任务的时候启动boos线程被启动,netty将处理新链接的过程封装成一个channel,对应的pipeline会按顺序处理新创建的链接(关于pipeline我后面会开篇详细分析)框架

了解完两个背景,咱们开始进入正题异步

新链接的创建

简单来讲,新链接的创建能够分为三个步骤socket

  1. 检测到有新的链接
  2. 将新的链接注册到worker线程组
  3. 注册新链接的读事件

下面带你庖丁解牛,一步步分析整个过程

检测到有新链接进入

咱们已经知道,当服务端绑启动以后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生

NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
复制代码

上面这段代码是reactor线程三部曲中的第二部曲,表示boos reactor线程已经轮询到 SelectionKey.OP_ACCEPT 事件,说明有新的链接进入,此时将调用channel的 unsafe来进行实际的操做

关于 unsafe,这篇文章我不打算细讲,下面是netty做者对于unsafe的解释

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport.

你只须要了解一个大概的概念,就是全部的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操做都由unsafe来实现

而从上一篇文章,服务端的启动过程中,咱们已经知道,服务端对应的channel的unsafe是 NioMessageUnsafe,那么,咱们进入到它的read方法,进入新链接处理的第二步

注册到reactor线程

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}
复制代码

我省去了非关键部分的代码,能够看到,一上来,就用一条断言肯定该read方法必须是reactor线程调用,而后拿到channel对应的pipeline和 RecvByteBufAllocator.Handle(先不解释)

接下来,调用 doReadMessages 方法不断地读取消息,用 readBuf 做为容器,这里,其实能够猜到读取的是一个个链接,而后调用 pipeline.fireChannelRead(),将每条新链接通过一层服务端channel的洗礼

以后清理容器,触发 pipeline.fireChannelReadComplete(),整个过程清晰明了,不含一丝杂质,下面咱们具体看下这两个方法

1.doReadMessages(List) 2.pipeline.fireChannelRead(NioSocketChannel)

1.doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
复制代码

咱们终于窥探到netty调用jdk底层nio的边界 javaChannel().accept();,因为netty中reactor线程第一步就扫描到有accept事件发生,所以,这里的accept方法是当即返回的,返回jdk底层nio建立的一条channel

netty将jdk的 SocketChannel 封装成自定义的 NioSocketChannel,加入到list里面,这样外层就能够遍历该list,作后续处理

上篇文章中,咱们已经知道服务端的建立过程当中会建立netty中一系列的核心组件,包括pipeline,unsafe等等,那么,接受一条新链接的时候是否也会建立这一系列的组件呢?

带着这个疑问,咱们跟进去

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}
复制代码

咱们重点分析 super(parent, socket),config相关的分析咱们放到后面的文章中

NioSocketChannel的父类为 AbstractNioByteChannel

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
复制代码

这里,咱们看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ,通常在原生的jdk nio编程中,也会注册这样一个事件,表示对channel的读感兴趣

咱们继续往上,追踪到AbstractNioByteChannel的父类 AbstractNioChannel, 这里,我相信读了上篇文章的你对于这部分代码确定是有印象的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
复制代码

在建立服务端channel的时候,最终也会进入到这个方法,super(parent), 即是在AbstractChannel中建立一系列和该channel绑定的组件,以下

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
复制代码

而这里的 readInterestOp 表示该channel关心的事件是 SelectionKey.OP_READ,后续会将该事件注册到selector,以后设置该通道为非阻塞模式

到了这里,我终于能够将netty里面最经常使用的channel的结构图放给你看

简化版channel继承关系

这里的继承关系有所简化,当前,咱们只须要了解这么多。

首先

  1. channel 继承 Comparable 表示channel是一个能够比较的对象
  2. channel 继承AttributeMap表示channel是能够绑定属性的对象,在用户代码中,咱们常用channel.attr(...)方法就是来源于此
  3. ChannelOutboundInvoker是4.1.x版本新加的抽象,表示一条channel能够进行的操做
  4. DefaultAttributeMap用于AttributeMap抽象的默认方法,后面channel继承了直接使用
  5. AbstractChannel用于实现channel的大部分方法,其中咱们最熟悉的就是其构造函数中,建立出一条channel的基本组件
  6. AbstractNioChannel基于AbstractChannel作了nio相关的一些操做,保存jdk底层的 SelectableChannel,而且在构造函数中设置channel为非阻塞
  7. 最后,就是两大channel,NioServerSocketChannel,NioSocketChannel对应着服务端接受新链接过程和新链接读写过程

读到这,关于channel的总体框架你基本已经了解了一大半了

好了,让咱们退栈,继续以前的源码分析,在建立出一条 NioSocketChannel以后,放置在List容器里面以后,就开始进行下一步操做

2.pipeline.fireChannelRead(NioSocketChannel)

AbstractNioMessageChannel.java

pipeline.fireChannelRead(NioSocketChannel);
复制代码

在没有正式介绍pipeline以前,请让我简单介绍一下pipeline这个组件

在netty的各类类型的channel中,都会包含一个pipeline,字面意思是管道,咱们能够理解为一条流水线工艺,流水线工艺有起点,有结束,中间还有各类各样的流水线关卡,一件物品,在流水线起点开始处理,通过各个流水线关卡的加工,最终到流水线结束

对应到netty里面,流水线的开始就是HeadContxt,流水线的结束就是TailConextHeadContxt中调用Unsafe作具体的操做,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告,关于pipeline的具体分析咱们后面再详细探讨

经过前面一篇文章,咱们已经知道在服务端处理新链接的pipeline中,已经自动添加了一个pipeline处理器 ServerBootstrapAcceptor, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,咱们就来看下ServerBootstrapAcceptor

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
复制代码

前面的 pipeline.fireChannelRead(NioSocketChannel); 最终经过head->unsafe->ServerBootstrapAcceptor的调用链,调用到这里的 ServerBootstrapAcceptorchannelRead方法

channelRead 一上来就把这里的msg强制转换为 Channel, 为何这里能够强制转换?读者能够思考一下

而后,拿到该channel,也就是咱们以前new出来的 NioSocketChannel对应的pipeline,将用户代码中的 childHandler,添加到pipeline,这里的 childHandler 在用户代码中的体现为

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });
复制代码

其实对应的是 ChannelInitializer,到了这里,NioSocketChannel中pipeline对应的处理器为 head->ChannelInitializer->tail,牢记,后面会再次提到!

接着,设置 NioSocketChannel 对应的 attr和option,而后进入到 childGroup.register(child),这里的childGroup就是咱们在启动代码中new出来的NioEventLoopGroup,具体能够参考这篇文章

咱们进入到NioEventLoopGroupregister方法,代理到其父类MultithreadEventLoopGroup

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
复制代码

这里又扯出来一个 next()方法,咱们跟进去

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
复制代码

回到其父类

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}
复制代码

这里的chooser对应的类为 EventExecutorChooser,字面意思为事件执行器选择器,放到咱们这里的上下文中的做用就是从worker reactor线程组中选择一个reactor线程

public interface EventExecutorChooserFactory {

    /** * Returns a new {@link EventExecutorChooser}. */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /** * Chooses the next {@link EventExecutor} to use. */
    @UnstableApi
    interface EventExecutorChooser {

        /** * Returns the new {@link EventExecutor} to use. */
        EventExecutor next();
    }
}
复制代码

关于chooser的具体建立我不打算展开,相信前面几篇文章中的源码阅读技巧能够帮助你找出choose的始末,这里,我直接告诉你(可是劝你仍是自行分析一下,简单得很),chooser的实现有两种

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
复制代码

默认状况下,chooser经过 DefaultEventExecutorChooserFactory被建立,在建立reactor线程选择器的时候,会判断reactor线程的个数,若是是2的幂,就建立PowerOfTowEventExecutorChooser,不然,建立GenericEventExecutorChooser

两种类型的选择器在选择reactor线程的时候,都是经过Round-Robin的方式选择reactor线程,惟一不一样的是,PowerOfTowEventExecutorChooser是经过与运算,而GenericEventExecutorChooser是经过取余运算,与运算的效率要高于求余运算,可见,netty为了效率优化简直丧心病狂!

选择完一个reactor线程,即 NioEventLoop 以后,咱们回到注册的地方

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
复制代码

代理到 NioEventLoop 的父类的register方法

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
复制代码

其实,这里已经和服务端启动的过程同样了,详细步骤能够参考服务端启动详解这篇文章,咱们直接跳到关键环节

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}
复制代码

和服务端启动过程同样,先是调用 doRegister();作真正的注册过程,以下

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
复制代码

将该条channel绑定到一个selector上去,一个selector被一个reactor线程使用,后续该channel的事件轮询,以及事件处理,异步task执行都是由此reactor线程来负责

绑定完reactor线程以后,调用 pipeline.invokeHandlerAddedIfNeeded()

前面咱们说到,到目前为止NioSocketChannel 的pipeline中有三个处理器,head->ChannelInitializer->tail,最终会调用到 ChannelInitializerhandlerAdded 方法

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}
复制代码

handlerAdded方法调用 initChannel 方法以后,调用remove(ctx);将自身删除

AbstractNioChannel.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}
复制代码

而这里的 initChannel 方法又是神马玩意?让咱们回到用户方法,好比下面这段用户代码

用户代码

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });
复制代码

哦,原来最终跑到咱们本身的代码里去了啊!我就不解释这段代码是干吗的了,你懂的~

完了以后,NioSocketChannel绑定的pipeline的处理器就包括 head->LoggingHandler->EchoServerHandler->tail

注册读事件

接下来,咱们还剩下这些代码没有分析完

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}
复制代码

pipeline.fireChannelRegistered();,其实没有干啥有意义的事情,最终无非是再调用一下业务pipeline中每一个处理器的 ChannelHandlerAdded方法处理下回调

isActive()在链接已经创建的状况下返回true,因此进入方法块,进入到 pipeline.fireChannelActive();,这里的分析和netty源码分析之服务端启动全解析分析中的同样,在这里我详细步骤先省略,直接进入到关键环节

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
复制代码

你应该还记得前面 register0() 方法的时候,向selector注册的事件代码是0,而 readInterestOp对应的事件代码是 SelectionKey.OP_READ,参考前文中建立 NioSocketChannel 的过程,稍加推理,聪明的你就会知道,这里其实就是将 SelectionKey.OP_READ事件注册到selector中去,表示这条通道已经能够开始处理read事件了

总结

至此,netty中关于新链接的处理已经向你展现完了,咱们作下总结

  1. boos reactor线程轮询到有新的链接进入
  2. 经过封装jdk底层的channel建立 NioSocketChannel以及一系列的netty核心组件
  3. 将该条链接经过chooser,选择一条worker reactor线程绑定上去
  4. 注册读事件,开始新链接的读写

下篇文章将深挖netty中的核心组件 pipeline,敬请期待

若是你想系统地学Netty,个人小册《Netty 入门与实战:仿写微信 IM 即时通信系统》能够帮助你,若是你想系统学习Netty原理,那么你必定不要错过个人Netty源码分析系列视频:coding.imooc.com/class/230.h…