码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Dubbo系列笔记之服务暴露过程
/  

Dubbo系列笔记之服务暴露过程

一、引言

在 Spring 容器启动后,Dubbo 监听到 Spring 容器发布刷新事件,就会执行服务的暴露逻辑。整个过程可分为下面三部分:

第一部分是前置工作,主要用于检查参数,组装 URL。
第二部分是导出服务,包含暴露服务到本地 ( injvm ),和暴露服务到远程两个过程。
第三部分是向注册中心注册服务,用于服务发现。

二、服务暴露的起点

❤ tips :小伙伴们可以使用 dubbo-demo 模块的 dubbo-demo-xml-provider 下的 Application 进行调试,跟踪服务暴露的步骤。

1、ContextRefreshedEvent 事件

我们知道,在 spring 的上下文初始化完毕后,会发布一个 ContextRefreshedEvent 事件。

  • 应用上下文构造方法
    ClassPathXmlApplicationContext 分析,查看其构造方法:

    public ClassPathXmlApplicationContext(String[] paths, Class<?> clazz, ApplicationContext parent) throws BeansException {
          super(parent);
          Assert.notNull(paths, "Path array must not be null");
          Assert.notNull(clazz, "Class argument must not be null");
          this.configResources = new Resource[paths.length];
    
          for(int i = 0; i < paths.length; ++i) {
              this.configResources[i] = new ClassPathResource(paths[i], clazz);
          }
    
          this.refresh();
      }
    

    注意到在上下文对象的属性初始化完毕后,调用了 refresh( ) 方法。

  • refresh( ) 方法
    该方法继承自 AbstractApplicationContext ,在容器刷新完成后,调用 finishRefresh( ) 方法发布了一个上下文刷新事件:

    protected void finishRefresh() {
      	// Initialize lifecycle processor for this context.
      	initLifecycleProcessor();
    
      	// Propagate refresh to lifecycle processor first.
      	getLifecycleProcessor().onRefresh();
    
      	// 发布上下文刷新事件
      	publishEvent(new ContextRefreshedEvent(this));
    
      	// Participate in LiveBeansView MBean, if active.
      	LiveBeansView.registerApplicationContext(this);
      }
    
2、Dubbo 监听 spring 上下文刷新事件

dubbo-config 模块,dubbo-config-spring 中,OneTimeExecutionApplicationContextEventListener 这个类提供了对部分 spring 事件的处理逻辑,并提供了 onApplicationContextEvent(ApplicationContextEvent event) 模板方法实现对事件的处理。

  • DubboBootstrapApplicationListener
    Dubbo 引导类应用事件监听器是上述模板类的一个实现类,主要处理应用的刷新关闭事件。

    @Override
      public void onApplicationContextEvent(ApplicationContextEvent event) {
          if (event instanceof ContextRefreshedEvent) {
              onContextRefreshedEvent((ContextRefreshedEvent) event);
          } else if (event instanceof ContextClosedEvent) {
              onContextClosedEvent((ContextClosedEvent) event);
          }
      }
    
      private void onContextRefreshedEvent(ContextRefreshedEvent event) {
          dubboBootstrap.start();
      }
    

    在监听到上下文刷新事件时,调用了 DubboBootstrap#start( ) 方法:

    public DubboBootstrap start() {
          if (started.compareAndSet(false, true)) {
              ready.set(false);
              initialize();
              if (logger.isInfoEnabled()) {
                  logger.info(NAME + " is starting...");
              }
              // Dubbo 服务暴露
              exportServices();
    
              // Not only provider register
              if (!isOnlyRegisterProvider() || hasExportedServices()) {
                  // 2. export MetadataService
                  exportMetadataService();
                  //3. Register the local ServiceInstance if required
                  registerServiceInstance();
              }
    
              referServices();
              if (asyncExportingFutures.size() > 0) {
                  new Thread(() -> {
                      try {
                          this.awaitFinish();
                      } catch (Exception e) {
                          logger.warn(NAME + " exportAsync occurred an exception.");
                      }
                      ready.set(true);
                      if (logger.isInfoEnabled()) {
                          logger.info(NAME + " is ready.");
                      }
                  }).start();
              } else {
                  ready.set(true);
                  if (logger.isInfoEnabled()) {
                      logger.info(NAME + " is ready.");
                  }
              }
              if (logger.isInfoEnabled()) {
                  logger.info(NAME + " has started.");
              }
          }
          return this;
      }
    
  • exportServices( )

    注意到其调用了 exportServices( ) 方法执行服务暴露。

    private void exportServices() {
          // 循环配置中的 service
          configManager.getServices().forEach(sc -> {
              ServiceConfig serviceConfig = (ServiceConfig) sc;
              serviceConfig.setBootstrap(this);
              // 异步暴露
              if (exportAsync) {
                  ExecutorService executor = executorRepository.getServiceExporterExecutor();
                  Future<?> future = executor.submit(() -> {
                      sc.export();
                      exportedServices.add(sc);
                  });
                  asyncExportingFutures.add(future);
              } else {// 调用 ServiceConfig 的 export( ) 方法暴露服务,并加入 Map 记录
                  sc.export();
                  exportedServices.add(sc);
              }
          });
      }
    

