dubbo 源码学习笔记 (二) —— dubbo发布服务的过程

欢迎访问我的个人博客休息的风

dubbo基础类ExtensionLoader和URL贯穿整个框架,掌握这两个类的思想和源码,就相当于对dubbo有了初步的认识。接着,我们来分析一下dubbo是如何发布一个服务的。

以ServiceConfig.doExportUrlsFor1Protocol为起点,在使用spring容器启动时,会调用该方法进行发布服务的一系列操作。简单来说,发布的过程可以简化为,服务转换为invoker->通过netty创建server,并将dubboInvoker放到工作线程池中->在zk上注册服务信息。这样在注册中心就会有服务相关的信息,在netty的工作线程中,就会有dubbo真实的invoker。需要使用的时候就在注册中心找信息,然后连接netty,去工作线程中拿一个线程执行。

以下是整理dubbo发布服务的整个源码调用过程:(看不清,请点击新的页签进行查看)


首先,在ServiceConfig.doExportUrlsFor1Protocol这个方法里是真正发布服务的入口。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //省略一些代码。。。。
    //前面将各种配置信息都封装进map中,然后构建url对象
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    //这里就是暴露服务的入口
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            //方法里的url是如下形式:
            //injvm://127.0.0.1/com.zyy.service.inter.OrderLogService?anyhost=true&application=order-basic
            // &dubbo=2.5.5&generic=false&interface=com.zyy.service.inter.OrderLogService
            // &methods=queryOrderLogs,update,insert&pid=9304&side=provider&timestamp=1507951011792&validation=true
            //表示的意思大概是:injvm 表示本地暴露, 访问地址是127.0.0.1, 访问路径是com.zyy.service.inter.OrderLogService
            //接下来是参数信息,存在URL的parameters的属性里
            exportLocal(url);
        }
        //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    //registryURL的值示例如下:
                    //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=order-basic&dubbo=2.5.5
                    // &pid=9304&registry=zookeeper&timestamp=1507951010581
                    //这里的url表示意思如下: registry 表示当前的url是用来注册的; 访问地址是127.0.0.1 端口号是2181; 访问路径是com.alibaba.dubbo.registry.RegistryService
                    //application=order-basic 应用名是:order-basic 等的信息存在url的parameters属性里
                    //这里的proxyFactory.getInvoker使用的是JavassistProxyFactory.getInvoker方法,
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    //这里的protocol具体使用的是哪个,它的判断过程是:
                    // com.alibaba.dubbo.common.URL url = arg0.getUrl();
                    // String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
                    // 再根据 extName 获取相应的spi类
                    //如上讲的,invoker里面会存有一个url,这个url的协议是registry, 所以protocol具体的类是RegistryProtocol
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
这个方法首先是在构建URL对象,之后根据scope的值,决定暴露本地服务还是远程服务(其实这个scope为null,也就是本地和远程都会暴露);这里的proxyFactory.getInvoker使用的是JavassistProxyFactory.getInvoker方法,

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper类不能正确处理带$的类名  final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
先用Wrapper.getWrapper进行包装,再实例化AbstractProxyInvoker,也就是说“Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));”这句代码返回的Invoker的实例是AbstractProxyInvoker的实例。

“Exporter<?> exporter = protocol.export(invoker);”这里的protocol确定调用是哪个协议的过程比较不好理解。因为这个过程的代码是生成的,用Javassist去加载使用的。类名为Protocol$Adpative,export的方法为:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}
这个方法的入参就是上一句代码返回的Invoker,Invoker里面 会存有一个url,这个url的协议值是registry, extName的值就是registry,通过spi的加载机制,可以确定protocol具体的类是RegistryProtocol。

在RegistryProtocol.export的方法里,简单的过程是:DubboProtocl启动netty,发布服务到工作线程(通用情况下是dubbo协议和netty做网络通讯)-> 往zk上注册信息和监听(注册中心一般使用zk) -> 返回一个new Exporter

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker  //使用dubboProtocl启动netty,发布服务  final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  //registry provider  //通过registryFactory获取一个Registry,  final Registry registry = getRegistry(originInvoker);  //对要注册到注册中心的url做一些处理  final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  //如果用zk,会在ZookeeperRegistry.doRegister里真正注册  registry.register(registedProviderUrl);  // 订阅override数据  // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。  final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);  overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  ///如果用zk,会在ZookeeperRegistry.doSubscribe里真正订阅监听  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  //保证每次export都返回一个新的exporter实例  return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();  }

        //省略一些代码。。。。
这个方法里主要是发布的一个过程,真正的细节,还是在zookeeperRegistry注册,DubboProtocol启动netty,和发布服务。特别是启动netty网络层比较复杂,之后会单独分析。这里只做一个简单的过程分析。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    //封装为一个DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //省略一些代码
    。。。。。。
    
    //网络通讯的入口,会去默认用netty创建
    openServer(url);

    return exporter;
}
在openServer的方法里,会去调用createServer方法,开启一个Server服务端通讯的入口。

private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        //通过spi机制,真正使用HeaderExchanger去bind,这里可以看一下requestHandler,
        // 这个就是之后客户端refer真正处理的地方
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    //省略一些代码。。。。
requestHandler的代码如下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            //从DubboExporter里获取Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            //如果是callback 需要处理高版本调用低版本的问题
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
               //callback的处理,省略。。。。
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //反射调用,真正客户端方法调用的地方
            return invoker.invoke(inv);
        }

在HeaderExchanger里,通过Transporters,让具体的协议与发布过程解耦。通过SPI机制,使用NettyTransporter去创建一个NettyServer,doOpen方法,开启一个netty服务器。如果有用过netty的,应该会比较熟悉这段代码。

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    //两个线程池,用于请求和工作处理
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            //使用适配,用DubboCodec去编码解码
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            //工作线程的处理handler
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
这样,就启动一个netty的服务端了。并且也将真正处理的DubboProtocol的requestHandler绑定到netty的工作线程池里了。这样,如果要获取一个请求,就可以知道,最终的处理,是通过netty工作线程池里拿一个线程,去用 requestHandler去处理,把处理结果再通过netty返回客户端。至于如何找到哪个机器上的哪个服务的,就是通过注册中心以及负载均衡去处理了。这篇博客主要是描述发布服务的整个过程,其中有些细节省略,关注点在整个过程是如何发布的。