
Analyzing the ChannelOptions Supported by Netty

When initializing a ServerBootstrap or Bootstrap, we often pass a few ChannelOptions, such as the common SO_BACKLOG, SO_REUSEADDR, SO_KEEPALIVE, and TCP_NODELAY. What exactly does each parameter do, and which parameters does Netty support? This article analyzes them in detail from the perspective of the source code.

Below is an example of Netty used as a server. option() passes parameters to the ServerSocketChannel, while childOption() passes them to each newly accepted SocketChannel:

public class Main {
    public static void main(String[] args) {
        // bossGroup, workerGroup, serverBootstrap and nettyServerConfig are
        // assumed to have been created beforehand.
        serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new NettyServerHandler());
                }
            });
    }
}

As you can see, these options are all defined in the ChannelOption class. Let's first look at the commonly used ones:

public class ChannelOption<T> extends UniqueName {
    public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");
    public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");
    public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");

    public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
    public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
    public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
    public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");

    public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");

    public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
    public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
}

With so many ChannelOptions available, let's analyze the purpose and usage scenarios of each:

  1. ALLOCATOR

ALLOCATOR is related to Netty's memory management, so let's first review Netty's memory-management mechanism. Its high performance rests on two key points:

  • Pooled memory management
  • Use of off-heap memory (direct memory)

    The advantage of off-heap memory: when a Java network program uses off-heap memory for sending data (socket reads and writes), it avoids a second copy of the byte buffer. If traditional heap memory (essentially a byte[]) is used for socket I/O instead, the JVM first copies the heap buffer into off-heap direct memory and only then writes it to the socket, so compared with direct memory, sending a message incurs one extra buffer copy.

The ALLOCATOR parameter determines how Netty manages ByteBufs internally. It has two implementations, PooledByteBufAllocator and UnpooledByteBufAllocator. If this option is not passed, Netty reads the io.netty.allocator.type JVM property; with type=pooled, Netty selects PooledByteBufAllocator to allocate memory. As long as -Dio.netty.noPreferDirect=true has not been set and sun.misc.Unsafe is available on the running JVM, direct memory is preferred, provided of course that a sufficient amount of direct memory has been allocated.
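If you prefer not to rely on the JVM property, the allocator can also be set explicitly per bootstrap. A minimal sketch, reusing the serverBootstrap from the opening example:

// Force the pooled allocator for the server channel and all accepted
// child channels, overriding io.netty.allocator.type.
serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);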

During startup Netty logs the allocator-related parameters; if you run into problems, you can adjust the log level to inspect them:

if (logger.isDebugEnabled()) {
    logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
    logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
    if (pageSizeFallbackCause == null) {
        logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
    } else {
        logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
    }
    if (maxOrderFallbackCause == null) {
        logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
    } else {
        logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
    }
    logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
    logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
    logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
    logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
    logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
    logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
}
  2. MAX_MESSAGES_PER_READ

This parameter sets the maximum number of messages allowed to be read from the socket each time a read event is processed.
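A minimal tuning sketch, again on the serverBootstrap from the opening example:

// Cap each read loop at 8 messages so one busy connection cannot monopolize
// its event loop (8 is an illustrative value).
serverBootstrap.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 8);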

  3. RCVBUF_ALLOCATOR

This parameter sets the allocation policy for the receive buffer. It has two implementations, AdaptiveRecvByteBufAllocator and FixedRecvByteBufAllocator; the only difference is whether the allocated ByteBuf has a fixed or adaptive size. The actual memory is still allocated through the PooledByteBufAllocator described earlier.
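For example, a sketch that switches to fixed-size receive buffers (64 KiB is an illustrative value):

// Hand out fixed 64 KiB receive buffers instead of the adaptive default;
// the memory itself still comes from the configured ALLOCATOR.
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(64 * 1024));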

  4. MESSAGE_SIZE_ESTIMATOR

As the name suggests, this parameter is used to estimate the size of a msg, which drives the growth of the channel's send/receive buffers.
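A sketch of configuring it explicitly; DefaultMessageSizeEstimator is Netty's default implementation, and its constructor argument is the size assumed for messages whose actual size cannot be determined:

// Report 8 bytes for any message of unknown type (8 is the illustrative fallback).
serverBootstrap.childOption(ChannelOption.MESSAGE_SIZE_ESTIMATOR, new DefaultMessageSizeEstimator(8));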

  5. CONNECT_TIMEOUT_MILLIS

The timeout for a client establishing a connection to a server.
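A client-side sketch (the Bootstrap setup is abbreviated; 3000 ms is an illustrative value):

// Fail the connect future if the TCP handshake has not completed within 3 seconds.
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);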

  6. WRITE_SPIN_COUNT

The maximum number of write operations performed in a single event-loop pass, with a default of 16. In other words, a large write is attempted at most 16 times; if the data still has not been fully written after 16 attempts, a new write task is submitted to the EventLoop and resumed at the next scheduling, so other write requests are not held up by a single large one.
This can be analyzed together with the source of AbstractNioByteChannel:

class AbstractNioByteChannel {
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        for (;;) {
            Object msg = in.current();
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                // Attempt the write at most writeSpinCount times.
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);
                    if (localFlushedAmount == 0) {
                        // The socket buffer is full; register interest in OP_WRITE.
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }
                if (done) {
                    in.remove();
                } else {
                    // Still not finished after writeSpinCount attempts
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else {
                // Should not reach here.
                throw new Error();
            }
        }
    }
}

// As shown above, if the loop runs writeSpinCount times without finishing,
// incompleteWrite() is called; its logic is as follows:
class AbstractNioByteChannel {
    protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }

    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
}

As you can see, if the write did not finish, setOpWrite() is called to register an OP_WRITE interest on the SelectionKey, so the channel can be dispatched again by the Selector in the NioEventLoop. This raises a question: why would a single msg need to be written so many times before completing? Let's continue with the source. When we write data to a channel we call Channel.write(), which internally calls pipeline.write(), and the pipeline calls tail.write(msg). As mentioned in the previous article, TailContext is a ChannelInboundHandler and does not handle write events itself; it walks up the pipeline looking for a ChannelOutboundHandler to handle the write request, as shown below:

class AbstractChannelHandlerContext {
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeWrite(msg, promise);
            if (flush) {
                next.invokeFlush();
            }
        } else {
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                // Check for null as it may be set to null if the channel is closed already
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size);
                }
            }
            Runnable task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, size, promise);
            } else {
                task = WriteTask.newInstance(next, msg, size, promise);
            }
            safeExecute(executor, task, promise, msg);
        }
    }
}

After passing through all user-defined outbound handlers, the write eventually reaches headContext.write(). As the previous article explained, HeadContext is a ChannelOutboundHandler, so it is the one that ultimately handles the write event, and its handling boils down to calling unsafe.write(msg, promise):

protected abstract class AbstractUnsafe implements Unsafe {
    public final void write(Object msg, ChannelPromise promise) {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }

        outboundBuffer.addMessage(msg, size, promise);
    }

    public final void flush() {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        outboundBuffer.addFlush();
        flush0();
    }

    protected void flush0() {
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            outboundBuffer.failFlushed(t);
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise());
            }
        } finally {
            inFlush0 = false;
        }
    }
}

As you can see, outboundBuffer.addMessage(msg, size, promise) is ultimately called to add the message to ChannelOutboundBuffer, the internal write buffer. When we call channel.flush() to push the data to the socket, outboundBuffer.addFlush() marks every message previously added to the ChannelOutboundBuffer as flushed, and finally flush0() is called to write the data. During the write, the flushedEntry linked list is traversed and its data written to the socket, which explains the logic above of writing at most writeSpinCount times.
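To make the path concrete, here is a small usage sketch from the application's point of view (channel and msg are assumed to exist):

// write() only appends the message to the ChannelOutboundBuffer; nothing
// reaches the socket until flush() marks the queued entries as flushed and
// flush0() drains the flushedEntry list, bounded by WRITE_SPIN_COUNT per pass.
channel.write(msg);           // queued in ChannelOutboundBuffer
channel.flush();              // addFlush() + flush0() -> doWrite(...)
channel.writeAndFlush(msg);   // common shorthand for both steps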

  7. WRITE_BUFFER_HIGH_WATER_MARK and WRITE_BUFFER_LOW_WATER_MARK

These two parameters are closely tied to the preceding analysis. As described above, when AbstractChannelHandlerContext.write() is called, the message is placed into the ChannelOutboundBuffer and buffer.incrementPendingOutboundBytes(size) is invoked to grow the buffered byte count. To keep the buffer from growing unchecked, once the amount of buffered data exceeds the high water mark the channel is marked unwritable and Channel.isWritable() returns false, as shown below:

class ChannelOutboundBuffer {
    /**
     * Increment the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void incrementPendingOutboundBytes(long size) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable();
        }
    }

    /**
     * Decrement the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void decrementPendingOutboundBytes(long size) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
            setWritable();
        }
    }
}

class AbstractChannel {
    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable();
    }
}

During flush, if writeBufferSize falls back below the low water mark, writable is set to true again and data can be written once more.
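A hedged sketch of how an application can cooperate with the water marks; the thresholds and the handler below are illustrative, not Netty's own code:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative thresholds: unwritable above 64 KiB pending, writable again
// below 32 KiB. They would be set on the bootstrap like this:
//   serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024);
//   serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            // The outbound buffer has drained below the low water mark:
            // resume writing queued application data here.
        }
        ctx.fireChannelWritabilityChanged();
    }
}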

  8. ALLOW_HALF_CLOSURE

Whether half-closure is allowed. When a read indicates the remote end has shut down its side of the connection, the socket is normally closed outright; with ALLOW_HALF_CLOSURE enabled, the channel stays open and data can still be written to the socket.
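A one-line sketch of enabling it on the server bootstrap from the opening example:

// Keep the channel open for writes after the peer shuts down its write side,
// so a final response can still be sent before closing.
serverBootstrap.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true);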

  9. SO_KEEPALIVE, SO_SNDBUF, SO_RCVBUF, SO_REUSEADDR, SO_LINGER, SO_BACKLOG

These are the common TCP socket options, so they are not described in detail here; a brief annotated sketch follows below.
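For completeness, a sketch of these options on the server bootstrap from the opening example; all values are illustrative only:

serverBootstrap
        .option(ChannelOption.SO_BACKLOG, 1024)          // length of the accept queue for pending connections
        .option(ChannelOption.SO_REUSEADDR, true)        // allow rebinding the port while it is in TIME_WAIT
        .childOption(ChannelOption.SO_KEEPALIVE, true)   // enable periodic TCP keep-alive probes
        .childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // kernel send-buffer size in bytes
        .childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // kernel receive-buffer size in bytes
        .childOption(ChannelOption.SO_LINGER, 5);        // seconds close() waits to flush unsent data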
