码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Netty 系列笔记之源码分析:接收数据
/  

Netty 系列笔记之源码分析:接收数据

一、引言

系统在连接准备好之后,就是做数据的交互了。所以本篇分析接收数据相关的代码。

二、读数据过程一览

读数据的过程在 Work Thread 中:

  • Select 接收到 OP_READ 事件。
  • 处理 OP_READ 事件:
    • 分配一个初始容量为 1024 大小的 ByteBufer 接收数据。
    • 从 Channel 读数据到 ByteBuffer.
    • 记录实际接收数据的大小,调整下次分配 ByteBuffer 的大小。
    • 触发 pipline.fireChannelRead(byteBuf) 将读到的数据传播出去。
    • 判断接收数据的 ByteBuffer 是否读满:是,尝试继续读取知道没有数据或达到限定次数 16 次;否,结束本轮读取,等待下一个 OP_READ 事件。

三、Netty 读数据过程源码分析

1、接收读(OP_READ)事件

服务端在接收到读事件后,由 NioEventLoop 负责处理,其入口与接收连接相同,都是始于 Selector 轮询事件,然后调用 NioEventLoop#processSelectedKeys() 方法处理。

2、处理读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

最终进入到上面的代码片段,继续查看 read() 方法的一个实现:AbstractNioByteChannel#read()

@Override
public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            // 获得 ByteBuf 的分配器
            final ByteBufAllocator allocator = config.getAllocator();
            // Handle 用于决定下次需要分配多大的 ByteBuf
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // 分配 ByteBuf, 调用 guess() 方法,初始容量 1024
                    byteBuf = allocHandle.allocate(allocator);
                    // 向 ByteBuf 中装入数据,本质就是调用 channel.read(buf) 读数据
                    // 读完数据后记录下读了多少数据,如果读满了,则下次就扩容
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    // 记录读的次数
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // pipline 上执行相应的业务逻辑
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    // 是否继续读,逻辑是:1. 是否上次读满,如果没读满就代表已经读完了,不需要继续了
                    // 2. 是否达到最大的读取次数: 16 次 并且读到的数据 > 0,即本次读到了数据
                } while (allocHandle.continueReading());
                // 记录本次读的数据大小,用来计算下次分配 ByteBuf 的大小
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

在着手处理读事件的时候,首先需要分配一个 ByteBuf ,Netty 通过 AdaptiveRecvByteBufAllocator 来计算每次需要分配的大小,初始化容量是 1024 。ByteBuf 分配之后,向其中读入数据,并记录本次读取数据的大小,用来计算下次需要分配的容量。

Netty 的这种设计可以实现自适应的 ByteBuf 分配,可以节省空间。下面看下 AdaptiveRecvByteBufAllocator 的实现:

主要逻辑是计算需要分配的 ByteBuf 大小,入口是在本次读数据结束后调用 readComplete() 方法:

@Override
public void readComplete() {
    record(totalBytesRead());
}

record() 方法:

private void record(int actualReadBytes) {
            // 尝试是否可以减小分配的空间来满足需求:当前实际读取的 size 是否小于或等于打算缩小的 size
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                // 连续两次减小都可以
                if (decreaseNow) {
                    // 减小
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
                // 判断是否实际读取的数量大于等于预估的,如果是则尝试扩容
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

四、Q&A

Q: 为什么读数据只尝试读取 16 次?
A:
pipeline.fireChannelReadComplete(); 代表一个读事件的处理完成,pipeline.fireChannelRead(byteBuf); 代表一次读数据完成,而一次读事件的处理可能会包含多次读数据,也就是上面代码展示的 do-while 循环。

而处理一次读事件上面说了,最多尝试读数据 16 次,其原因在于:

对于多路复用器 Selector 来说,可以注册多个 Channel ,而轮询是交给 NioEventLoop 来做的,对于次数的限制,是为了把机会让给其他人,而不是一直死循环的读下去。

而如果超过 16 次数据还没有读完,后面的数据会再次出发读事件,继续读取。



❤ 转载请注明本文地址或来源,谢谢合作 ❤


center