码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Netty 系列笔记之源码分析:启动服务
/  

Netty 系列笔记之源码分析:启动服务

一、概述

我们从 Netty 请求处理的主线来分析源码,注意这部分不会对每个组件的原理过多深入,我们的目标是对 Netty 的请求处理过程有一个线性的理解。

先知其形,再探真义。

是我们学习的基本路线。

首先是启动服务部分,启动服务最核心的是在连接到来之前做好接受连接的准备,本篇讲述启动服务 Netty 是如何做好接受连接的准备的。

二、Netty 启动过程一览

image.png

Netty Server 端启动的过程由两个线程共同完成,其中 Main Thread 是我们的主线程,第一步创建了一个 Selector ,作为服务端继续创建 ServerSocketChannel 并进行初始化,最后为 ServerSocketChannel 分配一个 NioEventLoop 。

下一步是将 ServerSocketChannel 注册到 Selector 上,这一步做了一个线程切换,注册的动作是作为一个 task 来执行的。注册之后就绑定一个地址端口,就可以工作了,就是注册、接收、创建连接到 Selector 上面。

三、Netty 启动源码分析

我们以 netty-example 下的 EchoServer 为例,分析服务端启动的过程。

public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
1、NioEventLoopGroup 初始化
  • 12 行:创建 Boss Group ,我们跟随调用链看 NioEventLoopGroup 的初始化过程。

    image.png

    经过一些列构造方法后,最终调用了 其父类 MultithreadEventExecutorGroup 的构造方法,截取其中的主要逻辑代码

    image.png

    newChild() 方法在其子类 NioEventLoopGroup 中进行了重写:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
      EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
      return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    ((SelectStrategyFactory) args[1]).newSelectStrategy(),      (RejectedExecutionHandler) args[2], queueFactory);
    }
    

    我们看到最终调用了 NioEventLoop 的构造方法,所以初始化的主要工作是根据传入的线程数,通过调用 newChild() 方法创建 NioEventLoop ,在不指定线程数时,默认创建2倍CPU 个NioEventLoop 。

    简单理解 NioEventLoop 由持有的线程、Selector 、任务队列组成。这个线程负责轮询 IO 事件, 处理 IO 事件, 执行 taskQueue 里面的任务等,当多个客户端连接到服务器时, 每个 NioEventLoop 会被分配到多个客户端连接, 通过 Selector 轮询IO 事件, 实现多路复用。

  • 创建 Selector

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                   SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                   EventLoopTaskQueueFactory queueFactory) {
          super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                  rejectedExecutionHandler);
          this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
          this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
          // 创建 Selector
          final SelectorTuple selectorTuple = openSelector();
          this.selector = selectorTuple.selector;
          this.unwrappedSelector = selectorTuple.unwrappedSelector;
      }
    

    继续跟进 NioEventLoop 的构造方法,其调用 openSelector() 创建了 Selector 。这也与我们最上面的启动过程那张图保持一致。

2、调用 bind() 方法启动服务端

14 行到 31 行执行一系列的配置,到第 34 行调用了 bind() 方法启动服务,那么在 bind() 方法执行的过程中,都做了什么呢?我们继续跟进:

调用了 AbstractBootstrapbind() 方法:

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

