Jetty - Connector源码分析

1. 描述

基于Jetty-9.4.8.v20171121。java

Connector接受远程机器的链接和数据,容许应用向远程机器发送数据。api

1.2 类图

从类图看出AbstractConnector继承ContainerLifeCycle,因此具备Container和LifeCycle特性。数组

此外有一个ServerConnector,这个是整个Jetty中很重要的链接器,目前该链接器负责HTTP和HTTPS协议等链接。缓存

ConnectionFactory负责为链接器建立链接对象,不一样的链接(HTTP)建立不一样的链接对象。服务器

1.3 API能力

主要都是一些getter方法,获取该链接器相关的信息。异步

@ManagedObject("Connector Interface")
public interface Connector extends LifeCycle, Container, Graceful
{
    // 与这个链接器关联的服务器
    public Server getServer();

    // 返回执行任务的执行器
    public Executor getExecutor();

    // 返回调度任务的调度器
    public Scheduler getScheduler();

    // 数据缓冲区
    public ByteBufferPool getByteBufferPool();

    // 返回与协议名称对应的ConnectionFactory对象
    public ConnectionFactory getConnectionFactory(String nextProtocol);
    

    public <T> T getConnectionFactory(Class<T> factoryType);
    
    // 返回默认ConnectionFactory对象
    public ConnectionFactory getDefaultConnectionFactory();
    // 返回全部Connection工厂
    public Collection<ConnectionFactory> getConnectionFactories();
    
    public List<String> getProtocols();
    
    // 返回最大空闲链接时间
    @ManagedAttribute("maximum time a connection can be idle before being closed (in ms)")
    public long getIdleTimeout();

    // 返回这个对象底层的socket,channel,buffer等
    public Object getTransport();
    
    /**
     * @return immutable collection of connected endpoints
     */
    // 返回链接端的不可变集合
    public Collection<EndPoint> getConnectedEndPoints();

    public String getName();
}

2. AbstractConnector

2.1 描述

AbstractConnector利用ConnectionFactory工厂机制为不一样协议(HTTP,SSL等)建立Connection实例。  socket

AbstractConnector管理着链接器必须的几个基本服务:ide

(1)Executor:Executor服务用于运行该链接器所需的全部活动任务,(例如接受链接,处理HTTP请求),默认使用Server.getThreadPool做为Executor;函数

(2)Scheduler:调度器服务用于监视全部链接的空闲超时,而且也可用于监控链接时间,例如异步请求超时,默认使用ScheduledExecutorScheduler实例;this

(3)ByteBufferPool:ByteBufferPool服务提供给全部链接,用于从池中获取和释放ByteBuffer实例。

这些服务做为bean被Container管理,能够是托管或未托管。

链接器有一个ConnectionFactory集合,每一个ConnectionFactory有对应的协议名称。协议名称能够是现实的协议好比https/1.1或http2,甚至能够是私有协议名称。

好比SSL-http/1.1标示SslConnectionFactory,它是由HttpConnectionFactory实例化而且做为HttpConnectionFactory下一个协议。

ConnectionFactory集合能够经过构造函数注入,经过addConnectionFactory,removeConnectionFactory和setConnectionFactories修改。每一个协议名称只能对应一个ConnectionFactory实例,若是两个ConnectionFactory对应一个协议名称,那么第二个将替换第一个。

最新ConnectionFactory经过setDefaultProtocol方法设置,或第一次配置的协议工厂。

每一个ConnectionFactory类型负责它所接受的协议配置。为了配置HTTP协议,你须要传递HttpConfiguration实例到HttpConnectionFactory(或者其余支持HTTP的ConnectionFactory);类似地,SslConnectionFactory须要SslContextFactory对象和下一个协议名称。

(1)ConnectionFactory能够简单建立Connection对象去支持特定协议。好比HttpConnectionFactory能建立HttpConnection处理http/1.1,http/1.0和http/0.9;

(2)ConnectionFactory也能够经过其余ConnectionFactory建立一系列Connection对象。好比SslConnectionFactory配置了下一个协议名称,一旦接受了请求建立了SslConnection对象。而后能够经过链接器的getConnectionFactory获取下一个ConnectionFactory,这个ConnectionFactory产生的Connection能够处理从SslConnection获取的未加密的数据;

