EZLippi-浮生志

Netty如何解决TCP拆包和粘包问题

发生TCP粘包或拆包有很多原因,常见的几点如下所示:

  1. 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
  2. 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
  3. 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
  4. 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

大家从网上一搜会知道Netty提供了LineBasedFrameDecoder(基于换行符),DelimiterBasedFrameDecoder(固定分隔符),FixedLengthFrameDecoder(固定长度首部)这几种Handler来解决,这篇文章主要是结合Netty源码的角度来分析Netty内部是如何处理的.
我们先来看下Netty读取数据的过程,Netty里每个Channel会关联一个Unsafe对象,所有的IO读写操作都是经过Unfafe来处理的,先看下AbstractNioUnsafe类中是如何读取数据的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
abstract class AbstractNioUnsafe{
public void read() {
//获取channel关联的pipeline
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
//每次读取socket允许读取的最大字节数
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
//省略了try-catch
int totalReadAmount = 0;
do {
//使用allocHandler分配一块内存缓冲区
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
//调用实现类从Socket读取字节到缓冲区
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
//发送channelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
totalReadAmount += localReadAmount;

// stop reading
if (!config.isAutoRead()) {
break;
}

if (localReadAmount < writable) {
//超过了byteBuf最大能容纳的字节数
break;
}
} while (++ messages < maxMessagesPerRead);

pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
}
}

ByteToMessageDecoder分析

从上面的代码可以看到,每次读取时都先分配一块缓冲区,然后把数据读取到缓冲区,读取完之后发送ChannelRead事件,这时候如果ChannelPipeLine里的InboundHandler直接去读取可能会读取到一条不完整的消息,也有可能读取到多条消息,Netty提供了ByteToMessageDecoder来解决这个拆包的问题.将byte转换成message最大的难点在于tcp的拆包 ,拆包意味着我们接受的二进制数据还不足以构造成一个完整的message。如果遇到这种情况,应该怎么处理呢?

第一种方案:线程一直等待,直到byte可以构造成一个完整的message,这种方式最大的问题在于,如过剩余部分数据发送很慢,线程一直被占用,不能去处理其他客户端的请求。

第二种方案:为每个客户端连接(SocketChannel)定义一个应用层面的缓存,线程处理某个SocketChannel时,如果遇到byte不足以构成一个message,则将byte到放入缓存中,线程继续去处理其他的SocketChannel的任务。

显然ByteToMessageDecoder采用的是第二种方案,在ByteToMessageDecoder 中,有一个ByteBuf类型的字段,这就是上面所说的缓存,如果每个SocketChannel实例都在其ChannelPipeline中添加了一个ByteToMessageDecoder类型实例,那么就相当于每个SocketChannel都有了自己关联的缓存。

特别要注意的是:ByteToMessageDecoder实现类上是不允许添加@Sharable注解的,添加@Sharable表示多个SocketChannel可以共用一个ChannelHandler实例,但是明显ByteToMessageDecoder是特殊的,如果多个SocketChannel共用一个ByteToMessageDecoder实例,会造成缓存中的数据分不清到底是哪个SocketChannel的。

以下是ByteToMessageDecoder的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

ByteBuf cumulation;//cumulation用户缓存解析后的二进制数据
private boolean singleDecode;//在解析数据时,是否每次只回调一次decode方法,后面介绍
private boolean decodeWasNull;
private boolean first;//是否是第一次接受到输入的数据
//当接受到数据时,channelRead方法会被回调
//关于参数Object msg的说明:由于ByteToMessageDecoder只处理二进制数据 ,因此Object类型应该是ByteBuf。
//如果不是,将会直接交给pipeline中下一个handler处理。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {//如果msg类型是ByteBuf,进行解析
//out用于存储l解析二进制流得到的结果,一个二进制流可能会解析出多个消息,所以out是一个list
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;//将msg强转为ByteBuf类型
//判断cumulation == null;并将结果赋值给first。因此如果first为true,则表示第一次接受到数据
first = cumulation == null;
if (first) {//如果是第一次接受到数据,直接将接受到的数据赋值给缓存对象cumulation
cumulation = data;
} else {//如果不是第一次接受到数据
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|| cumulation.refCnt() > 1) {
//如果cumulation中的剩余空间,不足以存储接收到的data
expandCumulation(ctx, data.readableBytes());//将cumulation扩容
}
cumulation.writeBytes(data);//将data拷贝到cumulation中
data.release();
}
//调用callDecode,开始解析cumulation中的数据,解析结果放到out中,这是一个list
//因为我们可能根据cumulation中的数据,解析出多个有效数据
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
//如果cumulation没有数据可读了,说明所有的二进制数据都被解析过了
//此时对cumulation进行释放,以节省内存空间。
//反之cumulation还有数据可读,那么if中的语句不会运行,因为不对cumulation进行释放
//因此也就缓存了用户尚未解析的二进制数据。
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();//获得解析二进制流得到的消息的个数
decodeWasNull = size == 0;
//迭代每一个解析出来的消息,调用下一个ChannelHandler进行处理
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {//如果msg类型是不是ByteBuf,直接调用下一个handler进行处理
ctx.fireChannelRead(msg);
}
}

//callDecode方法主要用于解析cumulation 中的数据,并将解析的结果放入List<Object> out中。
//由于cumulation中缓存的二进制数据,可能包含了出多条有效信息,因此在callDecode方法中,默认会调用多次decode方法
//我们在覆写decode方法时,每次只解析一个消息,添加到out中,callDecode通过多次回调decode
//每次传递进来都是相同的List<Object> out实例,因此每一次解析出来的消息,都存储在同一个out实例中。
//当cumulation没有数据可以继续读,或者某次调用decode方法后,List<Object> out中元素个数没有变化,则停止回调decode方法。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {//如果in,即cumulation中有数据可读的话,一直循环调用decode
int outSize = out.size();//获取上一次decode方法调用后,out中元素数量,如果是第一次调用,则为0。
int oldInputLength = in.readableBytes();//上次decode方法调用后,in的剩余可读字节数
//回调decode方法,由开发者覆写,用于解析in中包含的二进制数据,并将解析结果放到out中。
decode(ctx, in, out);
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
//outSize是上一次decode方法调用时out的大小,out.size()是当前out大小
//如果二者相等,则说明当前decode方法调用没有解析出有效信息。
if (outSize == out.size()) {
//此时,如果发现上次decode方法和本次decode方法调用候,in中的剩余可读字节数相同
//则说明本次decode方法没有读取任何数据解析
//(可能是遇到半包等问题,即剩余的二进制数据不足以构成一条消息),跳出while循环。
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
//处理人为失误 。如果走到这段代码,则说明outSize != out.size()。
//也就是本次decode方法实际上是解析出来了有效信息放到out中。
//但是oldInputLength == in.readableBytes(),说明本次decode方法调用并没有读取任何数据
//但是out中元素却添加了。
//这可能是因为开发者错误的编写了代码,例如mock了一个消息放到List中。
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}

if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}

/**抽象方法,由子类覆盖,建议在decode方法中,一次只解析一条信息,不足以构成一条信息的数据,不要读取*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

}
🐶 您的支持将鼓励我继续创作 🐶