首页>>后端>>java->Netty 系列(3) — Netty 处理TCP 粘包/拆包

Netty 系列(3) — Netty 处理TCP 粘包/拆包

时间:2023-12-06 本站 点击:0

TCP 粘包/拆包

问题分析

TCP 是个“流”协议,所谓流,就是没有界限的一串数据。TCP 底层并不了解上层业务数据的具体含义,它会根据 TCP 缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的 TCP 粘包和拆包问题。

例如在上一篇文章的Demo程序中,客户端向服务端发送了两条数据,服务端也向客户端响应了两条数据。

// 服务端处理器static class NettyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        // 读数据并处理请求        ByteBuf reqBuf = (ByteBuf) msg;        byte[] reqBytes = new byte[reqBuf.readableBytes()];        reqBuf.readBytes(reqBytes);        System.out.println("Request data: " + new String(reqBytes, StandardCharsets.UTF_8));        // 响应客户端请求        System.out.println("channel writing...");        ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Client!", StandardCharsets.UTF_8));        ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));        ctx.channel().flush();    }}// 客户端处理器static class NettyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channel active...");        ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Server!", StandardCharsets.UTF_8));        ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));        ctx.channel().flush();    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        ByteBuf resBuf = (ByteBuf) msg;        byte[] resBytes = new byte[resBuf.readableBytes()];        resBuf.readBytes(resBytes);        System.out.println("Response data: " + new String(resBytes, StandardCharsets.UTF_8));        // 关闭通道        ctx.channel().close();    }}

从输出结果来看,客户端和服务端接收到的数据输出只有一行,说明两条数据包粘在一起了,这样我们就没法区分一个独立完整的数据。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!

发生原因

发生 TCP 粘包/拆包的常见原因有如下几点:

要发送的数据大于 TCP 发送缓冲区剩余空间大小,将会发生拆包。

待发送数据大于 MSS(最大报文长度),TCP在传输前将根据 MSS 大小进行拆包分段发送。

要发送的数据小于 TCP 发送缓冲区的大小,TCP 将多次写入缓冲区的数据一次发送出去,将会发生粘包。

接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

解决方案

由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。

要解决粘包和拆包的问题,关键点在于读取方需要知道一个完整的数据包,它是如何开始,如何结束的。根据业界主流协议的解决方案,可以归纳如下。

消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

在包尾增加回车换行符作为结束标志,例如FTP协议,这种方式在文本协议中应用比较广泛;

将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常消息头的第一个字段使用 int32 来表示消息的总长度;

使用更复杂的应用层协议。

例如下面的程序,基于在消息头添加一个固定的 int32 来表示消息的总长度的方案:

在写入数据时,首先写入消息的字节长度,再写入完整的数据。

在读取时,则先读取一个 int 值,这个值就表示消息的长度,然后再读取对应长度的字节就是一条完整的消息。

如果发生拆包,那字节长度就会小于头部指定的 int 值,此时就跳过读取,等待后续数据接收完整再读。

如果发生粘包,就需要多次解码,直到数据读完或数据长度小于 int 值。