(3)ConnectionFactory也能够建立一个临时Connection,用于在链接上交换数据,以肯定下一个使用的协议。例如,ALPN(Application Layer Protocol Negotiation)协议是SSL的扩展,容许在SSL握手期间指定协议,ALPN用于HTTP2在客户端与服务器之间通讯协商协议。接受一个HTTP2链接,链接器会配置SSL-ALPN, h2,http/1.1。一个新接受的链接使用“SSL-ALPN”,它指定一个带有“ALPN”的SSLConnectionFactory做为下一个协议。所以,一个SSL链接实例被连接到一个ALPN链接实例。ALPN而后与客户端协商下一个协议,多是http2,或则http/1.1。一旦决定了下一个协议,ALPN链接调用getConnectionFactory建立链接实例,而后替换ALPN链接。

Connector在运行中会重复调用accept(int)方法,acceptor任务运行在一个循环中。

accept方法实现必须知足以下几点:

(1)阻塞等待新链接;

(2)接受链接(好比socket accept);

(3)配置链接;

(4)调用getDefaultConnectionFactory->ConnectionFactory.newConnection去建立一个新Connection。

acceptor默认的数量是1,数量能够是CPU的个数除以8。更多的acceptor能够减小服务器延迟并且能够得到一个高速的链接(好比http/1.0没有keepalive)。对于现代持久链接协议(http/1.1,http/2)默认值是足够的。

 

2.2 类图

 

 

(1)实现Connector接口,实现基本的链接器能力;

(2)继承ContainerLifeCycle类,具有容器化和生命周期能力;

(3)AbstractConnector有一个内部类Acceptor;

(4)ConnectionFactory工厂接口,实现类有HttpConnectionFactory,SslConnectionFactory,针对不一样的链接类型如HTTP,HTTPS产生不一样的工厂建立不一样的产品(Connection),这是典型的工厂方法模式;

2.3 AbstractConnector源码解读

AbstractConnector实现Connector接口,而Connector接口提供的都是getter方法,都比较简单,通常都是直接返回某个对象;

AbstractConnector继承ContainerLifeCycle类,因此具备LifeCycle特性,有启动中止动做;

2.3.1 字段描述

    // 锁对象,设置Accepting,获取ConnectionFactory等操做
    // Locker类封装了ReentrantLock类
    private final Locker _locker = new Locker();
    private final Condition _setAccepting = _locker.newCondition();
    // ConnectionFactory缓存,key为协议名称,value为ConnectionFactory实例
    private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); 
    // 与链接器对应的server
    private final Server _server;
    // 下面executor,scheduler,byteBufferPool是Connector必须的组件,Connector接口的几个getter方法就是返回这些对象。
    private final Executor _executor;
    private final Scheduler _scheduler;
    private final ByteBufferPool _byteBufferPool;
    // 接受链接的线程数组
    private final Thread[] _acceptors;
    // 已经创建的对端链接对象
    private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
    // Connector中止时,须要把Thread[] _acceptors中止
    // _stopping = new CountDownLatch(_acceptors.length)
    private CountDownLatch _stopping;
// 默认空闲链接时间 private long _idleTimeout = 30000; // 默认协议名称,对应ConnectionFactory private String _defaultProtocol; // 默认ConnectionFactory工厂 private ConnectionFactory _defaultConnectionFactory; private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; private ThreadPoolBudget.Lease _lease;

2.3.2 构造函数和addConnectionFactory

 从构造函数来看,已经初始化了以下字段:

(1)_factories

(2)_defaultProtocol

(3)_defaultConnectionFactory

(4)_server

(5)_executor

(6)_scheduler

(7)_byteBufferPool

(8)_acceptors 

