EZLippi-浮生志

Netty ChannelHandler处理流程

很多人都说自己熟悉Netty,但问到Netty内部的消息流转时却又答不出来,阅读源码可以学习框架的设计思想,帮助我们更好的理解和使用框架,这篇文章主要从源码的角度介绍Netty内部的消息处理流程.

先来看下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Main{
public void init(){
ServerBootstrap b = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
}
}

大家对这段代码肯定不陌生,但是对Netty内部的请求流转就不一定熟悉了,比如LoggingHandler和EchoServerHandler分别是处理什么请求的,handler和childHandler具体有啥区别,下面我们就从ServerBootStrap的启动过程及消息流转来彻底理解.

ServerBootStrap启动过程

调用ServerBootstrap.bind()方法后会调用AbstractBootStrap的doBind()方法,然后调用initAndRegister()方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class AbstractBootStrap{
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
}

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
//省略
}
ChannelFuture regFuture = group().register(channel);
}
}

可以看到然后调用initAndRegister()方法内部调用newChannel()方法创建了一个新的Channel,该方法内部就是调用clazz.newInstance()方法来创建了一个新的实例,由于我们初始化ServerBootStrap时传递的Channel是NioServerSocketChannel,因此创建出来的就是NioServerSocketChannel
的一个实例,然后调用了init(channel)方法,可以看下ServerBootStrap里的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ServerBootStrap{
public void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
}

handler()方法返回的是最开始我们注册的LoggingHandler,这里把LoggingHandler加入到ChannelPipeLine的尾部,实际上不是尾部,Netty会给每个ChannelPipeLine创建一个HeadHandler和TailHandler,这个TailHandler才是真正的尾部,
因此LoggingHandler是加在TailHandler的前面,接下来我们简要分析下HeadHandler和TailHandler的逻辑.

HeadHandler和TailHandler

先借用网上一张图来看下PipeLine的内部结构:

通过上图我们可以看到,而 ChannelPipeline中维护了一个由ChannelHandlerContext组成的双向链表,这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler.
Netty里每个Handler分为InBoundHandler和OutBoundHandler,inBound和outBound在读写消息的时候会用到,通过源代码可以知道HeadContext是一个ChannelOutBoundHandler,而TailContext是一个ChannelInBoundHandler:

1
2
class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler{}
class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler{}

init过程

简单介绍完HeadHandler和TailHandler后我们再回到ServerBootStrap的init()方法,注册完LoggingHandler后又在pipeline尾部加上了ChannelInitializer,这时候init()方法基本执行完了,接下来调用group().register(channel),
这个group()方法返回的就是我们初始化时传递的BossGroup,方法内部会调用SingleThreadEventLoop.register()方法,SingleThreadEventLoop内部关联着一个线程,register()方法内部调用了
channel.unsafe().register(this, promise),最后走到AbstractChannel的register0()方法,方法内部逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class AbstractUnsafe{
private void register0(ChannelPromise promise) {
try {
doRegister();
registered = true;
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

其中doRegister()方法由AbstractNioChannel类重写了,如下:

1
2
3
4
5
6
7
8
9
10
11
12
class AbstractNioChannel {
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
}

主要逻辑就和普通的NIO编程类似了,就是把ServerSocketChannel注册到Nio的Selector上,这个eventLoop就是我们初始化时传递的NioEventLoop,每个NioEventLoop内部都关联一个Selector对象,只不过
BossGroup的EventLoop的Selector只处理IO_Accept事件,注册完之后register0()方法内部调用了pipeline.fireChannelRegistered()来发送ChannelRegister事件,这里需要注意的是上面注册的是IO_Read事件,
而ServerSocketChannel需要处理IO_Accept事件,从它的构造方法也可以看出来:

1
2
3
4
5
6
class NioServerSocketChannel{
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}

我们来看下DefaultChannelPipeline里的实现:

1
2
3
4
5
6
class DefaultChannelPipeline{
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
}

可以看到该方法内部只是简单调用了head.fireChannelRegistered(),也就是说Channel的读事件是从headContext开始处理的,由于HeadContext是一个ChannelOutboundHandler,因此它没有重写fireChannelRegistered方法,我们看下父类AbstractChannelHandlerContext的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class AbstractChannelHandlerContext {
public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}

private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
}

里面的逻辑也比较清晰,就是找到下一个InBoundHandler,然后调用它的invokeChannelRegistered(),最终是调用ChannelInboundHandler.channelRegistered()方法,
通过前面的分析我们知道NioServerSocketChannel的pipeline链路为headContext->LoggingHandler->ChannelInitializer->tailContext,
接下来看下ChannelInitializer怎么处理这个事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

@Override
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
//省略了try-catch逻辑
}
}

