接着上一篇,数据在 pipline 中处理完成后,往往需要给予客户端一定的响应,这个阶段就是本篇的主要内容:发送数据。
快递场景(包裹) | Netty 写数据(数据) |
---|---|
揽收到仓库 | write : 写到一个 buffer |
从仓库发货 | flush : 把 buffer 中的数据发送出去 |
揽收到仓库并立即发货(加急件) | writeAndFlush : 写到 buffer 立即发送 |
揽收与发货之间有中转仓库 | write 和 flush 之间有个 ChannelOutBoundBuffer |
WriteBufferWaterMark.high()
),会将可写的标志位改为 false ,让应用端自己做决定要不要继续发送数据了。写数据分为两个部分:
write:
将数据写到 Buffer ,调用 ChannelOutboundBuffer#addMessage()
方法将消息放到待写队列中的队尾,形成链表结构。
flush:
发送 Buffer 里面的数据,调用 AbstractChannel.AbstractUnsafe#flush()
ChannelOutboundBuffer#addFlush()
,将队首的 unflushedEntry 赋值为 null ,将对发送的数据 entry 赋值给 flushedEntry 。NioSocketChannel#dowrite()
,将已经发送的 entry 删除,如果数据发送了一半,则使用 progress 标记发送的位置。我们继续以 netty-example
模块下的 EchoServer
为例,分析 Netty 发送数据的过程。
这里 EchoServer
的例子做的是,客户端不断的发送递增的数字给服务端,服务端接收后再反向发送给客户端,所以我们从 EchoServerHandler#channelRead()
方法入手,看服务端如何发送数据。
在 channelRead()
方法中调用了 AbstractChannelHandlerContext#invokeWrite()
:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
实际到后面是调用了 AbstractChannel 的 AbstractUnsafe#write()
方法:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
在方法的 第 4 行 我们看到了发送数据的缓冲仓库真身 ChannelOutboundBuffer
。接下来的判空是用来判断 channel 是否已经关闭了,如果关闭则执行安全关闭的逻辑。
接下来的 16 到 27 行 大概计算一下消息的大小,然后就调用 outboundBuffer.addMessage(msg, size, promise)
方法将消息放入缓冲 buffer 中。
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
// 追加到队尾
tailEntry = entry;
// 如果没数据队尾就是 unflushedEntry
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
上述方法的逻辑就如上面的写数据过程图的 write 部分所示,将消息包装为 Entry 然后追加到队尾。在方法的最后有一行
incrementPendingOutboundBytes(entry.pendingSize, false);
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 根据之前的计算出的数据大小,更新 buffer 中还有多少数据没有处理完毕
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 判断待发送数据是否已经超过了高水位线
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// 如果超过,则将状态变为不可写
setUnwritable(invokeLater);
}
}
这个方法用来判断待发送的数据是否已经超出给定的最高水位线,如果超出则将状态变为不可写,让用户决定是否继续写。
执行到这里,Netty 发送数据的 write 部分就执行完毕了。
回到 EchoServerHandler
:
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
在客户端发送过来的数据处理完毕后,执行了 flush()
方法,我们继续跟进。
数据准备
AbstractChannelHandlerContext#flush()
:
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
}
return this;
}
继续执行 invokeFlush() :
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
后面实际调用了 AbstractChannel的AbstractUnsafe#flush()
方法:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
在 第 9 行 我们看到了 addFlush() 方法,即数据准备。(下一行就是数据发送,我们下面讲。)
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
这个方法也和我们开头的过程图对应,即将 unflushedEntry 的数据放到 flushedEntry 中。
发送
数据准备完毕后,就到了发送阶段了:
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
}
虽然方法的行数很多,但是逻辑比较简单,在 第 33 行 一眼就看到了关键代码,doWrite()
方法,真正执行发送数据的部分。
这里我们是 server 端为例,所以直接来到 NioSocketChannel#doWrite()
方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 有数据要写,且能写入,最多尝试 16 次
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
// 数据全部写完,不需要尝试 16 次
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
// 尽量多写数据
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 数据组装为 ByteBuffer 数组,最多返回 1024,不超过 maxBytesPerGatheringWrite
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 单个数据直接调用 channel.write(buffer) 方法
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 移除已经写完的数据
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// 批量写,调用 channel.write(nioBuffers, 0, nioBufferCnt)
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// 写了 16 次还是没写完,直接交给 eventLoop 去执行写,而不是注册 OP_WRITE 事件。
incompleteWrite(writeSpinCount < 0);
}
其中根据数据的不同,分为单个数据和批量写两种逻辑,对比之下只是最后调用的 JDK 的 write 方法不同,我们以单个数据为例说明。
首先获得最多尝试的次数,默认最多尝试 16 次。然后判断数据如果全部写完,则终止循环。接着计算最多写数据的大小,并组装为 ByteBuffer 数组。之后就是两种数据分类的不同处理方式,单个数据时直接调用 JDK 的 channel.write(buffer) 方法写出数据,然后移除已经发送的数据,并标记数据发送的位置。
值得注意的是在方法的最后,如果写了 16 次数据还是没有写完,那么直接将写任务交给 EventLoop 去执行。
SocketChannel#write()
方法