码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
Netty 系列笔记之源码分析:业务处理
/  

Netty 系列笔记之源码分析:业务处理

一、引文

上一篇说的 Netty 接收数据的过程和读取数据的技巧,本质上就是对 OP_READ 事件的处理过程。数据拿到后,我们就可以进行具体的业务处理了。所以本篇来看看 Netty 对业务逻辑的处理。

二、业务处理过程一览

回顾接收数据的过程,在数据读取时,触发了 Netty 的 pipeline.fireChannelRead(byteBuf); ,后面的处理逻辑就在 pipline 的一系列 Handler 中执行。

image.png

如上图,从处理读事件调用 fireChannelRead() 方法开始,数据经过一连串的 pipline 。每个 pipline 由 Head 开头 Tail 结尾,中间有一些不同的 Handler , Handler 不是直接执行的,而是要放到 context 中去执行。

同时我们也注意到,读数据是 inbound 的过程,从 Head 到 Tail ,写数据与之相反。值得注意的是,并不是每个 Handler 都会执行,在途中也做了标示说明:Handler 要执行首先需要实现 ChannelInBoundHandler ,同时实现的 channelRead() 方法不能添加 @Skip 注解。

三、Netty 业务处理过程源码分析

我们从 fireChannelRead() 方法开始,了解业务处理的执行过程:

  • DefaultChannelPipeline#fireChannelRead(Object msg)
@Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

如上所述,Handler 是放在 Context 中执行的,同时也看到了 pipline 从 Head 开始执行。

继续跟进:

  • AbstractChannelHandlerContext#invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

这其中有一个 EventExecutor ,默认是 NioEventLoop ,也可以自行指定。

private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

继续看,就执行了 Handler 的 channelRead() 方法,第一次时进入到 DefaultChannelPipeline 中:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

其调用了 fireChannelRead(msg) 方法在 pipline 上继续传播,这时来到

AbstractChannelHandlerContext

@Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

可以看到其中有个名为 findContextInbound(MASK_CHANNEL_READ) 的方法,用来寻找下一个执行 channelRead 的地方:

private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

首先执行 ctx.next ,得到下一个 context ,然后判断是否有执行的资格,如果有就继续执行所选的 Handler ,这里我们是以 Netty 的 EchoServer 为例的,所以下一步进入到 EchoServerHandler ,这里执行具体的业务逻辑处理。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

处理完成后,将结果返回写出去。

四、知识点

1、处理业务的本质

处理业务逻辑的本质是数据在 pipline 上的所有 Handler 执行 channelRead() 方法的过程。

Handler 需要实现 ChannelInboundHandler 并实现 channelRead() 方法,并且不添加 @Skip 注解才能被执行到。

2、执行处理逻辑的线程

默认是 Channel 绑定的 NioEventLoop 的线程,也可以通过以下方法设置:

pipline.addLast(new UnOrderedThreadPoolEventExcutor(10), handler);



❤ 转载请注明本文地址或来源,谢谢合作 ❤


center