三、服务暴露前置工作

服务暴露的前置工作主要是两部分,分别是配置的检查和 URL 的装配。配置的检查主要检查用户的配置是否合理,并为用户提供一些默认配置;URL 是 Dubbo 扩展点中传递参数配置的对象,在配置检查完毕后,再根据配置组装 URL 对象。

1、配置检查

继续上面,我们从 ServiceConfig 的 export( ) 方法看:

public synchronized void export() {
        // 是否暴露服务
        if (!shouldExport()) {
            return;
        }

        // 获取引导类
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
        }

        // 配置检查
        checkAndUpdateSubConfigs();

        // 初始化 ServiceMetadata
        serviceMetadata.setVersion(getVersion());
        serviceMetadata.setGroup(getGroup());
        serviceMetadata.setDefaultGroup(getGroup());
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());

        // 延时暴露
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 执行服务暴露逻辑
            doExport();
        }

        // 发布 ServiceConfigExportedEvent 事件
        exported();
    }

其中进行了两项配置的检查:

  • export 属性
    检查是否允许暴露服务,比如当我们进行本地调试时,并不希望服务暴露出去给其他人调用,可以将 export 设置为 false 。

    <dubbo:provider export="false" />
    
  • delay 属性
    检查是否延时暴露,可以设置延时暴露的时间(毫秒)。

    <dubbo:provider delay="2000" />
    

其他的配置检查项可自行查看,本文不细说。

接下来继续看 doExport( ) 方法:

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

其中调用了 doExportUrls( )

