码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Dubbo系列笔记之zookeeper注册中心原理
/  

Dubbo系列笔记之zookeeper注册中心原理

在本文伊始,我们先简单了解一下zookeeper。


一、zookeeper简介

image.png

1、什么是zookeeper

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

2、zookeeper 的构成

1) 文件系统

image.png

ZK 底层是一套树状的存储结构,类似于 Unix 系统的文件路径,从根节点开始,向下树状延伸,ZK 的节点称之为 znode ,在这些节点上可以保存数据。ZK 分为四种类型的节点:

节点类型描述
PERSISTENT持久化目录节点,在客户端与 ZK 端开连接后,节点仍存在
PERSISTENT_SEQUENTIAL持久化顺序编号目录节点,在客户端与 ZK 断开连接后,节点仍存在,同时 ZK 为每个节点进行了顺序编号
EPHEMERAL临时目录节点,在客户端与 ZK 断开连接后,节点被删除
EPHEMERAL_SEQUENTIAL临时顺序编号目录节点 ,在客户端与 ZK 断开连接后,节点被删除,节点间有编号顺序
2) 通知机制

ZK 使用了观察者设计模式,客户端会向 ZK 注册其关心的节点。这样在节点发生变化时(数据的更新,子节点的增加删除等),ZK 就会通知客户端。

二、Dubbo 注册中心

在整个 Dubbo 中,注册中心是一个非常重要的组件,承担了服务的注册和发现功能,是连接分布式服务节点的纽带。其主要作用有:

(1)动态注册:服务提供者可以通过注册中心动态的把自己暴露给其他消费者,消费者无需逐个更新配置文件。

(2)动态发现:消费者可以动态感知新的配置、路由规则和新的服务提供者,无需重启服务使之生效。

(3)动态调整:注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点。

(4)统一配置:避免了本地配置导致每个服务的配置不一致问题。

1、注册中心简述

Dubbo 注册中心的源码在 dubbo-registry 模块中,其中 dubbo-registry-api 是对注册中心的 api 层定义,其他的包是一些具体实现。

image.png

下面是注册中心一些关键接口和抽象类:

image.png

以及 zk 注册中心实现:

image.png

2、关键代码

下面从注册中心的核心 API 开始,再到具体的 zk 注册中心实现,来分析源码如何实现:

