码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Netty 系列笔记之 keepalive 与 idle 监测
/  

Netty 系列笔记之 keepalive 与 idle 监测

一、开篇

在网络编程中,我们经常听到 keepalive 这个词。keepalive 是什么?为什么需要 keepalive ?idle 监测是什么?网络编程中到底怎么用?

本篇,我们就探讨 keepalive 与 idle 监测,以及 Netty 如何支持它们。

二、 keepalive

1、为什么需要 keepalive

以打电话为例,对方和你说这说着突然不讲话了。大多数情况下你不会你只等下去,你会和对方确认下:“你还在吗?”,如果对方没有回答,就挂掉电话。这套机制就是 keepalive 。

同样类比到服务器应用,服务器需要 keepalive 确保这条连接是正常的,才能够正常通信。

2、如何设计 keepalive ?(以 TCP 为例)

TCP keepalive 有三个核心参数:

  • net.ipv4.tcp_keepalive_time = 7200
  • net.ipv4.tcp_keepalive_intvl = 75
  • net.ipv4.tcp_keepalive_probes = 9

当启用 keepalive (默认关闭)时,TCP 连接 7200 秒后没有数据通过发送 keepalive 消息,当探测没有确认时,按照 75 秒的间隔发送重试,一共发送 9 次都没有收到确认,则认为此连接失效。

计算总耗时为 2 小时 11 分钟 (7200 + 75 * 9)。

3、为什么应用层还需要 keepalive ?

在开发应用程序时,常遇到需要添加 “心跳” 的需求,既然 TCP 已经有 keepalive 了,为什么还需要应用层添加逻辑呢?原因如下:

  • 协议分层,关注点不同。传输层关注连接是否通,而应用层关注服务是否可用。比如打电话,电话可以打通,但是不代表有人接听。服务器连接在,不一定提供服务。
  • TCP 的 keepalive 默认关闭,且经过路由器等中转设备时 keepalive 包可能被丢弃。
  • TCP 的 keepalive 时间太长,虽然可以改,但是不建议。一旦改动,会影响所有应用。

❤ 注意:在 HTTP 连接中,我们常常看到 keep-alive ,说的不是一回事。其代表长连接(keep-alive)和短链接(close)。

Content-Type: application/json; charset=utf-8
Transfer-Encoding: chunked
Connection: keep-alive

三、idle 监测

继续以打电话为例,对方说着说着没声音了,一般来说你会等待一段时间,在这个时间内看对方还说不说话(idle 监测),如果还不说话,就认为对方存在问题(idle),于是开始询问对方:“你还在吗?”(keepalive),或者直接挂掉电话(关闭连接)。

❀ idle 监测的功能

idle 监测只是负责诊断,诊断后作出不同的行为,决定 idle 监测的最终用途。

  • 发送 keepalive :一般来配合 keepalive ,来减少 keepalive 消息。

    • keepalive 设计演进:
      v1 :定时 keepalive 消息 -> v2 :没有数据传输时进行空闲检测,判定为 idle 时才发送 keepalive 消息。
  • 直接关闭连接:快速释放损坏的、很久不用的、恶意的连接,保持系统健康。

实际应用:结合起来使用,按需 keepalive ,保证不会空闲,如果空闲,关闭连接。

四、Netty 与 keepalive & idle

1、Netty 中开启 keepalive 和 idle

  • 开启 keepalive
    两种方式:

    • bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
    • bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE), Boolean.TRUE);
  • 开启 idle 监测
    socketChannel.addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));

如果参数为负数,则被设置为 0 :

public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        this.observeOutput = observeOutput;

        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

2、源码分析

关于 keepalive 的设置比较简单,这里我们就不说了,主要看下 idle 的源码实现,关于 idle 的代码在 io.netty.handler.timeout 包下:

image.png

我们逐个来阅读:

