Netty解决粘包半包问题

一.什么是TCP粘包半包

客户端发送数据包给服务端,因服务端一次读取到的字节数是不肯定的,有好几种状况.java

  1. 服务端分两次读取到了两个独立的数据包,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,粘合在一块儿,被称为 TCP 粘包;
  3. 服务端分两次读取到了两个数据包, 第一次读取到了完整的包和另一个包的部份内容,第二次读取到了另外一个包的剩余内容, 这被称为 TCP 拆包;
  4. 服务端分两次读取到了两个数据包, 第一次读取到了包的部份内容 , 第二次读取到了以前未读完的包剩余内容和另外一个包,发生了拆包和粘包。
  5. 服务端 TCP 接收滑动窗口很小,数据包比较大, 即服务端分屡次才能将 包接收彻底,发生屡次拆包

二.粘包半包的缘由

1.粘包

TCP协议:自己是 面向链接的可靠地协议-三次握手机制。算法

客户端与服务器会维持一个链接(Channel) ,在链接不断开的状况下, 能够将多个数据包发往服务器,可是发送的网络数据包过小, 那么自己会启用 Nagle 算法(可配置是否启用) 对较小的数据包进行合并(所以,TCP 的网络延迟要 UDP 的高些)而后再发送(超时或者包大小足够)。bootstrap

服务器在接收到消息(数据流)的时候就没法区分哪些数据包是客户端本身分开发送的,这样产生了粘包;
服务器在接收到数据后,放到缓冲区中,若是消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个
数据包的状况,形成粘包现象。缓存


UDP: 自己做为无链接的不可靠的传输协议(适合频繁发送较小的数据包) , 他不会对数据包进行合并发送,直接是一端发送什么数据, 直接就发出去了, 既然他不会对数据合并, 每个数据包都是完整的(数据+UDP 头+IP 头等等发一 次数据封装一次) 也就没有粘包了。服务器


2.半包

分包产生的缘由:多是IP分片传输致使的, 也多是传输过程当中丢失部 分包致使出现的半包, 还有可能就是一个包可能被分红了两次传输, 在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系) , 总之就是一个数据包被分红了屡次接收。
更具体的缘由有三个, 分别以下。网络

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  2. 进行 MSS 大小的 TCP 分段。 MSS 是最大报文段长度的缩写。 MSS 是 TCP 报文段中的数据字段的最大长度。 数据字段加上 TCP 首部才等于整个的 TCP 报文段。 因此 MSS 并非

TCP 报文段的最大长度, 而是: MSS=TCP 报文段长度-TCP 首部长度并发

  1. 以太网的 payload 大于 MTU 进行 IP 分片。 MTU 指: 一种通讯协议的某一层上面所能

经过的最大数据包大小。 若是 IP 层有一个数据包要传, 并且数据的长度比链路层的 MTU 大,
那么 IP 层就会进行分片, 把数据包分红托干片, 让每一片都不超过 MTU。 注意, IP 分片可
以发生在原始发送端主机上, 也能够发生在中间路由器上。框架


3.解决粘包半包问题异步

因为底层的 TCP 没法理解上层的业务数据, 因此在底层是没法保证数据包不被拆分和重组的, 这个问题只能经过上层的应用协议栈设计来解决。业界的主流协议的解决方案,能够概括以下。
(1) 在包尾增长分割符, 好比回车换行符进行分割, 例如 FTP 协议;
(2) 消息定长, 例如每一个报文的大小为固定长度 200 字节, 若是不够, 空位补空格;
(3) 将消息分为消息头和消息体, 消息头中包含表示消息总长度(或者消息体长度)的字段, 一般设计思路为消息头的第一个字段使用 int32 来表示消息的总长度,LengthFieldBasedFrameDecoder。socket

下面列举一个包尾增长分隔符的例子:

服务端程序:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 入站处理器
 */
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicInteger completeCounter = new AtomicInteger(0);

    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    /*** 服务端读取完成网络数据后的处理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.fireChannelReadComplete();
        System.out.println("the ReadComplete count is "
                +completeCounter.incrementAndGet());
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.net.InetSocketAddress;

/**
 * 服务端
 */
public class DelimiterEchoServer {

    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;

    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("服务器即将启动");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
            b.group(group)/*将线程组传入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                /*服务端每接收到一个链接请求,就会新启一个socket通讯,也就是channel,
                因此下面这段代码的做用就是为这个子channel增长handle*/
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
            System.out.println("服务器启动完成,等待客户端的链接和数据.....");
            f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
        } finally {
            group.shutdownGracefully().sync();/*优雅关闭线程组*/
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }

}

客户端程序

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;


/**
 * 入站处理器
 */
public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }

    /*** 客户端被通知channel活跃后,作事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,Lison,Peter,James,Deer"
                +  DelimiterEchoServer.DELIMITER_SYMBOL;
        for(int i=0;i<10;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

/**
 * 客户端
 */
public class DelimiterEchoClient {

    private final String host;

    public DelimiterEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            final Bootstrap b = new Bootstrap();;/*客户端启动必须*/
            b.group(group)/*将线程组传入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要链接服务器的ip地址和端口*/
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已链接到服务器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter
                    = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new DelimiterEchoClient("127.0.0.1").start();
    }
}

关键代码:
1.创建链接后,客户端给服务端发数据包,每次发送已特殊字符`
@~结尾。
image.png

2.服务端收到数据包后通过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
image.png

3.再到服务端的业务层处理器DelimiterServerHandler
image.png