(1) 核心 API
  • RegistryFactory

    @SPI("dubbo")
    public interface RegistryFactory {
    
    /**
    
    * Connect to the registry
    * <p>
    * Connecting the registry needs to support the contract: <br>
    * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
    * 2. Support username:password authority authentication on URL.<br>
    * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
    * 4. Support file=registry.cache local disk file cache.<br>
    * 5. Support the timeout=1000 request timeout setting.<br>
    * 6. Support session=60000 session timeout or expiration settings.<br>
    * 
    * @param url Registry address, is not allowed to be empty
    * @return Registry reference, never return empty value
      */
      @Adaptive({"protocol"})
      Registry getRegistry(URL url);
    }
    

    RegistryFactory 是 Dubbo 的 SPI 拓展接口,该接口只有一个方法 Registry getRegistry(URL url); 使用 Dubbo 的自适应拓展机制根据 URL 中的 protocol 参数获得对应的 Registry 对象。


  • AbstractRegistryFactory
    实现了 RegistryFactory 接口,提供了对 Registry 的容器管理。

    protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
    

    使用 REGISTRIES 存储不同的注册中心,key 为 URL toString。
    重写了 #getRegistry 方法,加入缓存和锁,同时提供了创建 Registry 的抽象方法,提供了销毁 Registry 的加锁实现。


  • RegistryService
    RegistryService 接口定义了一个注册中心最基本的功能,注册、取消注册、订阅、取消订阅、查看符合条件的注册数据。

  • Registry

    public interface Registry extends Node, RegistryService {
        default void reExportRegister(URL url) {
            register(url);
        }
    
        default void reExportUnregister(URL url) {
            unregister(url);
        }
      }
    

    java 8 接口默认实现方法,包装了注册和取消注册方法,同时继承了 RegistryService 和 Node ,拥有注册,订阅,查询相关方法,以及 Node 相关方法。


  • AbstractRegistry

    AbstractRegistry 实现了 Registry ,在实现注册和取消注册、订阅和取消订阅功能的基础上面,对注册数据提供了持久化操作保存到文件,以便注册中心在宕机后重新启动恢复服务提供者列表等信息。

    • 构造方法

      public AbstractRegistry(URL url) {
          // 保存 URL 信息
          setUrl(url);
          // 开启本地缓存,默认开启
          if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
              // 创建本地缓存文件
              syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
              String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
              String filename = url.getParameter(FILE_KEY, defaultFilename);
              File file = null;
              if (ConfigUtils.isNotEmpty(filename)) {
                  file = new File(filename);
                  if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                      if (!file.getParentFile().mkdirs()) {
                          throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                      }
                  }
              }
              this.file = file;
              // 读取本地 properties 缓存文件到内存
              loadProperties();
              // 通知监听器 URL 的变化
              notify(url.getBackupUrls());
          }
      }
      
    • 注册 & 订阅方法
      并未实现向注册中心真正的注册,而是简单放入注册的本地变量 registered 中存储。订阅逻辑与之相同。

    • notify(List<URL> urls) 方法
      在 AbstratRegistry 的构造方法中我们看到,每次 Registry 的 URL 的更新都会通过这个方法通知到监听器。

      /**
       * 通知监听器 URL 的变化结果
       *
       * @param url      consumer side url
       * @param listener listener
       * @param urls     provider latest urls
       */
      protected void notify(URL url, NotifyListener listener, List<URL> urls) {
          if (url == null) {
              throw new IllegalArgumentException("notify url == null");
          }
          if (listener == null) {
              throw new IllegalArgumentException("notify listener == null");
          }
          if ((CollectionUtils.isEmpty(urls))
                  && !ANY_VALUE.equals(url.getServiceInterface())) {
              logger.warn("Ignore empty notify urls for subscribe url " + url);
              return;
          }
          if (logger.isInfoEnabled()) {
              logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
          }
          // 按照 category 将 urls 加入集合
          Map<String, List<URL>> result = new HashMap<>();
          for (URL u : urls) {
              if (UrlUtils.isMatch(url, u)) {
                  String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                  List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                  categoryList.add(u);
              }
          }
          if (result.size() == 0) {
              return;
          }
          // 在 notified 中获取对应 URL 的数据,保存到文件中,并通知监听器
          // 获取当前 URL 为 key 的数据,如果不存在就创建
          Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
          for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
              String category = entry.getKey();
              List<URL> categoryList = entry.getValue();
              // 变化结果写入
              categoryNotified.put(category, categoryList);
              // 通知监听器
              listener.notify(categoryList);
              // 保存到文件
              saveProperties(url);
          }
      }
      

  • FailbackRegistry
    在上面的抽象 Registry 实现中,仅保存了注册订阅时的 URL 信息,并没有实现向注册中心注册订阅的动作。FailbackRegistry 在此基础上面,实现了注册订阅,并提供了失败重试的特性。

    • 属性

      /**
       * 一系列待重试的任务集合
       */
      private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
      
      private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
      
      private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
      
      private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
      
      private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
      
      /**
       * 定时器的等待时间(单位毫秒)
       */
      private final int retryPeriod;
      
      /**
       * 重试失败计时器,定期检查是否有失败请求,是否有无限次重试
       */
      private final HashedWheelTimer retryTimer;
      
      public FailbackRegistry(URL url) {
          super(url);
          // 从 URL 中获取设置重试时间
          this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
      
          // 创建失败重试定时器
          retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
      }
      

    其中 Holder 维护了 URL 与 监听器的对应关系,而在注册订阅等方法中,增加了异常处理,提供了重试逻辑,并提供对应的模板方法。

(2) 扩展注册中心

上面大致浏览了 Dubbo 为注册中心提供的 API ,那么要实现一个真正的注册中心,如何扩展呢?

通过上面的源码我们知道 API 层已经为我们定义好了注册中心的基本行为和抽象模型以及失败重试等等。这样,扩展注册中心只需要完成最基本的两大功能:服务注册和服务发现。

具体到代码上,只需扩展下面两个类:

  • org.apache.dubbo.registry.RegistryFactory
  • org.apache.dubbo.registry.Registry

同时注意注册中心工厂类是 SPI ,详见 《注册中心扩展》

三、Zookeeper 注册中心

理解 Zookeeper 注册中心是如何实现的,我们并不是直接上代码。而是从上面 扩展注册中心 的角度出发,我们来分析如何使用 Zookeeper 实现一个 Dubbo 注册中心。

1、数据存储结构和流程分析

利用 Zookeeper 的树形目录结构,首先我们来设计注册信息在其中的存储结构:

image.png

  • 数据存储结构

    • 在 zookeeper 节点的选型上,使用持久节点和临时节点两种,由于对顺序没有要求,不使用编号。
    • 在 zookeeper 节点的层级上,定义以 dubbo 为 root 节点,下层分别是 Service/Type/URL,典型的层级结构为 /dubbo/com.aysaml.demoService/providers/10.2.35.240:20880
  • 流程分析

    • 服务提供者启动时,向 /dubbo/com.aysaml.demoService/providers 目录下写入自己的 URL 地址。
    • 服务消费者启动时,订阅 /dubbo/com.aysaml.demoService/providers 目录下的服务提供者 URL 地址,并向 /dubbo/com.aysaml.demoService/consumer 目录下写入自己的 URL 地址。
    • 监控中心启动时,订阅 /dubbo/com.aysaml.demoService 目录下的所有服务提供者和服务消费者的 URL 地址。

❤注意:在服务消费者启动后,不仅仅订阅了 "providers" 目录,同时订阅了 "routes"(路由规则列表) 和 "configurations"(配置规则列表) 目录,这在上图中没有体现。

除了上述的主要功能外,还可以支持以下功能:

  • 当提供者出现断电等异常停机时,注册中心能自动删除提供者信息
  • 当注册中心重启时,能自动恢复注册数据,以及订阅请求
  • 当会话过期时,能自动恢复注册数据,以及订阅请求
  • 当设置 <dubbo:registry check="false" /> 时,记录失败注册和订阅请求,后台定时重试
  • 可通过 <dubbo:registry username="admin" password="1234" /> 设置 zookeeper 登录信息
  • 可通过 <dubbo:registry group="dubbo" /> 设置 zookeeper 的根节点,不配置将使用默认的根节点。
  • 支持 * 号通配符 <dubbo:reference group="*" version="*" />,可订阅服务的所有分组和所有版本的提供者

2、实现 zookeeper 注册中心

我们知道,Dubbo 的很多组件都是以 SPI 形式提供的,参考官方文档 《注册中心扩展》 下面看如何实现 zookeeper 注册中心。

1、扩展接口

实现注册中心只需要扩展两个接口,负责服务的注册和发现功能。

(1) org.apache.dubbo.registry.Registry

对于 Registry ,上面我们说了很多他的实现类,其中 FailbackRegistry 支持失败重试的特点,我们直接继承并重写相关注册订阅方法即可。

/**
 * ZookeeperRegistry
 *
 */
public class ZookeeperRegistry extends FailbackRegistry {

    private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

    /** 默认 zk 根节点名称 */
    private final static String DEFAULT_ROOT = "dubbo";

    /** zk 根节点 */
    private final String root;

    /** 保存 Services 接口全限定名 */
    private final Set<String> anyServices = new ConcurrentHashSet<>();

    /** 监听器 */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();

