用Netty本身写拆包粘包解码器

最近作了一个项目,项目中用到Netty来接受一些自定义的报文。java

1、背景bootstrap

tcp是以流的方式进行传输,在流里咱们要判断消息的起始位置和结束位置。为了区分消息,每每采用下面的几种方式。less

  1. 消息有固定的长度
  2. 换行符作分隔
  3. 用一个特殊的分隔符来分隔
  4. 在消息头中增长length字段

Netty中针对以上的方案都有已经实现好的解码器做为解决方案。tcp

  1. 针对有固定长度的消息,Netty提供了FixedlengthFrameDecoder解决。
  2. 针对以回车换行符做为消息结束符的,Netty提供了LineBasedFrameDecoder。
  3. 针对以分隔符做为消息结束符的,Netty提供了DelimiterBasedFrameDecoder。
  4. 针对消息头中的放长度字段来分隔的,Netty提供了LengthFieldBasedFrameDecoder。

其中前三种比较简单。第四种有不少的参数比较复杂。ide

找到该类其中的一个构造方法,下面会根据一个实例来解释它的各个参数的用法,以下所示:oop

public LengthFieldBasedFrameDecoder(
            int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        this(
                ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast);
    }
  • 第一个参数 maxFrameLength: 指定容许的最大长度
  • 第二个参数lengthFieldOffset:长度字段在报文中的偏移量。
  • 第三个参数lengthFieldLength: 长度字段的长度。
  • 第四个参数lengthAdjustment:长度字段的补偿长度。这个很差理解,下面会根据例子解释。这里先没必要深究。
  • 第五个参数initialBytesToStrip: 从报文开始位置截取的长度。
  • 第六个参数failFast:是指定ToLongFrameException该怎么报出来。当为true时即便流未读可是发现长度超过了咱们设置的第一个参数就报错,当为false的时候,只有读超过咱们设置的长度的时候才报错。

咱们以一个实际的例子,来解释下各个参数,假设报文格式以下表:测试

语法this

长度spa

位数日志

Event_Tag

8

Event Length

32

Event Number

16

for(i=0;i<num;i++){

 

Event_id

16

Event_parameters

32

Event_time

64

}

 

SCID

32

Event_CRC

32

注:长度字段表示的长度是从长度字段之后到报文结束位置的长度。也就是长度字段的值没有把长度字段和长度字段之前的长度算进去。

下面咱们但愿使用Netty的LengthFieldBasedFFrameDecoder来为咱们屏蔽掉底层的拆包粘包的细节。

  • 第一个参数:设置成1024*1024。这个根据须要设置。
  • 第二个参数:咱们的长度字段是Event Length,在报文中的位置是第二个字节。因此咱们设置该值为1。若是Event_Tag的长度位数为16。那么该值就应该设置为2了。
  • 第三个参数:长度字段的长度,咱们是32位的长度,是4字节,因此这个参数咱们设置为4。
  • 第四个参数:长度字段的补偿字段,因为咱们长度不包含长度字段自己的长度,因此咱们暂且设置为0。
  • 第五个参数:从开始位置开始截取的长度。咱们也暂且设置为0;
  • 第六个参数:设置为true.

下面拿一段真实的报文,咱们试着解析一下。报文以下,16进制表示。

8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

Event_tag: 0x8e

Event length: 00000034 等于10进制的52。也就是从0x34后面的52字节是属于一条消息的。

00030202000000650020181211145159

02020000083600201812111452110202

000008360020181211145247055db07a

0e4d0fc0

上面的每一行是16字节,在加上最后的4字节正好是52个。

咱们用程序测试结果来讲明,各个参数设置不一样对结果的影响。

服务端代码:

final EventLoopGroup parentGroup = new NioEventLoopGroup(2);
        final EventLoopGroup workGroup = new NioEventLoopGroup(2);
        final ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(parentGroup, workGroup).channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(final SocketChannel ch) {
                         ch.pipeline()
                           .addLast(new LoggingHandler(LogLevel.INFO))
                           .addLast(
                               new AudiencePacketFrameDecoder(ByteOrder.BIG_ENDIAN, MAX_FRAME_LENGTH, 1, 4, 0, 0, true));
                     }
                 }).option(ChannelOption.SO_BACKLOG, 1024);
        try {
            final ChannelFuture channelFuture = bootstrap.bind(9090).sync();
            if (channelFuture.isSuccess()) {
                LOGGER.info("server start");
            }
            channelFuture.channel().closeFuture().sync();

        } catch (final InterruptedException e) {
            LOGGER.info("", e);
        } finally {
            parentGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

咱们的channelPipleLine中只有两个Channelhandler,第一个是打印日志的,第二个就是咱们实现了LengthFiledBasedFrameDecoder的类。代码以下:

final ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
    return null;
}
LOGGER.info("hexString:" + ByteBufUtil.hexDump(frame));

咱们的代码中只是打印了解码的结果。

当各个参数的设置为 

的时候,结果是

hexString:8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

对比原始报文发现全部的报文都打印出来了。

测试一:

咱们改变一下initialBytesToStrip 为5 看一下结果,其余位置不变

结果:

hexString:0003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

对比原始报文发现少了8e00000034五个字节,咱们从上可知,initialBytesToStrip 能帮助咱们把报文的头部截取掉,或者截取一下与业务无关的数据。