public class RequestUtil {    /**     * 写入通道     */    public static void writeAndFlush(String data, ChannelHandlerContext ctx) {        ByteBuf buf = Unpooled.buffer();        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);        // 先写入数据的长度        buf.writeInt(bytes.length);        // 再写入完整的数据        buf.writeBytes(bytes);        // 写入通道        ctx.channel().writeAndFlush(buf);    }    /**     * 对消息解码,可能有多块数据粘包,所以解码每块消息后放入 out 集合中     */    public static void decode(Object msg, List<String> out) {        ByteBuf buf = (ByteBuf) msg;        // 校验消息长度,必须达到4个字节        if (buf.readableBytes() < 4) {            return;        }        // 标记当前可读位置,便于后面重新读取        buf.markReaderIndex();        // 先读取4个字节的int,代表消息的bytes长度        int len = buf.readInt();        // 检查是否有拆包,小于数据长度,说明数据不完整,等待后续的数据进来        if (buf.readableBytes() < len) {            // 还原读索引            buf.resetReaderIndex();            return;        }        // 读取指定长度的字节,多余的不读取,避免粘包问题        byte[] bytes = new byte[len];        buf.readBytes(bytes);        out.add(new String(bytes, StandardCharsets.UTF_8));        // 如果还有数据就继续读        if (buf.isReadable()) {            decode(msg, out);        }    }}

服务端、客户端读取和写入改造:

// 服务端static class NettyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        // 读数据并处理请求        List<String> dataList = new ArrayList<>();        RequestUtil.decode(msg, dataList);        // 处理数据        for (String data : dataList) {            handleMsg(ctx, data);        }    }    private void handleMsg(ChannelHandlerContext ctx, String msg) {        System.out.println("Request data: " + msg);        // 响应客户端请求        System.out.println("channel writing...");        RequestUtil.writeAndFlush("Hello Netty Client!", ctx);        RequestUtil.writeAndFlush("Hello World!!!", ctx);    }}// 客户端static class NettyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channel active...");        RequestUtil.writeAndFlush("Hello Netty Server!", ctx);        RequestUtil.writeAndFlush("Hello World!!!", ctx);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        List<String> dataList = new ArrayList<>();        RequestUtil.decode(msg, dataList);        // 处理数据        for (String data : dataList) {            handleMsg(ctx, data);        }    }    private void handleMsg(ChannelHandlerContext ctx, String msg) {        System.out.println("Response data: " + msg);    }}

服务端、客户端输出,通过输出可以看到这时处理的数据就是一个完整的数据了。

// 服务端channel read...Request data: Hello Netty Server!Request data: Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Response data: Hello World!!!Response data: Hello Netty Client!Response data: Hello World!!!

编码器和解码器

为了解决 TCP 粘包/拆包导致的半包读写问题,Netty 默认提供了多种编解码器用于处理半包,使用起来也非常方便。

LineBasedFrameDecoder

LineBasedFrameDecoder 行解码器 支持按换行符 /n/r/n 来读取字节数组,我们在写入数据的时候需要在一条数据的最后加上换行符,读取的时候 LineBasedFrameDecoder 已经自动去掉了换行符,无需再处理。

构造 LineBasedFrameDecoder 必须传入 maxLength 参数,表示最大长度,如果遍历超过这个长度的字节都没有找到分隔符就会报错。

public LineBasedFrameDecoder(final int maxLength) {    this(maxLength, true, false);}/** * @param maxLength 最大长度 * @param stripDelimiter 去除分隔符 * @param failFast 超过长度时是否直接报错 */public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {    this.maxLength = maxLength;    this.failFast = failFast;    this.stripDelimiter = stripDelimiter;}

只需要在服务端和客户端添加这个处理器即可:

.childHandler(new ChannelInitializer<SocketChannel>() {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        ChannelPipeline channelPipeline = socketChannel.pipeline();        // 添加一些处理器        channelPipeline                // 行解码器                .addLast(new LineBasedFrameDecoder(1024))                // 自定义的服务端处理器                .addLast(new NettyServerHandler());    }})

再看此时的服务端和客户端数据交互代码,可以看到写入数据的时候需要加上换行符,读数据的时候跟以前是一样的方式,但此时读出来的数据不会粘包在一起了。

static final String SEPARATOR = System.getProperty("line.separator");// 服务端static class NettyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        // 读数据并处理请求        ByteBuf buf = (ByteBuf) msg;        byte[] bytes = new byte[buf.readableBytes()];        buf.readBytes(bytes);        System.out.println("Request data:" + new String(bytes, StandardCharsets.UTF_8));        // 响应请求        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty Client!" + SEPARATOR, StandardCharsets.UTF_8));        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World!!!" + SEPARATOR, StandardCharsets.UTF_8));    }}// 客户端static class NettyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channel active...");        // 写数据时加上分隔符        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty Server!" + SEPARATOR, StandardCharsets.UTF_8));        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World!!!" + SEPARATOR, StandardCharsets.UTF_8));    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channel read...");        ByteBuf buf = (ByteBuf) msg;        byte[] bytes = new byte[buf.readableBytes()];        buf.readBytes(bytes);        System.out.println("Response data:" + new String(bytes, StandardCharsets.UTF_8));    }}

通过 LineBasedFrameDecoder 的继承体系可以知道,它是一个通道输入处理器,用于处理输入数据。父类 ByteToMessageDecoder,从字面意思可以知道它是将字节转换为具体消息的解码器。

通过 ByteToMessageDecoder 的 channelRead 方法进去可以看到,它最终会调用子类的 decode() 方法来解码消息,并将消息放入 out 集合中。解析完后就会遍历每条数据触发通道的读事件,这样在后续的处理器中得到的 msg 就是一个完整的消息了。

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    if (msg instanceof ByteBuf) {        // 解析结果        CodecOutputList out = CodecOutputList.newInstance();        try {            // 解码            decode(ctx, msg, out);        } catch (Exception e) {            throw new DecoderException(e);        } finally {            try {                // 触发通道读事件                fireChannelRead(ctx, out, out.size());            } finally {                out.recycle();            }        }    } else {        ctx.fireChannelRead(msg);    }}// 抽象方法,消息解码protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {    for (int i = 0; i < numElements; i ++) {        // 传入每条消息,触发读事件        ctx.fireChannelRead(msgs.getUnsafe(i));    }}

接着看 LineBasedFrameDecoder 实现的父类 decode 方法,它首先会基于内存地址遍历字节,找到第一个分隔符的位置,然后读取对应长度的数据,默认跳过分隔符。

@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {    Object decoded = decode(ctx, in);    if (decoded != null) {        out.add(decoded);    }}protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {    // 从缓冲区查找一行的结束位置,即分隔符的位置    final int eol = findEndOfLine(buffer);    if (eol >= 0) {        final ByteBuf frame;        // 结束符位置 - 当前位置 = 数据长度        final int length = eol - buffer.readerIndex();        // 分隔符长度        final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;        // 超过最大长度则报错        if (length > maxLength) {            buffer.readerIndex(eol + delimLength);            fail(ctx, length);            return null;        }        // 默认去除分隔符        if (stripDelimiter) {            // 读取指定长度的数据            frame = buffer.readRetainedSlice(length);            // 跳过分隔符            buffer.skipBytes(delimLength);        } else {            // 读取数据+分隔符            frame = buffer.readRetainedSlice(length + delimLength);        }        return frame;    } else {        //...    }}private int findEndOfLine(final ByteBuf buffer) {    int totalLength = buffer.readableBytes();    // 基于内存地址遍历字节 寻找分隔符    int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);    //...    return i;}

通过简单的分析源码可以了解到,LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有\n 或者 \r\n, 如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder & StringEncoder

前面在写入数据时,都是先将字符串写入一个 ByteBuf 缓冲区再写入 Channel;读取数据时都是将 msg 强转成 ByteBuf,再读取字节转换成字符串。可以看到这样转来转去很麻烦,我们可以添加内置的 StringDecoder 和 StringEncoder 来避免这种转换。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!0

再看此时的服务端和客户端数据交互代码,可以看到此时就是直接写入字符串,读取的时候直接将 msg 转成字符串来处理。这样就可以直接写入和读取字符串数据了,避免通过 ByteBuf 来读取。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!1

简单分析它的源码可知,写入数据时编码,就是将字符串写入一个 ByteBuf 中;读取数据时解码,读取 ByteBuf 中的字节数组再转成字符串。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!2

DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder 可以自动完成以分隔符做结束标志的消息的解码。

同样的方式,只需要在 ChannelPipeline 中添加 DelimiterBasedFrameDecoder 解码器即可,在构造 DelimiterBasedFrameDecoder 时需传入特定的分隔符标志。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!3

在写数据时,需在末尾添加分隔符标志。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!4

FixedLengthFrameDecoder

FixedLengthFrameDecoder 是固定长度解码器,能够按照指定的长度对消息进行自动解码。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!3

应用层协议

Netty 提供了多种协议支持,例如 HTTP、WebSocket、SSL 等等,我们可以通过它提供的组件快速开发出对应协议的服务器,而不用担心 TCP 粘包/拆包等网络问题。

HTTP 协议

HTTP 是建立在 TCP 传输协议之上的应用层协议,下面这段程序是基于 Netty 开发的一个 HTTP 服务器。

// 服务端channel read...Request data: Hello Netty Server!Hello World!!!// 客户端channel read...Response data: Hello Netty Client!Hello World!!!6

可以看到主要就是添加如下几个处理器来支持HTTP协议:

HttpRequestDecoder:HTTP 请求消息解码器,对 HTTP 请求行、请求头、请求体进行解析。

HttpObjectAggregator:聚合解码器,用于将多个消息转换为单一的 FullHttpRequest 或 FullHttpResponse,因为 HTTP 解码器会将每个 HTTP 消息解码生成多个消息对象。

HttpResponseEncoder:HTTP 响应消息编码器,对 HTTP 状态行、响应头、响应体 进行编码。

ChunkedWriteHandler:支持异步发送大的码流,例如大的文件传输,但不会占用过多的内存,防止内存溢出。

最后是添加自定义的业务处理器,处理 HTTP 请求和响应。

从 Netty 的源码可以了解到,Netty 提供了 http、http2、redis、mqtt、smtp、xml 等纵多主流协议的支持,基于 Netty 我们可以快速开发出相应的服务器和客户端程序,而不用担心网络问题的处理。

原文:https://juejin.cn/post/7100846192795811871


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15912.html