// 惟一的构造函数 
public AbstractConnector(
            Server server,
            Executor executor,
            Scheduler scheduler,
            ByteBufferPool pool,
            int acceptors,
            ConnectionFactory... factories)
    {
        _server=server;
        // 若是Executor为null,则获取Server的
        _executor=executor!=null?executor:_server.getThreadPool();
        if (scheduler==null)
            scheduler=_server.getBean(Scheduler.class);
        _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
        if (pool==null)
            pool=_server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();

        addBean(_server,false);
        addBean(_executor);
        if (executor==null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // 缓存ConnectionFactory,若是没有设置,则为HttpConnectionFactory
        for (ConnectionFactory factory:factories) 
            addConnectionFactory(factory);

        int cores = Runtime.getRuntime().availableProcessors();
        if (acceptors < 0)
            acceptors=Math.max(1, Math.min(4,cores/8));
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
       // 设置acceptor进程数组
        _acceptors = new Thread[acceptors];
    }

调用addConnectionFactory方法能够缓存全部的ConnectionFactory:

 public void addConnectionFactory(ConnectionFactory factory)
    {
        if (isRunning())
            throw new IllegalStateException(getState());

        // 须要移除的ConnectionFactory
        Set<ConnectionFactory> to_remove = new HashSet<>();
        for (String key:factory.getProtocols())
        {
            key=StringUtil.asciiToLowerCase(key);
            ConnectionFactory old=_factories.remove(key); // 先移除协议名称对应的ConnectionFactory对象
            if (old!=null)
            {
                if (old.getProtocol().equals(_defaultProtocol))
                    _defaultProtocol=null;
                to_remove.add(old); // 保存待移除
            }
            _factories.put(key, factory); // 增长新的ConnectionFactory
        }

         // keep factories still referenced
         // 避免一种场景:若是_factories里面已经缓存了HttpConnectionFactory,对应的协议名称为http/1.1
         // 而后增长的factory也是HttpConnectionFactory对应的协议名称为http/1.1通过上面的操做,to_remove里面有HttpConnectionFactory这样下面removeBean的时候会误删
        for (ConnectionFactory f : _factories.values())
            to_remove.remove(f);

        // remove old factories
        for (ConnectionFactory old: to_remove)
        {
            removeBean(old);
            if (LOG.isDebugEnabled())
                LOG.debug("{} removed {}", this, old);
        }

        // add new Bean
        addBean(factory);
        if (_defaultProtocol==null)
            _defaultProtocol=factory.getProtocol();
        if (LOG.isDebugEnabled())
            LOG.debug("{} added {}", this, factory);
    }

   

2.3.3 doStart

在Jetty中,服务器对象Server在启动的时候会启动该服务器管理的全部Connector,从全部Connector的继承关系中能够看出AbstractConnector对象是全部具体Connector对象的父类。 

        // start connectors last
        for (Connector connector : _connectors)
        {
            try
            {  
                connector.start(); // 启动Connector对象
            }
            catch(Throwable e)
            {
                mex.add(e);
            }
        }  

下面具体看一下AbstractConnector.doStart的具体实现。

 @Override
    protected void doStart() throws Exception
    {   // 一个Connector至少有一个ConnectionFactory对象
        if(_defaultProtocol==null)
            throw new IllegalStateException("No default protocol for "+this);
        _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
        if(_defaultConnectionFactory==null)
            throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
        // 若是是SslConnectionFactory,则必需要有下一个协议ConnectionFactory
        SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class);
        if (ssl != null)
        {
            String next = ssl.getNextProtocol();
            ConnectionFactory cf = getConnectionFactory(next);
            if (cf == null)
                throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
        }

        _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length);
        super.doStart();
        // 设置全部acceptor线程中止的同步器
        _stopping=new CountDownLatch(_acceptors.length);
        for (int i = 0; i < _acceptors.length; i++)
        {
            Acceptor a = new Acceptor(i);
            addBean(a);
            getExecutor().execute(a); // 启动接受器
        }

        LOG.info("Started {}", this);
    }  

 

2.3.4 Acceptor

Acceptor接收器负责接受链接且自己是一个线程。

@Override
        public void run()
        {  // (1)设置线程名称与优先级
            final Thread thread = Thread.currentThread();
            String name=thread.getName();
            _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
            thread.setName(_name);

            int priority=thread.getPriority();
            if (_acceptorPriorityDelta!=0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));

            _acceptors[_id] = thread;

            try
            {
                while (isRunning()) // (2)循环接受请求
                {
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await(); // 若是_accepting==false则阻塞,等待通知
                            continue;
                        }
                    }
                    catch (InterruptedException e) 
                    {
                        continue;
                    }
                    
                    try
                    {
                        accept(_id); // (3)接受请求,这个是核心方法
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
                thread.setName(name);
                if (_acceptorPriorityDelta!=0)
                    thread.setPriority(priority);

                synchronized (AbstractConnector.this) // Why?
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping=_stopping; 
                if (stopping!=null)
                    stopping.countDown(); // 若是线程异常,则设置线程同步器减一
            }
        }  

在Acceptor的run方法里面有个accept(int acceptorID)是AbstractConnector新增的抽象方法,负责处理链接请求。

到此AbstractConnector类的主要方法基本已经分析完毕,下面紧接着分析accept方法的实现,重点关注ServerConnector类。

3. ServerConnector源码解读

ServerConnector主要用于TCP/IP链接,使用不一样的ConnectionFactory实例,它能够直接或经过SSL接受HTTP、HTTP/2和WebSocket的链接。

ServerConnector是一个基于NIO的彻底异步实现。链接器必需的服务(Executor,Scheduler等)默认使用传入的Server实例;也能够经过构造函数注入;

各类重载的构造函数用于ConnectionFactory的配置。若是没有设置ConnectionFactory,构造函数将默认使用HttpConnectionFactory。SslContextFactory能够实例化SslConnectionFactory。