可以看到方法内部调用了initChannel(),还记得我们前面注册ChannelInitializer的时候是不是重写了initChannel()方法,我把这段代码再贴上来:

1
2
3
4
5
6
7
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

可以看到initChannel方法内部是在pipeLine尾部加上了ServerBootstrapAcceptor这个Handler,从名字也可以猜测出这个Handler用于接受新的客户端连接,后面我们再具体介绍,调用完initChannel()方法后调用了pipeline.remove()方法
把ChannelInitializer从pipeLine中移除,最后的pipeLine链接结构是headContext->LoggingHandler->ServerBootstrapAcceptor->tailContext,最后调用了ctx.fireChannelRegistered()来继续传递channelRegistered事件,
这个逻辑非常重要,如果后续的Handler想处理这个事件,就必须调用context.fire***方法来继续在pipeLine中传递,到目前为止ServerBootStrap的启动过程我们就分析完了,接下来我们来分析ServerBootstrapAcceptor处理新请求的过程.

新连接建立

前面我们介绍过register过程在Selector上注册了IO_Accept事件,那有新事件过来之后是怎么处理的呢,我们继续来分析,BossGroup里每个NioEventLoop都关联一个Selector对象,当Selector上有事件时,处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class NioEventLoop {
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
//省略其他事件处理逻辑

} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}

当有OP_READ或者OP_Accept事件时,调用了channel.unsafe.read()方法,我们看下unsafe.read()方法内部的逻辑,由于NioServerSocketChannel继承自AbstractNioMessageChannel,我们看下AbstractNioMessageChannel的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
//省略其他逻辑
}
} catch (Throwable t) {
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();

}
}

可以看到里面主要逻辑是调用doReadMessages(readBuf),我们看下NioServerSocketChannel的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class NioServerSocketChannel{
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//省略
}

return 0;
}
}

可以看到这里又是我们比较熟悉的NIO的操作了,由于前面触发的是IO_Accept事件,因此这里调用了ServerSocketChannel.accept()方法来获取新建立的SocketChannel,然后封装成NioSocketChannel并添加到buf列表,
最后read()方法调用了pipeline.fireChannelRead()方法来触发channelRead事件,根据前面的分析我们知道pipeline内部是调用headContext.fireChannelRead()方法,然后按顺序调用各个ChannelInBoundHandler的channelRead方法,
我们来看下前面注册的ServerBootstrapAcceptor的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ServerBootstrapAcceptor{
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}

可以看到该方法给新建立的Channel添加了childHandler,这个childHandler就是最开始我们通过childHandler()方法注册上去的ChannelInitializer实例,不用看代码我们也能猜测出来最后会调用channelRegistered()方法来调用
ChannelInitializer的initChannel()方法来把我们自定义的EchoServerHandler添加到childChannel的pipeLine里面,最后调用了childGroup.register(child),这个childGroup就是最开始我们注册的workGroup,最后又会回到
我们前面介绍的register方法,里面就是调用channel.unsafe().register(this, promise),然后调用pipeline.fireChannelRegistered()来激活channelRegister事件,最后就会调用到ChannelInitializer的initChannel方法
把EchoServerHandler添加到pipeline里面,最后新建立的channel对应的pipeline链表就是headContext->EchoServerHandler->tailContext.

同样,workGroup的每个NioEventLoop都关联一个Selector实例,前面调用childGroup.register()方法时会选择一个NioEventLoop来处理这个Channel,因此当我们有很多客户端连接时,这些连接分散在不同的Selector中处理.

🐶 您的支持将鼓励我继续创作 🐶