首先是 group 和 channelFactory 的非空校验,然后调用 doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
        // 创建并初始化 Channel,然后将 channel 注册到 NioEventLoop 持有的 Selector 上面
        final ChannelFuture regFuture = initAndRegister();
        // 返回的 Future 对象,其上面的执行是一个异步过程,获取其持有的 Channel
        final Channel channel = regFuture.channel();
        // 操作失败,直接返回
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // 前面说过 Future 是异步的,因为注册的操作交给了 NioEventLoop 去执行了
        if (regFuture.isDone()) { // 完成
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 执行完成后,调用 bind0 方法
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // 如果没有完成,则将 bind0 方法放到 Future 的 listener 中,等待其完成再执行
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
  • initAndRegister() 方法
    跟随执行过程,我们首先来看这个方法:

    final ChannelFuture initAndRegister() {
          Channel channel = null;
          try {
              // 根据前面配置的 Channel 类型创建(泛型 + 反射)
              channel = channelFactory.newChannel();
              // 初始化
              init(channel);
          } catch (Throwable t) {
              if (channel != null) {
                  // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                  channel.unsafe().closeForcibly();
                  // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                  return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
              }
              // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
              return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
          }
          // 配置 group ,注册 Channel 到 Selector
          ChannelFuture regFuture = config().group().register(channel);
          if (regFuture.cause() != null) {
              if (channel.isRegistered()) {
                  channel.close();
              } else {
                  channel.unsafe().closeForcibly();
              }
          }
    
          // If we are here and the promise is not failed, it's one of the following cases:
          // 1) If we attempted registration from the event loop, the registration has been completed at this point.
          //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
          // 2) If we attempted registration from the other thread, the registration request has been successfully
          //    added to the event loop's task queue for later execution.
          //    i.e. It's safe to attempt bind() or connect() now:
          //         because bind() or connect() will be executed *after* the scheduled registration task is executed
          //         because register(), bind(), and connect() are all bound to the same thread.
    
          return regFuture;
      }
    

    如方法的命名一样,首先是创建 Channel 并初始化,然后配置 group 并将 Channel 注册到 NioEventLoop 持有的 Selector 中。其调用了AbstractChannelregister() 方法:

    @Override
          public final void register(EventLoop eventLoop, final ChannelPromise promise) {
              ObjectUtil.checkNotNull(eventLoop, "eventLoop");
              if (isRegistered()) {
                  promise.setFailure(new IllegalStateException("registered to an event loop already"));
                  return;
              }
              if (!isCompatible(eventLoop)) {
                  promise.setFailure(
                          new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                  return;
              }
    
              AbstractChannel.this.eventLoop = eventLoop;
              // 判断当前线程是否是 NioEventLoop 里面的线程
              if (eventLoop.inEventLoop()) {
                  register0(promise);
              } else {
                  try {
                      // 将注册过程封装为 task 交给  NioEventLoop 执行
                      eventLoop.execute(new Runnable() {
                          @Override
                          public void run() {
                              // 调用 JDK NIO 将 Channel 注册给 Selector
                              register0(promise);
                          }
                      });
                  } catch (Throwable t) {
                      logger.warn(
                              "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                              AbstractChannel.this, t);
                      closeForcibly();
                      closeFuture.setClosed();
                      safeSetFailure(promise, t);
                  }
              }
          }
    

    其重要逻辑是判断当前线程是否是 NioEventLoop 的自己的线程,如果是调用 JDK 的方法注册,否则将注册方法包装为一个任务交给 NioEventLoop 去执行。

到这里,Channel 的创建初始化和注册过程就分析完了,也就完成了上述过程图的 2-5 步。这时回到 AbstractBootstrap#bind() 方法,就会调用 doBind0()

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

可以看到绑定地址端口的逻辑也交给了 NioEventLoop 去执行,调用 channel.bind() 方法:

@Overrid
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
}

Netty 的 pipline 是一个串行化的操作,里面有各种各样的 Handler ,这里不做过多深入。下一步执行到 AbstractChannel#bind() 方法,其中有 doBind() 方法:

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
		// 执行 bind 逻辑
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
            // 绑定后,由 非 active 状态 -> active, 继续执行后面的逻辑
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

这里我们看 NioServerSocketChannel 的实现 😄 :

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

显而易见,是对不同版本的 JDK bind() 的调用,这里 Server 就启动起来,如过程图的步骤 6

这时执行到上面的 bind() 方法,有一个 Channel 的状态转变,从非 active 到 active 状态,这里跟随调试,执行到 Pipeline 的 channelActive() 方法:

@Override
public void channelActive(ChannelHandlerContext ctx) {
     ctx.fireChannelActive();
      // 注册读事件:包括创建连接和读数据(Accept/Read)
      readIfIsAutoRead();
}

第一行就是 pipeline 继续往下传,第二行中就是服务启动的最后一步注册 OP_ACCEPT 事件到 Selector :

private void readIfIsAutoRead() {
     if (channel.config().isAutoRead()) {
           channel.read();
     }
}

这里再度来到 DefaultChannelPipeline.read()

@Override
public void read(ChannelHandlerContext ctx) {
     unsafe.beginRead();
}

这个 unsafe 是 Channel 中的接口,来到其实现 AbstractChannel

@Override
public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

doBeginRead() 的实现 AbstractNioChannel

@Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        // 获取监听的事件
        final int interestOps = selectionKey.interestOps();
        // 如果没有监听读事件
        if ((interestOps & readInterestOp) == 0) {
            // 注册接收连接事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

如此,就完成了服务端启动的全流程:Selector 的创建 -> ServerSocketChannel 创建并初始化 -> NioEventLoop 的创建 -> 注册 Channel 到 Selector -> 绑定地址端口启动 -> 注册接收连接事件 。

这样,服务端就做好了接收连接的准备工作 。

❤ 转载请注明本文地址或来源,谢谢合作 ❤

wx.png


标题:Netty 系列笔记之源码分析:启动服务
作者:wangning1018
地址:https://aysaml.com/articles/2020/11/05/1604559009881.html