在网络编程中,我们经常听到 keepalive 这个词。keepalive 是什么?为什么需要 keepalive ?idle 监测是什么?网络编程中到底怎么用?
本篇,我们就探讨 keepalive 与 idle 监测,以及 Netty 如何支持它们。
以打电话为例,对方和你说这说着突然不讲话了。大多数情况下你不会你只等下去,你会和对方确认下:“你还在吗?”,如果对方没有回答,就挂掉电话。这套机制就是 keepalive 。
同样类比到服务器应用,服务器需要 keepalive 确保这条连接是正常的,才能够正常通信。
TCP keepalive 有三个核心参数:
当启用 keepalive (默认关闭)时,TCP 连接 7200 秒后没有数据通过发送 keepalive 消息,当探测没有确认时,按照 75 秒的间隔发送重试,一共发送 9 次都没有收到确认,则认为此连接失效。
计算总耗时为 2 小时 11 分钟 (7200 + 75 * 9)。
在开发应用程序时,常遇到需要添加 “心跳” 的需求,既然 TCP 已经有 keepalive 了,为什么还需要应用层添加逻辑呢?原因如下:
❤ 注意:在 HTTP 连接中,我们常常看到 keep-alive ,说的不是一回事。其代表长连接(keep-alive)和短链接(close)。
Content-Type: application/json; charset=utf-8
Transfer-Encoding: chunked
Connection: keep-alive
继续以打电话为例,对方说着说着没声音了,一般来说你会等待一段时间,在这个时间内看对方还说不说话(idle 监测),如果还不说话,就认为对方存在问题(idle),于是开始询问对方:“你还在吗?”(keepalive),或者直接挂掉电话(关闭连接)。
idle 监测只是负责诊断,诊断后作出不同的行为,决定 idle 监测的最终用途。
发送 keepalive :一般来配合 keepalive ,来减少 keepalive 消息。
直接关闭连接:快速释放损坏的、很久不用的、恶意的连接,保持系统健康。
实际应用:结合起来使用,按需 keepalive ,保证不会空闲,如果空闲,关闭连接。
开启 keepalive
两种方式:
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE), Boolean.TRUE);
开启 idle 监测
socketChannel.addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));
如果参数为负数,则被设置为 0 :
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
关于 keepalive 的设置比较简单,这里我们就不说了,主要看下 idle 的源码实现,关于 idle 的代码在 io.netty.handler.timeout
包下:
我们逐个来阅读:
IdleState
public enum IdleState {
/**
* No data was received for a while.
*/
READER_IDLE,
/**
* No data was sent for a while.
*/
WRITER_IDLE,
/**
* No data was either received or sent for a while.
*/
ALL_IDLE
}
idle 状态的枚举,包含三种情况
IdleStateEvent
public class IdleStateEvent {
public static final IdleStateEvent FIRST_READER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.READER_IDLE, true);
public static final IdleStateEvent READER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.READER_IDLE, false);
public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.WRITER_IDLE, true);
public static final IdleStateEvent WRITER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.WRITER_IDLE, false);
public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.ALL_IDLE, true);
public static final IdleStateEvent ALL_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.ALL_IDLE, false);
private final IdleState state;
private final boolean first;
... 省略部分代码
}
这个类根据上面的三种状态定义了发生 idle 时触发的 6 个事件,为什么是 6 个不是 3 个呢?对于第一次发生 idle ,抽出了第一次发生的事件,这样就能只对第一次发生 idle 时做处理,后面就可以忽略。
IdleStateHandler
IdleStateHandler extends ChannelDuplexHandler
继承自 ChannelDuplexHandler ,通过重写其 channelRead、write 等方法可以获得 channel 的读写状态并控制 IdleStateHandler 的生命周期。当读或写空闲时,将会触发 IdleStateEvent
事件。
构造方法
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
构造方法就是我们之前看到的设置开启 idle 的代码,其中三个参数分别对应了 idle 的三种状态。
属性
/**
* 最小超时时间
*/
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
/**
* 写入监听器
*/
// Not create a new ChannelFutureListener per write operation to reduce GC pressure.
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
/**
* 是否观察 {@link ChannelOutboundBuffer} 写入队列
*/
private final boolean observeOutput;
/**
* 读空闲时间
*/
private final long readerIdleTimeNanos;
/**
* 写空闲时间
*/
private final long writerIdleTimeNanos;
/**
* 读写任一空闲时间
*/
private final long allIdleTimeNanos;
/**
* 读空闲的定时检测任务
*/
private ScheduledFuture<?> readerIdleTimeout;
/**
* 最后读时间
*/
private long lastReadTime;
/**
* 是否首次读空闲
*/
private boolean firstReaderIdleEvent = true;
/**
* 写空闲的定时检测任务
*/
private ScheduledFuture<?> writerIdleTimeout;
/**
* 最后写时间
*/
private long lastWriteTime;
/**
* 是否第一次写空闲
*/
private boolean firstWriterIdleEvent = true;
/**
* 读写任一空闲时间
*/
private ScheduledFuture<?> allIdleTimeout;
/**
* 是否首次读写任一空闲
*/
private boolean firstAllIdleEvent = true;
/**
* handler 状态
* 0:未初始化
* 1:初始化完毕
* 2:销毁
*/
private byte state; // 0 - none, 1 - initialized, 2 - destroyed
/**
* 是否正在读取
*/
private boolean reading;
/**
* 最后检测到 {@link ChannelOutboundBuffer} 发生变化的时间
*/
private long lastChangeCheckTimeStamp;
/**
* 上一次准备 flash 到对端的消息( {@link ChannelOutboundBuffer#current()} )的 HashCode
*/
private int lastMessageHashCode;
/**
* 上一次等待 flush 到对端的内存大小( {@link ChannelOutboundBuffer#totalPendingWriteBytes()} )
*/
private long lastPendingWriteBytes;
/**
* 上一次 flush 到对端的进度
*/
private long lastFlushProgress;
其中涉及到的一些不清楚的地方,我们在后面分析的时候会有提及。
initialize(ChannelHandlerContext ctx)
方法
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
// 根据状态判断是否执行,保证 destory() 方法在初始化后执行
switch (state) {
case 1:
case 2:
return;
}
// 设置状态为初始化完毕
state = 1;
// ChannelOutboundBuffer 相关属性初始化
initOutputChanged(ctx);
// 最后读写时间初始化
lastReadTime = lastWriteTime = ticksInNanos();
// 初始化三种事件的定时任务
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ReaderIdleTimeoutTask
我们知道判断读写空闲是使用定时任务的方式进行的,看下读空闲定时任务。
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
// 下一次检测的定时任务的时间
long nextDelay = readerIdleTimeNanos;
// 不处于正在读取状态
if (!reading) {
// 配置的空闲时间 - 实际空闲时间
nextDelay -= ticksInNanos() - lastReadTime;
}
// 小于 0 说明读空闲,也就是说空闲时间大于配置的读空闲时间
if (nextDelay <= 0) {
// 重新设置定时任务的空闲时间
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
// 标志是否是首次空闲
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 创建读空闲事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 通知通道空闲事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// 空闲时间设置为 nextDelay , 按照最后一次读的时间作为开始计数
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
WriterIdleTimeoutTask
检测写空闲的定时任务,注意写空闲如何定义,hasOutputChanged
方法用于判断写的过程是否发生变化,如果发生变化就认为此时没有发生写空闲而 return 。
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
// 最后写数据的时间
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
// 实际写空闲时间与配置的写空闲时间的差值
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
// 小于 0 说明写空闲
if (nextDelay <= 0) {
// 重新设置定时任务的空闲时间
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
// 标志是否首次空闲
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
// 判断 ChannelOutboundBuffer 是否发生变化
if (hasOutputChanged(ctx, first)) {
return;
}
// 创建写空闲事件
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
// 通知通道空闲事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
hasOutputChanged
方法
其中的 observeOutput 属性在前面已经简单说过了,该方法用于检测 ChannelOutboundBuffer 队列是否发生变化,用来控制是否是写空闲的依据。
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// 一般情况下,false , 即写空闲判断是否写成功。
// 实际情况下:
// 1) 写了,但是缓存区满了,写不出去;2)写了一个大数据,正在写,但是没有写完成。
// 上述这两种情况,确实在写,但是还没有写成功。
// 所以这个参数是判断 "是否在写" , 而不是 "是否写成功" 。
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
// 如果 lastChangeCheckTimeStamp != lastWriteTime 说明进行过写操作,更新 lastChangeCheckTimeStamp
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// 不是第一次
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
// 此时的数据哈希与上一次的不同 || 正在等待发送的数据字节数不等于上次的字节数,说明发生了变化
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
// flush 的进度发生了变化
long flushProgress = buf.currentProgress();
if (flushProgress != lastFlushProgress) {
lastFlushProgress = flushProgress;
if (!first) {
return true;
}
}
}
}
return false;
}
ReadTimeoutHandler
前面说的 IdleStateHandler
作用是判断空闲,并创建了空闲事件,具体的处理逻辑还需要用户自行处理。ReadTimeoutHandler
做了一个简单的处理,就是发生读空闲事件时,就抛出异常。
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
/**
* Is called when a read timeout was detected.
*/
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
// 抛出异常
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
WriteTimeoutHandler
值得注意的是,它什么时候回抛出异常。在类的开头就有一句 :
Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
当一个写操作在一段时间没有完成的时候抛出 WriteTimeoutException 。
所以实际上,这是对写操作在一段时间没有完成的时候做的处理。
@Override
public void run() {
// 操作是否完成
// Was not written yet so issue a write timeout
// The promise itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!promise.isDone()) {
try {
// 抛出异常
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
removeWriteTimeoutTask(this);
}