链接器会使用Executor执行许多Selector任务,这些任务使用NIO的Selector异步调度accepted链接。selector线程将调用经过EndPoint.fillInterested(Callback)或EndPoint.write(Callback,ByteBuffer)传入的Callback的方法。

这些回调能够执行一些非阻塞的IO工做,但老是会向Executor服务发送任何阻塞、长时间运行或应用程序任务。Selector默认的数量是JVM可用核数的一半。

3.1 字段描述

3.2 承接上面2.3.4小结介绍ServerConnector.accept(int accpetorID)

     // 参数acceptorID其实没有使用 
    @Override
    public void accept(int acceptorID) throws IOException
    {   // 获取与该链接器对应的ServerSocketChannel,该类是java.nio
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept(); // java api
            accepted(channel); // 接受具体的SocketChannel
        }
    }  

在调用accepted方法以后具体的处理将交给SelectorManager类,ServerConnector使用的是ServerConnectorManager类。

4. SelectorManager类

4.1 类图

(1)SelectorManager经过管理ManagedSelector对象简化JVM原始提供的java.nio非阻塞操做;SelectorManager子类实现方法返回协议指定的EndPoint和Connection对象。

(2) ManagedSelector包装Selector简化在channel上面的非阻塞操做;ManagedSelector运行在select循环中而且阻塞在Selector.select()方法直到注册channel事件发生,当有事件发生,它负责通知与当前channel相关的EndPoint。

4.2 accept方法

    public void accept(SelectableChannel channel)
    {
        accept(channel, null);
    }

     // 注册Channel执行非阻塞读写操做
    public void accept(SelectableChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector(channel); // 选择ManagedSelector对象
        // Accept是一个Runnable子类,submit提交线程池运行
        selector.submit(selector.new Accept(channel, attachment));
    }
     
    private ManagedSelector chooseSelector(SelectableChannel channel)
    {  
        return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)];
    }
    

4.3 ManagedSelectopr.Accept类

// 线程可执行类
class Accept extends Invocable.NonBlocking implements Closeable
    {
        private final SelectableChannel channel;
        private final Object attachment;

        Accept(SelectableChannel channel, Object attachment)
        {
            this.channel = channel;
            this.attachment = attachment;
        }

        @Override
        public void close()
        {
            LOG.debug("closed accept of {}", channel);
            closeNoExceptions(channel);
        }

        @Override
        public void run()
        {
            try
            {
                final SelectionKey key = channel.register(_selector, 0, attachment);
                // 建立EndPoint对象,而后提交线程池执行
                submit(new CreateEndPoint(channel, key));
            }
            catch (Throwable x)
            {
                closeNoExceptions(channel);
                LOG.debug(x);
            }
        }
    }

submit方法就是把线程任务提交到Queue<Runnable> _actions = new ArrayDeque<>();队列中。

而后每一个ManagedSelector在doStart里面启动线程池不断从_actions队列中获取任务执行,主要涉及的类有EatWhatYouKill,ManagedSelector.SelectorProducer等。

Jetty是基于Handler处理各类请求,下面重点分析如何调用最终的Handler处理器。

5. EndPoint和Connection

5.1 类图

                  

 

紧接着4.3中的CreateEndPoint类,调用createEndPoint方法,

 

private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
    {
        // endPoint的实际类型是SocketChannelEndPoint对象
        EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
         // 若是未设置ConnectionFactory,则默认是HttpConnectionFactory,链接对象类型是HttpConnection,即connection类型是HttpConnection
        Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
        endPoint.setConnection(connection);
        selectionKey.attach(endPoint);
        // 当EndPoint打开,回调该方法
        endPoint.onOpen();
        // 当EndPoint打开,回调该方法,同时将endPoint保存在AbstractConnector的Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());里面
        _selectorManager.endPointOpened(endPoint);
        // 内部调用HttpConnection.onOpen()
        _selectorManager.connectionOpened(connection);
        if (LOG.isDebugEnabled())
            LOG.debug("Created {}", endPoint);
    }  

下面看看HttpConnection.onOpen()方法实现:

     // HttpConnection
    @Override
    public void onOpen()
    {
        super.onOpen(); // 通知Listeners
        fillInterested();
    }

    // AbstractConnection
    public void fillInterested()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("fillInterested {}",this);
         // SocketChannelEndPoint类型
        getEndPoint().fillInterested(_readCallback);
    }

    // AbstractEndPoint
    @Override
    public void fillInterested(Callback callback)
    {
        notIdle();
        _fillInterest.register(callback); // 最终调用SocketChannelEndPoint父类ChannelEndPoint.needsFillInterest
    }  

最后补充一个业务调用栈: