一、引文
上一篇说的 Netty 接收数据的过程和读取数据的技巧,本质上就是对 OP_READ 事件的处理过程。数据拿到后,我们就可以进行具体的业务处理了。所以本篇来看看 Netty 对业务逻辑的处理。
二、业务处理过程一览
回顾接收数据的过程,在数据读取时,触发了 Netty 的 pipeline.fireChannelRead(byteBuf);
,后面的处理逻辑就在 pipline 的一系列 Handler 中执行。
如上图,从处理读事件调用 fireChannelRead()
方法开始,数据经过一连串的 pipline 。每个 pipline 由 Head 开头 Tail 结尾,中间有一些不同的 Handler , Handler 不是直接执行的,而是要放到 context 中去执行。
同时我们也注意到,读数据是 inbound 的过程,从 Head 到 Tail ,写数据与之相反。值得注意的是,并不是每个 Handler 都会执行,在途中也做了标示说明:Handler 要执行首先需要实现 ChannelInBoundHandler
,同时实现的 channelRead()
方法不能添加 @Skip 注解。
三、Netty 业务处理过程源码分析
我们从 fireChannelRead() 方法开始,了解业务处理的执行过程:
DefaultChannelPipeline#fireChannelRead(Object msg)
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
如上所述,Handler 是放在 Context 中执行的,同时也看到了 pipline 从 Head 开始执行。
继续跟进:
AbstractChannelHandlerContext#invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
这其中有一个 EventExecutor ,默认是 NioEventLoop ,也可以自行指定。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
继续看,就执行了 Handler 的 channelRead() 方法,第一次时进入到 DefaultChannelPipeline
中:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
其调用了 fireChannelRead(msg)
方法在 pipline 上继续传播,这时来到
AbstractChannelHandlerContext
:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
可以看到其中有个名为 findContextInbound(MASK_CHANNEL_READ)
的方法,用来寻找下一个执行 channelRead 的地方:
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
首先执行 ctx.next ,得到下一个 context ,然后判断是否有执行的资格,如果有就继续执行所选的 Handler ,这里我们是以 Netty 的 EchoServer 为例的,所以下一步进入到 EchoServerHandler ,这里执行具体的业务逻辑处理。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
处理完成后,将结果返回写出去。
四、知识点
1、处理业务的本质
处理业务逻辑的本质是数据在 pipline 上的所有 Handler 执行 channelRead() 方法的过程。
Handler 需要实现 ChannelInboundHandler 并实现 channelRead() 方法,并且不添加 @Skip 注解才能被执行到。
2、执行处理逻辑的线程
默认是 Channel 绑定的 NioEventLoop 的线程,也可以通过以下方法设置:
pipline.addLast(new UnOrderedThreadPoolEventExcutor(10), handler);