private void doExportUrls() {
        // ServiceRepository 存储了所有服务端发布的服务、客户端需要访问的服务,通过 ServiceRepository 可以获取所有本 dubbo 实例发布的服务和引用的服务。
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        // 服务加入 ServiceRepository
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

        // 加载注册中心 URL,可能存在多个
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        // 遍历协议,并暴露服务
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // 如果用户指定了映射路径,则再注册一次服务映射到路径
            repository.registerService(pathKey, interfaceClass);
            serviceMetadata.setServiceKey(pathKey);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
2、URL 组装

继续看 doExportUrlsFor1Protocol 方法

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // 协议名
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            // 如果协议名为空,设置成默认 dubbo
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
        // 加入版本号、时间戳、进程号
        ServiceConfig.appendRuntimeParameters(map);
        // metrics/application/moudle 等
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        // methods 是 MethodConfig 集合,其中存储了 <dubbo:method> 标签的配置信息
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                // MethodConfig 加入 map 集合
                AbstractConfig.appendParameters(map, method, method.getName());
                // retry 参数处理
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    // 不重试,则 retries 设置为 0
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                // ArgumentConfig 处理
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // type 属性是否为空
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // 对比方法名称
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            // ArgumentConfig 中的 type 是否与当前方法参数列表中的参数名是否相同
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                // 遍历参数列表,查找类型名称为 argument.getType() 的参数
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            // 用户未配置 type 属性,但配置了 index 属性,且 index != -1
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }
        // 判断 generic 属性是否为 true
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
            // 生成接口 wrapper 包装类,其中包含的接口的详细信息
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                // 逗号分隔方法名,加入 map
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }

        // 将 token 加入 map ,token 用于令牌验证
        if(ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }

        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                // token 为默认值或 true 时,随机生成 token
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        // map 数据加入 ServiceMetadata
        serviceMetadata.getAttachments().putAll(map);

        // 服务暴露逻辑
        // 获取 host , port
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        // URL 组装
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            // SPI 加载 ConfiguratorFactory 获取 Configurator 实例,配置 url
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        // 从 url 中获取 scope 作用域
        String scope = url.getParameter(SCOPE_KEY);
        // scope 为 none 时,不暴露服务
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // scope 非 remote 时,本地 injvm 暴露服务
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // scope 非 local 时,远程暴露服务
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                // 有注册中心
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        // 协议为 injvm 时跳出循环
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        // 服务是否动态注册,如果设为false,注册后将显示为disable状态,需人工启用,并且服务提供者停止时,也不会自动取消注册,需人工禁用。
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        // 加载 monitor 连接
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            // monitor 连接 加入 url
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // 代理参数
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // 为服务类生成 Invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        // 持有 Invoker 和 ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 服务暴露,生成 Export
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else { // 没有注册中心,仅暴露服务
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

方法代码较长,分支较多,需要小伙伴耐心看。代码容易理解,只是参数非常多。上面说了,URL 参数时 Dubbo 的每个扩展点通信的同一参数,所以这个方法比较重要的一部分是组装 URL 参数。

另一部分就是要分析的重点,服务暴露逻辑。

四、服务暴露过程

通过阅读上面的 #doExportUrlsFor1Protocol 方法,我们知道,根据 scope 的不同关于服务暴露大体分三种情况:

  • scope = none,不导出服务
  • scope != remote,导出到本地
  • scope != local,导出到远程

none 的情况就不说了,而无论是暴露服务到本地还是远程,我们观察到都需要生成一个引用服务的 Invoker 。

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

这样看来,Invoker 屏蔽了调用的内部细节,也就是说在调用时无需关心服务的内部是怎么实现的。

接下来我们看下如何创建 Invoker :

Dubbo 使用 ProxyFactory 来创建 Invoker ,ProxyFactory 是 SPI 接口,默认的扩展使用javassist 实现,即 JavassistProxyFactory,下面我们看一下:

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 获取代理
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 创建 Wrapper 类
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 创建 Invoker
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

其中 Wrapper 的生成过程使用了 Dubbo 基于 Javassist 封装的 ClassGenerator 类,有兴趣的小伙伴自行查看,这里我们不过多分析。

1、服务本地暴露

接下来我们看 ServiceConfig 中的 exportLocal(URL url) 方法,分析服务本地暴露的过程。

private void exportLocal(URL url) {
        // 构建 URL 对象,协议 injvm , host 127.0.0.1
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        // 创建 Invoker ,调用 InjvmProtocol 的 export 方法暴露服务
        // 其中 PROTOCOL 为 Protocol 的自适应扩展,会根据 URL 的参数自动获得 InjvmProtocol 对象。
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

这个方法比较简单,首先构建 URL 对象,将协议、主机和端口设置为本地状态,然后创建 Invoker ,再调用 InjvmProtocolexport 方法暴露服务到本地。下面是 export 方法:

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

如上,创建完 InjvmExport 对象后,本地暴露的过程就完成了,而服务的引用过程,就是基于不同的 Export 来完成的,这会在后面的服务引用中详细说明。

2、服务远程暴露

在 Dubbo 中,有多种 Protocol 的实现,服务暴露的逻辑大体相同,与本地暴露相比,远程暴露还会执行下面两个步骤:

  • 启动通信 Server,绑定端口,提供远程调用。
  • 将服务注册到注册中心。

下面就以 Dubbo 默认的 dubbo 协议来说明服务远程暴露的过程。

通过本地服务暴露的过程我们知道,暴露的动作是通过自适应扩展加载协议对应的 xxxProtocol 对象并调用其 export(Invoker invoker) 方法实现的。由此容易联想到当 dubbo 协议时其会调用 DubboProtocol#export(Invoker invoker) 方法执行服务暴露逻辑。

其实不是的。

回顾上述 doExportUrlsFor1Protocol 方法的 174 行,在生成 Invoker 组装 Registry 的 URL 对象时有代码:

registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())

生成后的 url 类似这样:

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&metadata-type=remote&pid=3451&qos.port=22222&registry=zookeeper&timestamp=1599128056569

可以看到经过这样处理过的 URL 的协议是 registry,所以应该调用的是 RegistryProtocol 类的方法。

你以为这样就对了吗?当然不对!

在 《Dubbo 的 SPI 实现》中,我们分析了 Dubbo 的 SPI 实现细节,其中也提到了 Protocol 的 Wrapper 实现,我们查看关于 Protocol 的 SPI 配置发现其中并没有配置各种协议的实现,只配置了两个 Wrapper 类。

image.png

同时在两个 Wrapper 中也指定了优先级:

image.png

image.png

所以实际的执行顺序应该是:

Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol => DubboProtocol

这部分在上面本地暴露时没有说明,要特别注意。

在不指定 scope 时,Dubbo 默认本地和远程都暴露服务,

ProtocolFilterWrapper 和 ProtocolListenerWrapper 没做什么,下面看 RegistryProtocol 的 export 方法。

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 获取注册中心 URL ,如 zookeeper: zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.2.44.93%3A20880%2Forg.apache.dubbo.demo.DemoService...
        URL registryUrl = getRegistryUrl(originInvoker);
        // 获取已经注册的服务提供者
        URL providerUrl = getProviderUrl(originInvoker);

        // 获取订阅 URL
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //创建 Invoker 并调用 protocol.export 暴露服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        // 创建监听器
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // 是否注册
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        // 创建并返回 DestroyableExporter
        return new DestroyableExporter<>(exporter);

    }

