在本文伊始,我们先简单了解一下zookeeper。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
ZK 底层是一套树状的存储结构,类似于 Unix 系统的文件路径,从根节点开始,向下树状延伸,ZK 的节点称之为 znode ,在这些节点上可以保存数据。ZK 分为四种类型的节点:
节点类型 | 描述 |
---|---|
PERSISTENT | 持久化目录节点,在客户端与 ZK 端开连接后,节点仍存在 |
PERSISTENT_SEQUENTIAL | 持久化顺序编号目录节点,在客户端与 ZK 断开连接后,节点仍存在,同时 ZK 为每个节点进行了顺序编号 |
EPHEMERAL | 临时目录节点,在客户端与 ZK 断开连接后,节点被删除 |
EPHEMERAL_SEQUENTIAL | 临时顺序编号目录节点 ,在客户端与 ZK 断开连接后,节点被删除,节点间有编号顺序 |
ZK 使用了观察者设计模式,客户端会向 ZK 注册其关心的节点。这样在节点发生变化时(数据的更新,子节点的增加删除等),ZK 就会通知客户端。
在整个 Dubbo 中,注册中心是一个非常重要的组件,承担了服务的注册和发现功能,是连接分布式服务节点的纽带。其主要作用有:
(1)动态注册:服务提供者可以通过注册中心动态的把自己暴露给其他消费者,消费者无需逐个更新配置文件。
(2)动态发现:消费者可以动态感知新的配置、路由规则和新的服务提供者,无需重启服务使之生效。
(3)动态调整:注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点。
(4)统一配置:避免了本地配置导致每个服务的配置不一致问题。
Dubbo 注册中心的源码在 dubbo-registry
模块中,其中 dubbo-registry-api
是对注册中心的 api 层定义,其他的包是一些具体实现。
下面是注册中心一些关键接口和抽象类:
以及 zk 注册中心实现:
下面从注册中心的核心 API 开始,再到具体的 zk 注册中心实现,来分析源码如何实现:
RegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* <p>
* Connecting the registry needs to support the contract: <br>
* 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
* 2. Support username:password authority authentication on URL.<br>
* 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
* 4. Support file=registry.cache local disk file cache.<br>
* 5. Support the timeout=1000 request timeout setting.<br>
* 6. Support session=60000 session timeout or expiration settings.<br>
*
* @param url Registry address, is not allowed to be empty
* @return Registry reference, never return empty value
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
RegistryFactory 是 Dubbo 的 SPI 拓展接口,该接口只有一个方法 Registry getRegistry(URL url);
使用 Dubbo 的自适应拓展机制根据 URL 中的 protocol 参数获得对应的 Registry 对象。
AbstractRegistryFactory
实现了 RegistryFactory 接口,提供了对 Registry 的容器管理。
protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
使用 REGISTRIES 存储不同的注册中心,key 为 URL toString。
重写了 #getRegistry 方法,加入缓存和锁,同时提供了创建 Registry 的抽象方法,提供了销毁 Registry 的加锁实现。
Registry
public interface Registry extends Node, RegistryService {
default void reExportRegister(URL url) {
register(url);
}
default void reExportUnregister(URL url) {
unregister(url);
}
}
java 8 接口默认实现方法,包装了注册和取消注册方法,同时继承了 RegistryService 和 Node ,拥有注册,订阅,查询相关方法,以及 Node 相关方法。
AbstractRegistry
AbstractRegistry
实现了 Registry
,在实现注册和取消注册、订阅和取消订阅功能的基础上面,对注册数据提供了持久化操作保存到文件,以便注册中心在宕机后重新启动恢复服务提供者列表等信息。
构造方法
public AbstractRegistry(URL url) {
// 保存 URL 信息
setUrl(url);
// 开启本地缓存,默认开启
if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
// 创建本地缓存文件
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
String filename = url.getParameter(FILE_KEY, defaultFilename);
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// 读取本地 properties 缓存文件到内存
loadProperties();
// 通知监听器 URL 的变化
notify(url.getBackupUrls());
}
}
注册 & 订阅方法
并未实现向注册中心真正的注册,而是简单放入注册的本地变量 registered
中存储。订阅逻辑与之相同。
notify(List<URL> urls)
方法
在 AbstratRegistry 的构造方法中我们看到,每次 Registry 的 URL 的更新都会通过这个方法通知到监听器。
/**
* 通知监听器 URL 的变化结果
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// 按照 category 将 urls 加入集合
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// 在 notified 中获取对应 URL 的数据,保存到文件中,并通知监听器
// 获取当前 URL 为 key 的数据,如果不存在就创建
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 变化结果写入
categoryNotified.put(category, categoryList);
// 通知监听器
listener.notify(categoryList);
// 保存到文件
saveProperties(url);
}
}
FailbackRegistry
在上面的抽象 Registry 实现中,仅保存了注册订阅时的 URL 信息,并没有实现向注册中心注册订阅的动作。FailbackRegistry
在此基础上面,实现了注册订阅,并提供了失败重试的特性。
属性
/**
* 一系列待重试的任务集合
*/
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
/**
* 定时器的等待时间(单位毫秒)
*/
private final int retryPeriod;
/**
* 重试失败计时器,定期检查是否有失败请求,是否有无限次重试
*/
private final HashedWheelTimer retryTimer;
public FailbackRegistry(URL url) {
super(url);
// 从 URL 中获取设置重试时间
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
// 创建失败重试定时器
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
其中 Holder
维护了 URL 与 监听器的对应关系,而在注册订阅等方法中,增加了异常处理,提供了重试逻辑,并提供对应的模板方法。
上面大致浏览了 Dubbo 为注册中心提供的 API ,那么要实现一个真正的注册中心,如何扩展呢?
通过上面的源码我们知道 API 层已经为我们定义好了注册中心的基本行为和抽象模型以及失败重试等等。这样,扩展注册中心只需要完成最基本的两大功能:服务注册和服务发现。
具体到代码上,只需扩展下面两个类:
org.apache.dubbo.registry.RegistryFactory
org.apache.dubbo.registry.Registry
同时注意注册中心工厂类是 SPI ,详见 《注册中心扩展》
理解 Zookeeper 注册中心是如何实现的,我们并不是直接上代码。而是从上面 扩展注册中心 的角度出发,我们来分析如何使用 Zookeeper 实现一个 Dubbo 注册中心。
利用 Zookeeper 的树形目录结构,首先我们来设计注册信息在其中的存储结构:
数据存储结构
/dubbo/com.aysaml.demoService/providers/10.2.35.240:20880
。流程分析
/dubbo/com.aysaml.demoService/providers
目录下写入自己的 URL 地址。/dubbo/com.aysaml.demoService/providers
目录下的服务提供者 URL 地址,并向 /dubbo/com.aysaml.demoService/consumer
目录下写入自己的 URL 地址。/dubbo/com.aysaml.demoService
目录下的所有服务提供者和服务消费者的 URL 地址。❤注意:在服务消费者启动后,不仅仅订阅了
"providers"
目录,同时订阅了"routes"
(路由规则列表) 和"configurations"
(配置规则列表) 目录,这在上图中没有体现。
除了上述的主要功能外,还可以支持以下功能:
<dubbo:registry check="false" />
时,记录失败注册和订阅请求,后台定时重试<dubbo:registry username="admin" password="1234" />
设置 zookeeper 登录信息<dubbo:registry group="dubbo" />
设置 zookeeper 的根节点,不配置将使用默认的根节点。*
号通配符 <dubbo:reference group="*" version="*" />
,可订阅服务的所有分组和所有版本的提供者我们知道,Dubbo 的很多组件都是以 SPI 形式提供的,参考官方文档 《注册中心扩展》 下面看如何实现 zookeeper 注册中心。
实现注册中心只需要扩展两个接口,负责服务的注册和发现功能。
org.apache.dubbo.registry.Registry
对于 Registry ,上面我们说了很多他的实现类,其中 FailbackRegistry
支持失败重试的特点,我们直接继承并重写相关注册订阅方法即可。
/**
* ZookeeperRegistry
*
*/
public class ZookeeperRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
/** 默认 zk 根节点名称 */
private final static String DEFAULT_ROOT = "dubbo";
/** zk 根节点 */
private final String root;
/** 保存 Services 接口全限定名 */
private final Set<String> anyServices = new ConcurrentHashSet<>();
/** 监听器 */
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
/** zk 客户端 */
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 从 URL 中获得 zk 的根节点名称,如为空则为默认 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 连接 zk 客户端
zkClient = zookeeperTransporter.connect(url);
// 添加监听器
zkClient.addStateListener((state) -> {
// 重连状态,恢复订阅,获取最新的提供者列表
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) { // 新的 session 连接,加入重试任务重新注册订阅
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
@Override
public boolean isAvailable() {
// zk 是否可用
return zkClient.isConnected();
}
@Override
public void destroy() {
// 销毁 registry
super.destroy();
try {
// 关闭 zk 连接
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void doRegister(URL url) {
try {
// 服务注册,即创建服务对应的 URL 节点
// 关于 dynamic , 若为 false 时,该数据为持久化节点,当注册方退出时,数据依然保存在注册中心。
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void doUnregister(URL url) {
try {
// 取消注册,即删除对应的 URL 节点
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 通配符 * ,订阅所有服务,如监控中心
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 获取 url 对应的监听器,不存在则创建
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 获取对应的子节点监听器,不存在则创建
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
// 新注册的服务
if (!anyServices.contains(child)) {
// 加入集合
anyServices.add(child);
// 重新订阅该服务层服务
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
// 创建根节点,持久化目录
zkClient.create(root, false);
// 订阅 service 层服务,即增加相应的子节点监听器
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
// 循环订阅
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else { // 特定 service 层的订阅
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
// 获取 url 对应的监听器,不存在则创建
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 子节点监听器
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
// 创建 service 目录,持久化
zkClient.create(path, false);
// 订阅
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知监听器 url 的变化结果
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
// 取消订阅逻辑
// 获取 service 的所有监听器
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
// 子节点
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 全量订阅处理
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
zkClient.removeChildListener(root, zkListener);
} else { // 特定 service
for (String path : toCategoriesPath(url)) {
// 取消订阅,删除监听器
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
/**
* 根据 url 参数查询已经注册的数据
*
* @param url
* @return
*/
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
// 获取路径下所有服务提供者
List<String> children = zkClient.getChildren(path);
if (children != null) {
// 加入服务提供者列表
providers.addAll(children);
}
}
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private String toRootDir() {
if (root.equals(PATH_SEPARATOR)) {
return root;
}
return root + PATH_SEPARATOR;
}
private String toRootPath() {
return root;
}
/**
* 获取服务层路径
*
* 例:/dubbo/com.aysaml.demoService
*
* @param url
* @return
*/
private String toServicePath(URL url) {
// 接口名
String name = url.getServiceInterface();
// 通配符 * ,订阅所有
if (ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
/**
* 批量获取分类路径
*
* @param url
* @return
*/
private String[] toCategoriesPath(URL url) {
String[] categories;
if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
} else {
categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
}
String[] paths = new String[categories.length];
for (int i = 0; i < categories.length; i++) {
paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
}
return paths;
}
/**
* 获取分类路径
*
* 例:/dubbo/com.aysaml.demoService/providers
*
* @param url
* @return
*/
private String toCategoryPath(URL url) {
return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}
/**
* 获取 URL 路径
*
* 格式为:Root / Service / Type / URL
*
* @param url
* @return
*/
private String toUrlPath(URL url) {
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
/**
* 获取服务提供者中和消费者匹配的数据
*
* 不为空的情况
*
* @param consumer
* @param providers
* @return
*/
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<>();
if (CollectionUtils.isNotEmpty(providers)) {
for (String provider : providers) {
if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
URL url = URLStrParser.parseEncodedStr(provider);
// 匹配
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
return urls;
}
/**
* 获得与消费者匹配的服务提供者列表
*
* 如果为空,则返回 empty:// 开头的 url, 可以处理没有服务提供者的情况
*
* @param consumer
* @param path
* @param providers
* @return
*/
private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
if (urls == null || urls.isEmpty()) {
int i = path.lastIndexOf(PATH_SEPARATOR);
String category = i < 0 ? path : path.substring(i + 1);
URL empty = URLBuilder.from(consumer)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, category)
.build();
urls.add(empty);
}
return urls;
}
/**
* 当Zookeeper连接从连接断开中恢复后,它需要获取最新的服务提供者列表
* re-register watcher is only a side effect and is not mandate.
*/
private void fetchLatestAddresses() {
// 获取订阅的集合
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Fetching the latest urls of " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 加入失败重试订阅集合,重连
addFailedSubscribed(url, listener);
}
}
}
}
}
org.apache.dubbo.registry.RegistryFactory
/**
* RegistryFactory. (SPI, Singleton, ThreadSafe)
*
* @see org.apache.dubbo.registry.support.AbstractRegistryFactory
*/
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* <p>
* Connecting the registry needs to support the contract: <br>
* 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
* 2. Support username:password authority authentication on URL.<br>
* 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
* 4. Support file=registry.cache local disk file cache.<br>
* 5. Support the timeout=1000 request timeout setting.<br>
* 6. Support session=60000 session timeout or expiration settings.<br>
*
* @param url Registry address, is not allowed to be empty
* @return Registry reference, never return empty value
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
该接口提供了 getRegistry(URL url)
方法通过工厂模式获得 protocol 对应的 Registry 对象。
上面我们提到 AbstractRegistryFactory
,其以模板模式实现了对 Registry 的管理逻辑并实现了 RegistryFactory
接口,因此我们只需继承 AbstractRegistryFactory
。
/**
* ZookeeperRegistryFactory.
*
*/
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
/**
* 通过 dubbo SPI ioc 注入 zk 工厂对象
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
最后,按照 Dubbo SPI 的规则,在 resource/META-INF/dubbo/org.apache.dubbo.registry.RegistryFactory
中配置扩展:
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
Dubbo 的 zookeeper 中心的实现原理,其实不难,理解了注册中心要做的最重要的两件事:注册和订阅,然后再基于 zk 的目录和通知的机制,就很容易理解源码。