在 Spring 容器启动后,Dubbo 监听到 Spring 容器发布刷新事件,就会执行服务的暴露逻辑。整个过程可分为下面三部分:
第一部分是前置工作,主要用于检查参数,组装 URL。
第二部分是导出服务,包含暴露服务到本地 ( injvm ),和暴露服务到远程两个过程。
第三部分是向注册中心注册服务,用于服务发现。
❤ tips :小伙伴们可以使用 dubbo-demo
模块的 dubbo-demo-xml-provider
下的 Application
进行调试,跟踪服务暴露的步骤。
ContextRefreshedEvent
事件我们知道,在 spring 的上下文初始化完毕后,会发布一个 ContextRefreshedEvent
事件。
应用上下文构造方法
以 ClassPathXmlApplicationContext
分析,查看其构造方法:
public ClassPathXmlApplicationContext(String[] paths, Class<?> clazz, ApplicationContext parent) throws BeansException {
super(parent);
Assert.notNull(paths, "Path array must not be null");
Assert.notNull(clazz, "Class argument must not be null");
this.configResources = new Resource[paths.length];
for(int i = 0; i < paths.length; ++i) {
this.configResources[i] = new ClassPathResource(paths[i], clazz);
}
this.refresh();
}
注意到在上下文对象的属性初始化完毕后,调用了 refresh( )
方法。
refresh( )
方法
该方法继承自 AbstractApplicationContext
,在容器刷新完成后,调用 finishRefresh( )
方法发布了一个上下文刷新事件:
protected void finishRefresh() {
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// 发布上下文刷新事件
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
在 dubbo-config
模块,dubbo-config-spring
中,OneTimeExecutionApplicationContextEventListener
这个类提供了对部分 spring 事件的处理逻辑,并提供了 onApplicationContextEvent(ApplicationContextEvent event)
模板方法实现对事件的处理。
DubboBootstrapApplicationListener
Dubbo 引导类应用事件监听器是上述模板类的一个实现类,主要处理应用的刷新关闭事件。
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
在监听到上下文刷新事件时,调用了 DubboBootstrap#start( )
方法:
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
ready.set(false);
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// Dubbo 服务暴露
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
exportServices( )
注意到其调用了 exportServices( )
方法执行服务暴露。
private void exportServices() {
// 循环配置中的 service
configManager.getServices().forEach(sc -> {
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
// 异步暴露
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {// 调用 ServiceConfig 的 export( ) 方法暴露服务,并加入 Map 记录
sc.export();
exportedServices.add(sc);
}
});
}
服务暴露的前置工作主要是两部分,分别是配置的检查和 URL 的装配。配置的检查主要检查用户的配置是否合理,并为用户提供一些默认配置;URL 是 Dubbo 扩展点中传递参数配置的对象,在配置检查完毕后,再根据配置组装 URL 对象。
继续上面,我们从 ServiceConfig 的 export( ) 方法看:
public synchronized void export() {
// 是否暴露服务
if (!shouldExport()) {
return;
}
// 获取引导类
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
// 配置检查
checkAndUpdateSubConfigs();
// 初始化 ServiceMetadata
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 延时暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 执行服务暴露逻辑
doExport();
}
// 发布 ServiceConfigExportedEvent 事件
exported();
}
其中进行了两项配置的检查:
export 属性
检查是否允许暴露服务,比如当我们进行本地调试时,并不希望服务暴露出去给其他人调用,可以将 export 设置为 false 。
<dubbo:provider export="false" />
delay 属性
检查是否延时暴露,可以设置延时暴露的时间(毫秒)。
<dubbo:provider delay="2000" />
其他的配置检查项可自行查看,本文不细说。
接下来继续看 doExport( ) 方法:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
其中调用了 doExportUrls( )
:
private void doExportUrls() {
// ServiceRepository 存储了所有服务端发布的服务、客户端需要访问的服务,通过 ServiceRepository 可以获取所有本 dubbo 实例发布的服务和引用的服务。
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
// 服务加入 ServiceRepository
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 加载注册中心 URL,可能存在多个
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 遍历协议,并暴露服务
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// 如果用户指定了映射路径,则再注册一次服务映射到路径
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
继续看 doExportUrlsFor1Protocol 方法
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 协议名
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
// 如果协议名为空,设置成默认 dubbo
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// 加入版本号、时间戳、进程号
ServiceConfig.appendRuntimeParameters(map);
// metrics/application/moudle 等
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
// methods 是 MethodConfig 集合,其中存储了 <dubbo:method> 标签的配置信息
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
// MethodConfig 加入 map 集合
AbstractConfig.appendParameters(map, method, method.getName());
// retry 参数处理
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 不重试,则 retries 设置为 0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// ArgumentConfig 处理
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// type 属性是否为空
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// 对比方法名称
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
// ArgumentConfig 中的 type 是否与当前方法参数列表中的参数名是否相同
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 遍历参数列表,查找类型名称为 argument.getType() 的参数
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 判断 generic 属性是否为 true
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 生成接口 wrapper 包装类,其中包含的接口的详细信息
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// 逗号分隔方法名,加入 map
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 将 token 加入 map ,token 用于令牌验证
if(ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
// token 为默认值或 true 时,随机生成 token
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// map 数据加入 ServiceMetadata
serviceMetadata.getAttachments().putAll(map);
// 服务暴露逻辑
// 获取 host , port
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
// URL 组装
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// SPI 加载 ConfiguratorFactory 获取 Configurator 实例,配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 从 url 中获取 scope 作用域
String scope = url.getParameter(SCOPE_KEY);
// scope 为 none 时,不暴露服务
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// scope 非 remote 时,本地 injvm 暴露服务
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope 非 local 时,远程暴露服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 有注册中心
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
// 协议为 injvm 时跳出循环
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 服务是否动态注册,如果设为false,注册后将显示为disable状态,需人工启用,并且服务提供者停止时,也不会自动取消注册,需人工禁用。
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加载 monitor 连接
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
// monitor 连接 加入 url
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// 代理参数
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 为服务类生成 Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务暴露,生成 Export
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else { // 没有注册中心,仅暴露服务
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
方法代码较长,分支较多,需要小伙伴耐心看。代码容易理解,只是参数非常多。上面说了,URL 参数时 Dubbo 的每个扩展点通信的同一参数,所以这个方法比较重要的一部分是组装 URL 参数。
另一部分就是要分析的重点,服务暴露逻辑。
通过阅读上面的 #doExportUrlsFor1Protocol
方法,我们知道,根据 scope 的不同关于服务暴露大体分三种情况:
- scope = none,不导出服务
- scope != remote,导出到本地
- scope != local,导出到远程
none 的情况就不说了,而无论是暴露服务到本地还是远程,我们观察到都需要生成一个引用服务的 Invoker 。
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
这样看来,Invoker 屏蔽了调用的内部细节,也就是说在调用时无需关心服务的内部是怎么实现的。
接下来我们看下如何创建 Invoker :
Dubbo 使用 ProxyFactory
来创建 Invoker ,ProxyFactory
是 SPI 接口,默认的扩展使用javassist
实现,即 JavassistProxyFactory,下面我们看一下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 获取代理
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 创建 Wrapper 类
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建 Invoker
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
其中 Wrapper 的生成过程使用了 Dubbo 基于 Javassist
封装的 ClassGenerator
类,有兴趣的小伙伴自行查看,这里我们不过多分析。
接下来我们看 ServiceConfig
中的 exportLocal(URL url)
方法,分析服务本地暴露的过程。
private void exportLocal(URL url) {
// 构建 URL 对象,协议 injvm , host 127.0.0.1
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// 创建 Invoker ,调用 InjvmProtocol 的 export 方法暴露服务
// 其中 PROTOCOL 为 Protocol 的自适应扩展,会根据 URL 的参数自动获得 InjvmProtocol 对象。
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
这个方法比较简单,首先构建 URL 对象,将协议、主机和端口设置为本地状态,然后创建 Invoker ,再调用 InjvmProtocol
的 export
方法暴露服务到本地。下面是 export 方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,创建完 InjvmExport 对象后,本地暴露的过程就完成了,而服务的引用过程,就是基于不同的 Export 来完成的,这会在后面的服务引用中详细说明。
在 Dubbo 中,有多种 Protocol
的实现,服务暴露的逻辑大体相同,与本地暴露相比,远程暴露还会执行下面两个步骤:
下面就以 Dubbo 默认的 dubbo
协议来说明服务远程暴露的过程。
通过本地服务暴露的过程我们知道,暴露的动作是通过自适应扩展加载协议对应的 xxxProtocol 对象并调用其 export(Invoker invoker)
方法实现的。由此容易联想到当 dubbo
协议时其会调用 DubboProtocol#export(Invoker invoker)
方法执行服务暴露逻辑。
其实不是的。
回顾上述 doExportUrlsFor1Protocol 方法的 174 行,在生成 Invoker 组装 Registry 的 URL 对象时有代码:
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())
生成后的 url 类似这样:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&metadata-type=remote&pid=3451&qos.port=22222®istry=zookeeper×tamp=1599128056569
可以看到经过这样处理过的 URL 的协议是 registry
,所以应该调用的是 RegistryProtocol 类的方法。
你以为这样就对了吗?当然不对!
在 《Dubbo 的 SPI 实现》中,我们分析了 Dubbo 的 SPI 实现细节,其中也提到了 Protocol 的 Wrapper 实现,我们查看关于 Protocol 的 SPI 配置发现其中并没有配置各种协议的实现,只配置了两个 Wrapper 类。
同时在两个 Wrapper 中也指定了优先级:
所以实际的执行顺序应该是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol => DubboProtocol
这部分在上面本地暴露时没有说明,要特别注意。
在不指定 scope 时,Dubbo 默认本地和远程都暴露服务,
ProtocolFilterWrapper 和 ProtocolListenerWrapper 没做什么,下面看 RegistryProtocol 的 export 方法。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心 URL ,如 zookeeper: zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.2.44.93%3A20880%2Forg.apache.dubbo.demo.DemoService...
URL registryUrl = getRegistryUrl(originInvoker);
// 获取已经注册的服务提供者
URL providerUrl = getProviderUrl(originInvoker);
// 获取订阅 URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//创建 Invoker 并调用 protocol.export 暴露服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
// 创建监听器
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 是否注册
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
// 创建并返回 DestroyableExporter
return new DestroyableExporter<>(exporter);
}
总结一下上面代码主要做了下面几件事:
doLocalExport(invoker)
方法暴露服务继续看 doLocalExport(invoker)
方法:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
// 根据 Invoker 获取缓存 key
String key = getCacheKey(originInvoker);
// 从缓存中获取,如果没有暴露过服务则调用 protocol.export 暴露服务
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
如上,根据协议 dubbo 最终调用 DubboProtocol 的 export 方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 获取 URL
URL url = invoker.getUrl();
// 创建 DubboExporter
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 加入缓存
exporterMap.put(key, exporter);
// 本地存根相关
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 启动本地服务
openServer(url);
// 序列化优化
optimizeSerialization(url);
return exporter;
}
继续看比较重要的 openServer(url)
方法:
private void openServer(URL url) {
// 获取本地服务地址,ip:port
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 缓存中获取通信服务器,防止重复创建
ProtocolServer server = serverMap.get(key);
if (server == null) {
// 双重校验锁,如果缓存不存在则创建通信 server 并加入缓存
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建服务器
serverMap.put(key, createServer(url));
}
}
} else {
// 若服务器已创建,则根据 url 中的配置重置
server.reset(url);
}
}
}
在第 20 行中,方法的开头已经做了缓存检查,为什么还会出现通信服务器已经创建的情况呢?在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。此部代码不是重点,有兴趣的小伙伴自行查看。
继续看 createServer(url)
方法,如何创建通信服务器:
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// 默认开启 server 关闭时发送 READ_ONLY 事件
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 心跳检测
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 设置编码器为 dubbo
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 获取 server 类型,默认 netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 判断对应的扩展是否存在
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 启动通信服务器
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 判断 client 对应的扩展是否存在
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
至此,关于服务远程暴露的启动通信服务器部分就分析完了,关于服务器如何创建的细节,会在后面的部分专门加以分析。
另外一部分服务注册的逻辑,就是将服务的 URL 数据保存至注册中心,我们在上一篇 《Dubbo系列笔记之zookeeper注册中心原理》 也有提及,感兴趣的小伙伴可以继续跟进代码查看。