最近作了一个项目,项目中用到Netty来接受一些自定义的报文。java
1、背景bootstrap
tcp是以流的方式进行传输,在流里咱们要判断消息的起始位置和结束位置。为了区分消息,每每采用下面的几种方式。less
Netty中针对以上的方案都有已经实现好的解码器做为解决方案。tcp
其中前三种比较简单。第四种有不少的参数比较复杂。ide
找到该类其中的一个构造方法,下面会根据一个实例来解释它的各个参数的用法,以下所示:oop
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this( ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); }
咱们以一个实际的例子,来解释下各个参数,假设报文格式以下表:测试
语法this |
长度spa 位数日志 |
Event_Tag |
8 |
Event Length |
32 |
Event Number |
16 |
for(i=0;i<num;i++){ |
|
Event_id |
16 |
Event_parameters |
32 |
Event_time |
64 |
} |
|
SCID |
32 |
Event_CRC |
32 |
注:长度字段表示的长度是从长度字段之后到报文结束位置的长度。也就是长度字段的值没有把长度字段和长度字段之前的长度算进去。
下面咱们但愿使用Netty的LengthFieldBasedFFrameDecoder来为咱们屏蔽掉底层的拆包粘包的细节。
下面拿一段真实的报文,咱们试着解析一下。报文以下,16进制表示。
8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
Event_tag: 0x8e
Event length: 00000034 等于10进制的52。也就是从0x34后面的52字节是属于一条消息的。
00030202000000650020181211145159
02020000083600201812111452110202
000008360020181211145247055db07a
0e4d0fc0
上面的每一行是16字节,在加上最后的4字节正好是52个。
咱们用程序测试结果来讲明,各个参数设置不一样对结果的影响。
服务端代码:
final EventLoopGroup parentGroup = new NioEventLoopGroup(2); final EventLoopGroup workGroup = new NioEventLoopGroup(2); final ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(final SocketChannel ch) { ch.pipeline() .addLast(new LoggingHandler(LogLevel.INFO)) .addLast( new AudiencePacketFrameDecoder(ByteOrder.BIG_ENDIAN, MAX_FRAME_LENGTH, 1, 4, 0, 0, true)); } }).option(ChannelOption.SO_BACKLOG, 1024); try { final ChannelFuture channelFuture = bootstrap.bind(9090).sync(); if (channelFuture.isSuccess()) { LOGGER.info("server start"); } channelFuture.channel().closeFuture().sync(); } catch (final InterruptedException e) { LOGGER.info("", e); } finally { parentGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }
咱们的channelPipleLine中只有两个Channelhandler,第一个是打印日志的,第二个就是咱们实现了LengthFiledBasedFrameDecoder的类。代码以下:
final ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } LOGGER.info("hexString:" + ByteBufUtil.hexDump(frame));
咱们的代码中只是打印了解码的结果。
当各个参数的设置为
的时候,结果是
hexString:8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
对比原始报文发现全部的报文都打印出来了。
测试一:
咱们改变一下initialBytesToStrip 为5 看一下结果,其余位置不变
结果:
hexString:0003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
对比原始报文发现少了8e00000034五个字节,咱们从上可知,initialBytesToStrip 能帮助咱们把报文的头部截取掉,或者截取一下与业务无关的数据。
测试二:
假设咱们的长度字段把其自己和它以前的字段长度也包括进去了。
之前的报文:
8e000000340003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
长度字段是00000034=52,它没有包含本身的长度和Event_tag的长度。下面咱们稍做改变。假设它包含本身的长度。报文以下:
8e000000390003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
注意长度字段变为 00000039。
下面参数不变,咱们看程序的结果:
执行的时候,咱们发现日志居然没有打印。这是为何呢?咱们试着改变一下参数。咱们把lengthAdjustment 设置为 --5,以下所示:
hexString:0003020200000065002018121114515902020000083600201812111452110202000008360020181211145247055db07a0e4d0fc0
咱们发现结果又能够正常打印出来了。致使结果不同的缘由只有lengthAdjustment 参数的值,设置为0的时候咱们打印不出结果,设置为-5的时候能正常打印出来。
看源码的过程当中,发现这么一句。
报文的长度 += lengthAdjustment+ 长度字段结束位置的偏移量。
这个报文的长度就是咱们返回的数据。当咱们把lengthAdjustment 设置为0的时候,
报文的长度= 报文的长度(00000039=57)+0+5=62,咱们的报文的总长度才为57,因此解码不出来结果。
当咱们把lengthAdjustment 设置为--5的时候,
报文的长度=报文的长度(00000039=57)+(--5)+5=57。这样就能解析出结果了。
综合所述,若是报文中的长度字段的值包含了长度字段自己,那么设置补偿字段长度的时候应该把这个长度减去。
例如咱们例子中的 长度字段加Tag的长度为5,长度字段的值为 00000039=57里面包含了这个长度字段的长度。因此Netty提供了这个参数来减去长度字段的长度值。
2、自定义拆包
因为业务需求,我须要把两种报文合在一块儿当成一个报文。这样我就没有办法用Netty提供给咱们使用的解码器来出来拆包,粘包了。
思路就是,从流里读,只有读到符合本身要求的格式的数据的时候才往下传,不然就丢弃字节或等待。
代码以下:
public class UnpackingDecoder extends ByteToMessageDecoder { private final static Logger LOGGER = LoggerFactory.getLogger(UnpackingDecoder.class); int count=0; @Override protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) { LOGGER.info("start UnpackingDecoder!"); LOGGER.info("unpacking execute count:"+ count); count+=1; try { // 这里是防止出现 咱们不想要的报文 while (true) { if (in.readableBytes() < 2) { return; } if ((in.getUnsignedByte(in.readerIndex()) ^ 0x8E) == 0 || (in.getUnsignedShort(in.readerIndex()) ^ 0x020E) == 0) { break; } LOGGER.warn("this frame is undefined:"+ByteBufUtil.hexDump(in,in.readerIndex(),1)); in.skipBytes(1); } // 根据报文的开头肯定是哪一种报文, if ((in.getUnsignedByte(in.readerIndex()) ^ 0x8E) == 0) { LOGGER.info("this package is begin with 0x8E."); if (in.readableBytes() < 5) { LOGGER.info("buffer size is less than 5,return"); return; } int businessTagByteSize = 1; int businessLengthFiledSize = 4; int sigTagByteSize = 1; int sigLengFiledSize = 2; long businessLength = in.getUnsignedInt(in.readerIndex() + businessTagByteSize); LOGGER.info("business frame length filed:" + ByteBufUtil .hexDump(in, in.readerIndex() + businessTagByteSize, businessLengthFiledSize)); int businessFrameLen = (int) (businessTagByteSize + businessLengthFiledSize + businessLength); if (in.readableBytes() < (businessFrameLen + sigTagByteSize + sigLengFiledSize)) { LOGGER.info("this package is not enough,sign data is not enough."); return; } int sigLen = in.getUnsignedShort(in.readerIndex() + businessFrameLen + sigTagByteSize); LOGGER.info("signature frame length filed:" + ByteBufUtil .hexDump(in, in.readerIndex() + businessFrameLen + sigTagByteSize, sigLengFiledSize)); int signFrameLen = sigTagByteSize + sigLengFiledSize + sigLen; int totalFrameSize = businessFrameLen + signFrameLen; LOGGER.info("total frame length" + totalFrameSize); if (in.readableBytes() < totalFrameSize) { LOGGER.info("this package is not enough,sign data is not enough."); return; } // 若是找到了须要的报文,那么放到out种,调用下面的解码器来处理。 ByteBuf frame = in.slice(in.readerIndex(), totalFrameSize).retain(); in.skipBytes(totalFrameSize); out.add(frame); } else { LOGGER.info("this package is begin with 0x020E."); if (in.readableBytes() < 13) { return; } // 与上面相同 ByteBuf frame = in.slice(in.readerIndex(), 13).retain(); in.skipBytes(13); out.add(frame); } } catch (final Throwable t) { LOGGER.error("UnpackingDecoder Error!", t); } LOGGER.info("end UnpackingDecoder!"); } }