总结一下上面代码主要做了下面几件事:

  • 获取注册中心 URL
  • 调用 doLocalExport(invoker) 方法暴露服务
  • 向注册中心进行订阅 override 数据
  • 创建并返回 DestroyableExporter

继续看 doLocalExport(invoker) 方法:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        // 根据 Invoker 获取缓存 key
        String key = getCacheKey(originInvoker);
        // 从缓存中获取,如果没有暴露过服务则调用 protocol.export 暴露服务
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

如上,根据协议 dubbo 最终调用 DubboProtocol 的 export 方法:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 获取 URL
        URL url = invoker.getUrl();

        // 创建 DubboExporter
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 加入缓存
        exporterMap.put(key, exporter);

        // 本地存根相关
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
        // 启动本地服务
        openServer(url);
        // 序列化优化
        optimizeSerialization(url);

        return exporter;
    }

继续看比较重要的 openServer(url) 方法:

private void openServer(URL url) {
        // 获取本地服务地址,ip:port
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            // 缓存中获取通信服务器,防止重复创建
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                // 双重校验锁,如果缓存不存在则创建通信 server 并加入缓存
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
			// 创建服务器
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // 若服务器已创建,则根据 url 中的配置重置
                server.reset(url);
            }
        }
    }

在第 20 行中,方法的开头已经做了缓存检查,为什么还会出现通信服务器已经创建的情况呢?在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。此部代码不是重点,有兴趣的小伙伴自行查看。

继续看 createServer(url) 方法,如何创建通信服务器:

private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                // 默认开启 server 关闭时发送 READ_ONLY 事件
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // 心跳检测
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                // 设置编码器为 dubbo
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        // 获取 server 类型,默认 netty
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        // 判断对应的扩展是否存在
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
        // 启动通信服务器
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // 判断 client 对应的扩展是否存在
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }

至此,关于服务远程暴露的启动通信服务器部分就分析完了,关于服务器如何创建的细节,会在后面的部分专门加以分析。

另外一部分服务注册的逻辑,就是将服务的 URL 数据保存至注册中心,我们在上一篇 《Dubbo系列笔记之zookeeper注册中心原理》 也有提及,感兴趣的小伙伴可以继续跟进代码查看。



❤ 转载请注明本文地址或来源,谢谢合作 ❤


center