(1) IdleState
public enum IdleState {
  /**
   * No data was received for a while.
   */
  READER_IDLE,
  /**
   * No data was sent for a while.
   */
  WRITER_IDLE,
  /**
   * No data was either received or sent for a while.
   */
  ALL_IDLE
}

idle 状态的枚举,包含三种情况

  • 读空闲 ,一段时间内没有数据被接收。
  • 写空闲 ,一段时间内没有数据被发送。
  • 写 任一空闲。
(2) IdleStateEvent
public class IdleStateEvent {
  public static final IdleStateEvent FIRST_READER_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.READER_IDLE, true);
  public static final IdleStateEvent READER_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.READER_IDLE, false);
  public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.WRITER_IDLE, true);
  public static final IdleStateEvent WRITER_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.WRITER_IDLE, false);
  public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.ALL_IDLE, true);
  public static final IdleStateEvent ALL_IDLE_STATE_EVENT =
          new DefaultIdleStateEvent(IdleState.ALL_IDLE, false);

  private final IdleState state;
  private final boolean first;

... 省略部分代码
}

这个类根据上面的三种状态定义了发生 idle 时触发的 6 个事件,为什么是 6 个不是 3 个呢?对于第一次发生 idle ,抽出了第一次发生的事件,这样就能只对第一次发生 idle 时做处理,后面就可以忽略。

