TCP 粘包/拆包
问题分析
TCP 是个“流”协议,所谓流,就是没有界限的一串数据。TCP 底层并不了解上层业务数据的具体含义,它会根据 TCP 缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的 TCP 粘包和拆包问题。
例如在上一篇文章的Demo程序中,客户端向服务端发送了两条数据,服务端也向客户端响应了两条数据。
// 服务端处理器static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); // 读数据并处理请求 ByteBuf reqBuf = (ByteBuf) msg; byte[] reqBytes = new byte[reqBuf.readableBytes()]; reqBuf.readBytes(reqBytes); System.out.println("Request data: " + new String(reqBytes, StandardCharsets.UTF_8)); // 响应客户端请求 System.out.println("channel writing..."); ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Client!", StandardCharsets.UTF_8)); ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8)); ctx.channel().flush(); }}// 客户端处理器static class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel active..."); ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Server!", StandardCharsets.UTF_8)); ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8)); ctx.channel().flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); ByteBuf resBuf = (ByteBuf) msg; byte[] resBytes = new byte[resBuf.readableBytes()]; resBuf.readBytes(resBytes); System.out.println("Response data: " + new String(resBytes, StandardCharsets.UTF_8)); // 关闭通道 ctx.channel().close(); }}
从输出结果来看,客户端和服务端接收到的数据输出只有一行,说明两条数据包粘在一起了,这样我们就没法区分一个独立完整的数据。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!
发生原因
发生 TCP 粘包/拆包的常见原因有如下几点:
要发送的数据大于 TCP 发送缓冲区剩余空间大小,将会发生拆包。
待发送数据大于 MSS(最大报文长度),TCP在传输前将根据 MSS 大小进行拆包分段发送。
要发送的数据小于 TCP 发送缓冲区的大小,TCP 将多次写入缓冲区的数据一次发送出去,将会发生粘包。
接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
解决方案
由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。
要解决粘包和拆包的问题,关键点在于读取方需要知道一个完整的数据包,它是如何开始,如何结束的。根据业界主流协议的解决方案,可以归纳如下。
消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
在包尾增加回车换行符作为结束标志,例如FTP协议,这种方式在文本协议中应用比较广泛;
将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常消息头的第一个字段使用 int32
来表示消息的总长度;
使用更复杂的应用层协议。
例如下面的程序,基于在消息头添加一个固定的 int32 来表示消息的总长度的方案:
在写入数据时,首先写入消息的字节长度,再写入完整的数据。
在读取时,则先读取一个 int 值,这个值就表示消息的长度,然后再读取对应长度的字节就是一条完整的消息。
如果发生拆包,那字节长度就会小于头部指定的 int 值,此时就跳过读取,等待后续数据接收完整再读。
如果发生粘包,就需要多次解码,直到数据读完或数据长度小于 int 值。
public class RequestUtil { /** * 写入通道 */ public static void writeAndFlush(String data, ChannelHandlerContext ctx) { ByteBuf buf = Unpooled.buffer(); byte[] bytes = data.getBytes(StandardCharsets.UTF_8); // 先写入数据的长度 buf.writeInt(bytes.length); // 再写入完整的数据 buf.writeBytes(bytes); // 写入通道 ctx.channel().writeAndFlush(buf); } /** * 对消息解码,可能有多块数据粘包,所以解码每块消息后放入 out 集合中 */ public static void decode(Object msg, List<String> out) { ByteBuf buf = (ByteBuf) msg; // 校验消息长度,必须达到4个字节 if (buf.readableBytes() < 4) { return; } // 标记当前可读位置,便于后面重新读取 buf.markReaderIndex(); // 先读取4个字节的int,代表消息的bytes长度 int len = buf.readInt(); // 检查是否有拆包,小于数据长度,说明数据不完整,等待后续的数据进来 if (buf.readableBytes() < len) { // 还原读索引 buf.resetReaderIndex(); return; } // 读取指定长度的字节,多余的不读取,避免粘包问题 byte[] bytes = new byte[len]; buf.readBytes(bytes); out.add(new String(bytes, StandardCharsets.UTF_8)); // 如果还有数据就继续读 if (buf.isReadable()) { decode(msg, out); } }}
服务端、客户端读取和写入改造:
// 服务端static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); // 读数据并处理请求 List<String> dataList = new ArrayList<>(); RequestUtil.decode(msg, dataList); // 处理数据 for (String data : dataList) { handleMsg(ctx, data); } } private void handleMsg(ChannelHandlerContext ctx, String msg) { System.out.println("Request data: " + msg); // 响应客户端请求 System.out.println("channel writing..."); RequestUtil.writeAndFlush("Hello Netty Client!", ctx); RequestUtil.writeAndFlush("Hello World!!!", ctx); }}// 客户端static class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel active..."); RequestUtil.writeAndFlush("Hello Netty Server!", ctx); RequestUtil.writeAndFlush("Hello World!!!", ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); List<String> dataList = new ArrayList<>(); RequestUtil.decode(msg, dataList); // 处理数据 for (String data : dataList) { handleMsg(ctx, data); } } private void handleMsg(ChannelHandlerContext ctx, String msg) { System.out.println("Response data: " + msg); }}
服务端、客户端输出,通过输出可以看到这时处理的数据就是一个完整的数据了。
// 服务端channel read...Request data: Hello Netty Server!Request data: Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Response data: Hello World!!!Response data: Hello Netty Client!Response data: Hello World!!!
编码器和解码器
为了解决 TCP 粘包/拆包导致的半包读写问题,Netty 默认提供了多种编解码器用于处理半包,使用起来也非常方便。
LineBasedFrameDecoder
LineBasedFrameDecoder 行解码器 支持按换行符 /n
或 /r/n
来读取字节数组,我们在写入数据的时候需要在一条数据的最后加上换行符,读取的时候 LineBasedFrameDecoder 已经自动去掉了换行符,无需再处理。
构造 LineBasedFrameDecoder 必须传入 maxLength 参数,表示最大长度,如果遍历超过这个长度的字节都没有找到分隔符就会报错。
public LineBasedFrameDecoder(final int maxLength) { this(maxLength, true, false);}/** * @param maxLength 最大长度 * @param stripDelimiter 去除分隔符 * @param failFast 超过长度时是否直接报错 */public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { this.maxLength = maxLength; this.failFast = failFast; this.stripDelimiter = stripDelimiter;}
只需要在服务端和客户端添加这个处理器即可:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); // 添加一些处理器 channelPipeline // 行解码器 .addLast(new LineBasedFrameDecoder(1024)) // 自定义的服务端处理器 .addLast(new NettyServerHandler()); }})
再看此时的服务端和客户端数据交互代码,可以看到写入数据的时候需要加上换行符,读数据的时候跟以前是一样的方式,但此时读出来的数据不会粘包在一起了。
static final String SEPARATOR = System.getProperty("line.separator");// 服务端static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); // 读数据并处理请求 ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); System.out.println("Request data:" + new String(bytes, StandardCharsets.UTF_8)); // 响应请求 ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty Client!" + SEPARATOR, StandardCharsets.UTF_8)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World!!!" + SEPARATOR, StandardCharsets.UTF_8)); }}// 客户端static class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel active..."); // 写数据时加上分隔符 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty Server!" + SEPARATOR, StandardCharsets.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World!!!" + SEPARATOR, StandardCharsets.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read..."); ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); System.out.println("Response data:" + new String(bytes, StandardCharsets.UTF_8)); }}
通过 LineBasedFrameDecoder 的继承体系可以知道,它是一个通道输入处理器,用于处理输入数据。父类 ByteToMessageDecoder
,从字面意思可以知道它是将字节转换为具体消息的解码器。
通过 ByteToMessageDecoder 的 channelRead
方法进去可以看到,它最终会调用子类的 decode() 方法来解码消息,并将消息放入 out 集合中。解析完后就会遍历每条数据触发通道的读事件,这样在后续的处理器中得到的 msg 就是一个完整的消息了。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { // 解析结果 CodecOutputList out = CodecOutputList.newInstance(); try { // 解码 decode(ctx, msg, out); } catch (Exception e) { throw new DecoderException(e); } finally { try { // 触发通道读事件 fireChannelRead(ctx, out, out.size()); } finally { out.recycle(); } } } else { ctx.fireChannelRead(msg); }}// 抽象方法,消息解码protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0; i < numElements; i ++) { // 传入每条消息,触发读事件 ctx.fireChannelRead(msgs.getUnsafe(i)); }}
接着看 LineBasedFrameDecoder 实现的父类 decode 方法,它首先会基于内存地址遍历字节,找到第一个分隔符的位置,然后读取对应长度的数据,默认跳过分隔符。
@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); }}protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { // 从缓冲区查找一行的结束位置,即分隔符的位置 final int eol = findEndOfLine(buffer); if (eol >= 0) { final ByteBuf frame; // 结束符位置 - 当前位置 = 数据长度 final int length = eol - buffer.readerIndex(); // 分隔符长度 final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; // 超过最大长度则报错 if (length > maxLength) { buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } // 默认去除分隔符 if (stripDelimiter) { // 读取指定长度的数据 frame = buffer.readRetainedSlice(length); // 跳过分隔符 buffer.skipBytes(delimLength); } else { // 读取数据+分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { //... }}private int findEndOfLine(final ByteBuf buffer) { int totalLength = buffer.readableBytes(); // 基于内存地址遍历字节 寻找分隔符 int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF); //... return i;}
通过简单的分析源码可以了解到,LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有\n
或者 \r\n
, 如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
StringDecoder & StringEncoder
前面在写入数据时,都是先将字符串写入一个 ByteBuf 缓冲区再写入 Channel;读取数据时都是将 msg 强转成 ByteBuf,再读取字节转换成字符串。可以看到这样转来转去很麻烦,我们可以添加内置的 StringDecoder 和 StringEncoder 来避免这种转换。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!0
再看此时的服务端和客户端数据交互代码,可以看到此时就是直接写入字符串,读取的时候直接将 msg 转成字符串来处理。这样就可以直接写入和读取字符串数据了,避免通过 ByteBuf 来读取。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!1
简单分析它的源码可知,写入数据时编码,就是将字符串写入一个 ByteBuf 中;读取数据时解码,读取 ByteBuf 中的字节数组再转成字符串。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!2
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder 可以自动完成以分隔符做结束标志的消息的解码。
同样的方式,只需要在 ChannelPipeline 中添加 DelimiterBasedFrameDecoder 解码器即可,在构造 DelimiterBasedFrameDecoder 时需传入特定的分隔符标志。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!3
在写数据时,需在末尾添加分隔符标志。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!4
FixedLengthFrameDecoder
FixedLengthFrameDecoder 是固定长度解码器,能够按照指定的长度对消息进行自动解码。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!3
应用层协议
Netty 提供了多种协议支持,例如 HTTP、WebSocket、SSL 等等,我们可以通过它提供的组件快速开发出对应协议的服务器,而不用担心 TCP 粘包/拆包等网络问题。
HTTP 协议
HTTP 是建立在 TCP 传输协议之上的应用层协议,下面这段程序是基于 Netty 开发的一个 HTTP 服务器。
// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!6
可以看到主要就是添加如下几个处理器来支持HTTP协议:
HttpRequestDecoder:HTTP 请求消息解码器,对 HTTP 请求行、请求头、请求体进行解析。
HttpObjectAggregator:聚合解码器,用于将多个消息转换为单一的 FullHttpRequest 或 FullHttpResponse,因为 HTTP 解码器会将每个 HTTP 消息解码生成多个消息对象。
HttpResponseEncoder:HTTP 响应消息编码器,对 HTTP 状态行、响应头、响应体 进行编码。
ChunkedWriteHandler:支持异步发送大的码流,例如大的文件传输,但不会占用过多的内存,防止内存溢出。
最后是添加自定义的业务处理器,处理 HTTP 请求和响应。
从 Netty 的源码可以了解到,Netty 提供了 http、http2、redis、mqtt、smtp、xml 等纵多主流协议的支持,基于 Netty 我们可以快速开发出相应的服务器和客户端程序,而不用担心网络问题的处理。
原文:https://juejin.cn/post/7100846192795811871