一、引言
系统在连接准备好之后,就是做数据的交互了。所以本篇分析接收数据相关的代码。
二、读数据过程一览
读数据的过程在 Work Thread 中:
- Select 接收到 OP_READ 事件。
- 处理 OP_READ 事件:
- 分配一个初始容量为 1024 大小的 ByteBufer 接收数据。
- 从 Channel 读数据到 ByteBuffer.
- 记录实际接收数据的大小,调整下次分配 ByteBuffer 的大小。
- 触发
pipline.fireChannelRead(byteBuf)
将读到的数据传播出去。 - 判断接收数据的 ByteBuffer 是否读满:是,尝试继续读取知道没有数据或达到限定次数 16 次;否,结束本轮读取,等待下一个 OP_READ 事件。
三、Netty 读数据过程源码分析
1、接收读(OP_READ)事件
服务端在接收到读事件后,由 NioEventLoop 负责处理,其入口与接收连接相同,都是始于 Selector 轮询事件,然后调用 NioEventLoop#processSelectedKeys()
方法处理。
2、处理读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
最终进入到上面的代码片段,继续查看 read() 方法的一个实现:AbstractNioByteChannel#read()
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获得 ByteBuf 的分配器
final ByteBufAllocator allocator = config.getAllocator();
// Handle 用于决定下次需要分配多大的 ByteBuf
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配 ByteBuf, 调用 guess() 方法,初始容量 1024
byteBuf = allocHandle.allocate(allocator);
// 向 ByteBuf 中装入数据,本质就是调用 channel.read(buf) 读数据
// 读完数据后记录下读了多少数据,如果读满了,则下次就扩容
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 记录读的次数
allocHandle.incMessagesRead(1);
readPending = false;
// pipline 上执行相应的业务逻辑
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 是否继续读,逻辑是:1. 是否上次读满,如果没读满就代表已经读完了,不需要继续了
// 2. 是否达到最大的读取次数: 16 次 并且读到的数据 > 0,即本次读到了数据
} while (allocHandle.continueReading());
// 记录本次读的数据大小,用来计算下次分配 ByteBuf 的大小
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
在着手处理读事件的时候,首先需要分配一个 ByteBuf ,Netty 通过 AdaptiveRecvByteBufAllocator
来计算每次需要分配的大小,初始化容量是 1024 。ByteBuf 分配之后,向其中读入数据,并记录本次读取数据的大小,用来计算下次需要分配的容量。
Netty 的这种设计可以实现自适应的 ByteBuf 分配,可以节省空间。下面看下 AdaptiveRecvByteBufAllocator 的实现:
主要逻辑是计算需要分配的 ByteBuf 大小,入口是在本次读数据结束后调用 readComplete()
方法:
@Override
public void readComplete() {
record(totalBytesRead());
}
record() 方法:
private void record(int actualReadBytes) {
// 尝试是否可以减小分配的空间来满足需求:当前实际读取的 size 是否小于或等于打算缩小的 size
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
// 连续两次减小都可以
if (decreaseNow) {
// 减小
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
// 判断是否实际读取的数量大于等于预估的,如果是则尝试扩容
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
四、Q&A
Q: 为什么读数据只尝试读取 16 次?
A:
pipeline.fireChannelReadComplete();
代表一个读事件的处理完成,pipeline.fireChannelRead(byteBuf);
代表一次读数据完成,而一次读事件的处理可能会包含多次读数据,也就是上面代码展示的 do-while 循环。
而处理一次读事件上面说了,最多尝试读数据 16 次,其原因在于:
对于多路复用器 Selector 来说,可以注册多个 Channel ,而轮询是交给 NioEventLoop 来做的,对于次数的限制,是为了把机会让给其他人,而不是一直死循环的读下去。
而如果超过 16 次数据还没有读完,后面的数据会再次出发读事件,继续读取。