(3) IdleStateHandler
  • IdleStateHandler extends ChannelDuplexHandler
    继承自 ChannelDuplexHandler ,通过重写其 channelRead、write 等方法可以获得 channel 的读写状态并控制 IdleStateHandler 的生命周期。当读或写空闲时,将会触发 IdleStateEvent 事件。

  • 构造方法

    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {
    
        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }
    

    构造方法就是我们之前看到的设置开启 idle 的代码,其中三个参数分别对应了 idle 的三种状态。

  • 属性

    /**
       * 最小超时时间
       */
      private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
      /**
       * 写入监听器
       */
      // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
      private final ChannelFutureListener writeListener = new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
              lastWriteTime = ticksInNanos();
              firstWriterIdleEvent = firstAllIdleEvent = true;
          }
      };
      /**
       * 是否观察 {@link ChannelOutboundBuffer} 写入队列
       */
      private final boolean observeOutput;
      /**
       * 读空闲时间
       */
      private final long readerIdleTimeNanos;
      /**
       * 写空闲时间
       */
      private final long writerIdleTimeNanos;
      /**
       * 读写任一空闲时间
       */
      private final long allIdleTimeNanos;
      /**
       * 读空闲的定时检测任务
       */
      private ScheduledFuture<?> readerIdleTimeout;
      /**
       * 最后读时间
       */
      private long lastReadTime;
      /**
       * 是否首次读空闲
       */
      private boolean firstReaderIdleEvent = true;
      /**
       * 写空闲的定时检测任务
       */
      private ScheduledFuture<?> writerIdleTimeout;
      /**
       * 最后写时间
       */
      private long lastWriteTime;
      /**
       * 是否第一次写空闲
       */
      private boolean firstWriterIdleEvent = true;
      /**
       * 读写任一空闲时间
       */
      private ScheduledFuture<?> allIdleTimeout;
      /**
       * 是否首次读写任一空闲
       */
      private boolean firstAllIdleEvent = true;
      /**
       * handler 状态
       * 0:未初始化
       * 1:初始化完毕
       * 2:销毁
       */
      private byte state; // 0 - none, 1 - initialized, 2 - destroyed
      /**
       * 是否正在读取
       */
      private boolean reading;
      /**
       * 最后检测到 {@link ChannelOutboundBuffer} 发生变化的时间
       */
      private long lastChangeCheckTimeStamp;
      /**
       * 上一次准备 flash 到对端的消息( {@link ChannelOutboundBuffer#current()} )的 HashCode
       */
      private int lastMessageHashCode;
      /**
       * 上一次等待 flush 到对端的内存大小( {@link ChannelOutboundBuffer#totalPendingWriteBytes()} )
       */
      private long lastPendingWriteBytes;
      /**
       * 上一次 flush 到对端的进度
       */
      private long lastFlushProgress;
    
    • observeOutput 这个属性是标识写空闲的判断依据是否是写成功。两种写不成功的情况:1)写了,但是缓冲区满了,写不出去;2)写了一个特别大的文件,是在写,但是还没写成功。上述两种情况确实是在写,但是不代表写空闲。所以可以调整这个参数按照实际需要决定写空闲的判断依据。

    其中涉及到的一些不清楚的地方,我们在后面分析的时候会有提及。

  • initialize(ChannelHandlerContext ctx) 方法

    private void initialize(ChannelHandlerContext ctx) {
          // Avoid the case where destroy() is called before scheduling timeouts.
          // See: https://github.com/netty/netty/issues/143
          // 根据状态判断是否执行,保证 destory() 方法在初始化后执行
          switch (state) {
              case 1:
              case 2:
                  return;
          }
          // 设置状态为初始化完毕
          state = 1;
          // ChannelOutboundBuffer 相关属性初始化
          initOutputChanged(ctx);
          // 最后读写时间初始化
          lastReadTime = lastWriteTime = ticksInNanos();
          // 初始化三种事件的定时任务
          if (readerIdleTimeNanos > 0) {
              readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                      readerIdleTimeNanos, TimeUnit.NANOSECONDS);
          }
          if (writerIdleTimeNanos > 0) {
              writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                      writerIdleTimeNanos, TimeUnit.NANOSECONDS);
          }
          if (allIdleTimeNanos > 0) {
              allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                      allIdleTimeNanos, TimeUnit.NANOSECONDS);
          }
      }
    
  • ReaderIdleTimeoutTask
    我们知道判断读写空闲是使用定时任务的方式进行的,看下读空闲定时任务。

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    
          ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
              super(ctx);
          }
    
          @Override
          protected void run(ChannelHandlerContext ctx) {
              // 下一次检测的定时任务的时间
              long nextDelay = readerIdleTimeNanos;
              // 不处于正在读取状态
              if (!reading) {
                  // 配置的空闲时间 - 实际空闲时间
                  nextDelay -= ticksInNanos() - lastReadTime;
              }
              // 小于 0 说明读空闲,也就是说空闲时间大于配置的读空闲时间
              if (nextDelay <= 0) {
                  // 重新设置定时任务的空闲时间
                  // Reader is idle - set a new timeout and notify the callback.
                  readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                  // 标志是否是首次空闲
                  boolean first = firstReaderIdleEvent;
                  firstReaderIdleEvent = false;
    
                  try {
                      // 创建读空闲事件
                      IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                      // 通知通道空闲事件
                      channelIdle(ctx, event);
                  } catch (Throwable t) {
                      ctx.fireExceptionCaught(t);
                  }
              } else {
                  // 空闲时间设置为 nextDelay , 按照最后一次读的时间作为开始计数
                  // Read occurred before the timeout - set a new timeout with shorter delay.
                  readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
              }
          }
      }
    
  • WriterIdleTimeoutTask
    检测写空闲的定时任务,注意写空闲如何定义,hasOutputChanged 方法用于判断写的过程是否发生变化,如果发生变化就认为此时没有发生写空闲而 return 。

    private final class WriterIdleTimeoutTask extends AbstractIdleTask {
    
          WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
              super(ctx);
          }
    
          @Override
          protected void run(ChannelHandlerContext ctx) {
              // 最后写数据的时间
              long lastWriteTime = IdleStateHandler.this.lastWriteTime;
              // 实际写空闲时间与配置的写空闲时间的差值
              long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
              // 小于 0 说明写空闲
              if (nextDelay <= 0) {
                  // 重新设置定时任务的空闲时间
                  // Writer is idle - set a new timeout and notify the callback.
                  writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
                  // 标志是否首次空闲
                  boolean first = firstWriterIdleEvent;
                  firstWriterIdleEvent = false;
    
                  try {
                      // 判断 ChannelOutboundBuffer 是否发生变化
                      if (hasOutputChanged(ctx, first)) {
                          return;
                      }
                      // 创建写空闲事件
                      IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                      // 通知通道空闲事件
                      channelIdle(ctx, event);
                  } catch (Throwable t) {
                      ctx.fireExceptionCaught(t);
                  }
              } else {
                  // Write occurred before the timeout - set a new timeout with shorter delay.
                  writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
              }
          }
      }
    
  • hasOutputChanged 方法

    其中的 observeOutput 属性在前面已经简单说过了,该方法用于检测 ChannelOutboundBuffer 队列是否发生变化,用来控制是否是写空闲的依据。

    private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
          if (observeOutput) {
              // 一般情况下,false , 即写空闲判断是否写成功。
              // 实际情况下:
              // 1) 写了,但是缓存区满了,写不出去;2)写了一个大数据,正在写,但是没有写完成。
              // 上述这两种情况,确实在写,但是还没有写成功。
              // 所以这个参数是判断 "是否在写" , 而不是 "是否写成功" 。
              // We can take this shortcut if the ChannelPromises that got passed into write()
              // appear to complete. It indicates "change" on message level and we simply assume
              // that there's change happening on byte level. If the user doesn't observe channel
              // writability events then they'll eventually OOME and there's clearly a different
              // problem and idleness is least of their concerns.
              // 如果 lastChangeCheckTimeStamp != lastWriteTime 说明进行过写操作,更新 lastChangeCheckTimeStamp
              if (lastChangeCheckTimeStamp != lastWriteTime) {
                  lastChangeCheckTimeStamp = lastWriteTime;
                  // 不是第一次
                  // But this applies only if it's the non-first call.
                  if (!first) {
                      return true;
                  }
              }
    
              Channel channel = ctx.channel();
              Unsafe unsafe = channel.unsafe();
              ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    
              if (buf != null) {
                  int messageHashCode = System.identityHashCode(buf.current());
                  long pendingWriteBytes = buf.totalPendingWriteBytes();
                  // 此时的数据哈希与上一次的不同 || 正在等待发送的数据字节数不等于上次的字节数,说明发生了变化
                  if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                      lastMessageHashCode = messageHashCode;
                      lastPendingWriteBytes = pendingWriteBytes;
    
                      if (!first) {
                          return true;
                      }
                  }
                  // flush 的进度发生了变化
                  long flushProgress = buf.currentProgress();
                  if (flushProgress != lastFlushProgress) {
                      lastFlushProgress = flushProgress;
    
                      if (!first) {
                          return true;
                      }
                  }
              }
          }
    
          return false;
      }
    
(4)ReadTimeoutHandler

前面说的 IdleStateHandler 作用是判断空闲,并创建了空闲事件,具体的处理逻辑还需要用户自行处理。ReadTimeoutHandler 做了一个简单的处理,就是发生读空闲事件时,就抛出异常。

@Override
    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        readTimedOut(ctx);
    }

    /**
     * Is called when a read timeout was detected.
     */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            // 抛出异常
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
(5) WriteTimeoutHandler

值得注意的是,它什么时候回抛出异常。在类的开头就有一句 :

Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.

当一个写操作在一段时间没有完成的时候抛出 WriteTimeoutException

所以实际上,这是对写操作在一段时间没有完成的时候做的处理。

@Override
        public void run() {
            // 操作是否完成
            // Was not written yet so issue a write timeout
            // The promise itself will be failed with a ClosedChannelException once the close() was issued
            // See https://github.com/netty/netty/issues/2159
            if (!promise.isDone()) {
                try {
                    // 抛出异常
                    writeTimedOut(ctx);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            }
            removeWriteTimeoutTask(this);
        }


❤ 转载请注明本文地址或来源,谢谢合作 ❤


center