    /** zk 客户端 */
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 从 URL 中获得 zk 的根节点名称,如为空则为默认 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        // 连接 zk 客户端
        zkClient = zookeeperTransporter.connect(url);
        // 添加监听器
        zkClient.addStateListener((state) -> {
            // 重连状态,恢复订阅,获取最新的提供者列表
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) { // 新的 session 连接,加入重试任务重新注册订阅
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }

    @Override
    public boolean isAvailable() {
        // zk 是否可用
        return zkClient.isConnected();
    }

    @Override
    public void destroy() {
        // 销毁 registry
        super.destroy();
        try {
            // 关闭 zk 连接
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    @Override
    public void doRegister(URL url) {
        try {
            // 服务注册,即创建服务对应的 URL 节点
            // 关于 dynamic , 若为 false 时,该数据为持久化节点,当注册方退出时,数据依然保存在注册中心。
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    @Override
    public void doUnregister(URL url) {
        try {
            // 取消注册,即删除对应的 URL 节点
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 通配符 * ,订阅所有服务,如监控中心
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // 获取 url 对应的监听器,不存在则创建
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                // 获取对应的子节点监听器,不存在则创建
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        // 新注册的服务
                        if (!anyServices.contains(child)) {
                            // 加入集合
                            anyServices.add(child);
                            // 重新订阅该服务层服务
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                // 创建根节点,持久化目录
                zkClient.create(root, false);
                // 订阅 service 层服务,即增加相应的子节点监听器
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    // 循环订阅
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else { // 特定 service 层的订阅
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    // 获取 url 对应的监听器,不存在则创建
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // 子节点监听器
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    // 创建 service 目录,持久化
                    zkClient.create(path, false);
                    // 订阅
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 通知监听器 url 的变化结果
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        // 取消订阅逻辑
        // 获取 service 的所有监听器
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            // 子节点
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                // 全量订阅处理
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    zkClient.removeChildListener(root, zkListener);
                } else { // 特定 service
                    for (String path : toCategoriesPath(url)) {
                        // 取消订阅,删除监听器
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }

    /**
     * 根据 url 参数查询已经注册的数据
     *
     * @param url
     * @return
     */
    @Override
    public List<URL> lookup(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("lookup url == null");
        }
        try {
            List<String> providers = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                // 获取路径下所有服务提供者
                List<String> children = zkClient.getChildren(path);
                if (children != null) {
                    // 加入服务提供者列表
                    providers.addAll(children);
                }
            }
            return toUrlsWithoutEmpty(url, providers);
        } catch (Throwable e) {
            throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    private String toRootDir() {
        if (root.equals(PATH_SEPARATOR)) {
            return root;
        }
        return root + PATH_SEPARATOR;
    }

    private String toRootPath() {
        return root;
    }

    /**
     * 获取服务层路径
     *
     * 例:/dubbo/com.aysaml.demoService
     *
     * @param url
     * @return
     */
    private String toServicePath(URL url) {
        // 接口名
        String name = url.getServiceInterface();
        // 通配符 * ,订阅所有
        if (ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        return toRootDir() + URL.encode(name);
    }

    /**
     * 批量获取分类路径
     *
     * @param url
     * @return
     */
    private String[] toCategoriesPath(URL url) {
        String[] categories;
        if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
            categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
        } else {
            categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
        }
        return paths;
    }

    /**
     * 获取分类路径
     *
     * 例:/dubbo/com.aysaml.demoService/providers
     *
     * @param url
     * @return
     */
    private String toCategoryPath(URL url) {
        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
    }

    /**
     * 获取 URL 路径
     *
     * 格式为:Root / Service / Type / URL
     *
     * @param url
     * @return
     */
    private String toUrlPath(URL url) {
        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
    }

    /**
     * 获取服务提供者中和消费者匹配的数据
     *
     * 不为空的情况
     *
     * @param consumer
     * @param providers
     * @return
     */
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(providers)) {
            for (String provider : providers) {
                if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
                    URL url = URLStrParser.parseEncodedStr(provider);
                    // 匹配
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }
        return urls;
    }


    /**
     * 获得与消费者匹配的服务提供者列表
     *
     * 如果为空,则返回 empty:// 开头的 url, 可以处理没有服务提供者的情况
     *
     * @param consumer
     * @param path
     * @param providers
     * @return
     */
    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
        if (urls == null || urls.isEmpty()) {
            int i = path.lastIndexOf(PATH_SEPARATOR);
            String category = i < 0 ? path : path.substring(i + 1);
            URL empty = URLBuilder.from(consumer)
                    .setProtocol(EMPTY_PROTOCOL)
                    .addParameter(CATEGORY_KEY, category)
                    .build();
            urls.add(empty);
        }
        return urls;
    }

    /**
     * 当Zookeeper连接从连接断开中恢复后,它需要获取最新的服务提供者列表
     * re-register watcher is only a side effect and is not mandate.
     */
    private void fetchLatestAddresses() {
        // 获取订阅的集合
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Fetching the latest urls of " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 加入失败重试订阅集合,重连
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }

}
(2) org.apache.dubbo.registry.RegistryFactory
/**
 * RegistryFactory. (SPI, Singleton, ThreadSafe)
 *
 * @see org.apache.dubbo.registry.support.AbstractRegistryFactory
 */
@SPI("dubbo")
public interface RegistryFactory {

    /**
     * Connect to the registry
     * <p>
     * Connecting the registry needs to support the contract: <br>
     * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
     * 2. Support username:password authority authentication on URL.<br>
     * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
     * 4. Support file=registry.cache local disk file cache.<br>
     * 5. Support the timeout=1000 request timeout setting.<br>
     * 6. Support session=60000 session timeout or expiration settings.<br>
     *
     * @param url Registry address, is not allowed to be empty
     * @return Registry reference, never return empty value
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

该接口提供了 getRegistry(URL url) 方法通过工厂模式获得 protocol 对应的 Registry 对象。

上面我们提到 AbstractRegistryFactory ,其以模板模式实现了对 Registry 的管理逻辑并实现了 RegistryFactory 接口,因此我们只需继承 AbstractRegistryFactory

/**
 * ZookeeperRegistryFactory.
 *
 */
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * 通过 dubbo SPI ioc 注入 zk 工厂对象
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

最后,按照 Dubbo SPI 的规则,在 resource/META-INF/dubbo/org.apache.dubbo.registry.RegistryFactory 中配置扩展:

zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory

四、结语

Dubbo 的 zookeeper 中心的实现原理,其实不难,理解了注册中心要做的最重要的两件事:注册和订阅,然后再基于 zk 的目录和通知的机制,就很容易理解源码。



❤ 转载请注明本文地址或来源,谢谢合作 ❤


center