测试二:

假设咱们的长度字段把其自己和它以前的字段长度也包括进去了。

之前的报文:

8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

长度字段是00000034=52,它没有包含本身的长度和Event_tag的长度。下面咱们稍做改变。假设它包含本身的长度。报文以下:

8e000000390003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

注意长度字段变为 00000039。

下面参数不变,咱们看程序的结果:


 

执行的时候,咱们发现日志居然没有打印。这是为何呢?咱们试着改变一下参数。咱们把lengthAdjustment 设置为 --5,以下所示:

hexString:0003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0

咱们发现结果又能够正常打印出来了。致使结果不同的缘由只有lengthAdjustment 参数的值,设置为0的时候咱们打印不出结果,设置为-5的时候能正常打印出来。

看源码的过程当中,发现这么一句。

报文的长度 += lengthAdjustment+ 长度字段结束位置的偏移量。

这个报文的长度就是咱们返回的数据。当咱们把lengthAdjustment 设置为0的时候, 

报文的长度= 报文的长度(00000039=57)+0+5=62,咱们的报文的总长度才为57,因此解码不出来结果。

当咱们把lengthAdjustment 设置为--5的时候,

报文的长度=报文的长度(00000039=57)+(--5)+5=57。这样就能解析出结果了。

综合所述,若是报文中的长度字段的值包含了长度字段自己,那么设置补偿字段长度的时候应该把这个长度减去。

例如咱们例子中的 长度字段加Tag的长度为5,长度字段的值为 00000039=57里面包含了这个长度字段的长度。因此Netty提供了这个参数来减去长度字段的长度值。

 

2、自定义拆包

因为业务需求,我须要把两种报文合在一块儿当成一个报文。这样我就没有办法用Netty提供给咱们使用的解码器来出来拆包,粘包了。

思路就是,从流里读,只有读到符合本身要求的格式的数据的时候才往下传,不然就丢弃字节或等待。

代码以下:

public class UnpackingDecoder extends ByteToMessageDecoder {

    private final static Logger LOGGER = LoggerFactory.getLogger(UnpackingDecoder.class);

    int count=0;

    @Override
    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
        LOGGER.info("start UnpackingDecoder!");
        LOGGER.info("unpacking execute count:"+ count);
        count+=1;
        try {
            // 这里是防止出现 咱们不想要的报文
            while (true) {
                if (in.readableBytes() < 2) {
                    return;
                }
                if ((in.getUnsignedByte(in.readerIndex()) ^ 0x8E)
                    == 0 || (in.getUnsignedShort(in.readerIndex()) ^ 0x020E) == 0) {
                    break;
                }
                LOGGER.warn("this frame is undefined:"+ByteBufUtil.hexDump(in,in.readerIndex(),1));
                in.skipBytes(1);
            }
            // 根据报文的开头肯定是哪一种报文,
            if ((in.getUnsignedByte(in.readerIndex()) ^ 0x8E) == 0) {
                LOGGER.info("this package is begin with 0x8E.");
                if (in.readableBytes() < 5) {
                    LOGGER.info("buffer size is less than 5,return");
                    return;
                }
                int businessTagByteSize = 1;
                int businessLengthFiledSize = 4;
                int sigTagByteSize = 1;
                int sigLengFiledSize = 2;
                long businessLength = in.getUnsignedInt(in.readerIndex() + businessTagByteSize);
                LOGGER.info("business frame length filed:" + ByteBufUtil
                    .hexDump(in, in.readerIndex() + businessTagByteSize, businessLengthFiledSize));
                int businessFrameLen = (int) (businessTagByteSize + businessLengthFiledSize + businessLength);
                if (in.readableBytes() < (businessFrameLen + sigTagByteSize + sigLengFiledSize)) {
                    LOGGER.info("this package is not enough,sign data is not enough.");
                    return;
                }
                int sigLen = in.getUnsignedShort(in.readerIndex() + businessFrameLen + sigTagByteSize);
                LOGGER.info("signature frame length filed:" + ByteBufUtil
                    .hexDump(in, in.readerIndex() + businessFrameLen + sigTagByteSize, sigLengFiledSize));
                int signFrameLen = sigTagByteSize + sigLengFiledSize + sigLen;
                int totalFrameSize = businessFrameLen + signFrameLen;
                LOGGER.info("total frame length" + totalFrameSize);
                if (in.readableBytes() < totalFrameSize) {
                    LOGGER.info("this package is not enough,sign data is not enough.");
                    return;
                }
                // 若是找到了须要的报文,那么放到out种,调用下面的解码器来处理。
                ByteBuf frame = in.slice(in.readerIndex(), totalFrameSize).retain();
                in.skipBytes(totalFrameSize);
                out.add(frame);
            } else {
                LOGGER.info("this package is begin with 0x020E.");
                if (in.readableBytes() < 13) {
                    return;
                }
                // 与上面相同
                ByteBuf frame = in.slice(in.readerIndex(), 13).retain();
                in.skipBytes(13);
                out.add(frame);
            }
        } catch (final Throwable t) {
            LOGGER.error("UnpackingDecoder Error!", t);
        }
        LOGGER.info("end UnpackingDecoder!");
    }
}