Netty和Mina同样是NIO通信工具框架,有一定区别也有一定类似之处。Netty和Mina在数据包处理、传输时都有可能会出现粘包和断包的情况,下图对粘包、断包进行描述。
以下是网上对粘包的解决方式:
一、消息定长,例如每一个报文的大小为固定长度200字节,如果不够,空位补空格。
二、在包尾增加回车换行符进行分割,例如FTP协议。
三、将消息分为消息头和消息体,消息头中包含消息长度的字段,一般设计思路为消息头的第一个字段使用int32来表示消息的总长。
根据自身业务情况可选择具体适用的处理方式。Netty自身提供了部分处理方式,但仍无法满足所有情况,而网上多未对断包进行具体处理,这里我通过对上述第三种方式进行扩展增强,实现Netty数据传输粘包、断包问题的解决(注:对于较大数据包,如文件数据传送,需要具体分析、具体设计)。
代码中Netty版本为4.1.6,以下为实际代码。
Netty Server代码:
package com.netty.tcp.server.core; import com.netty.resolve.MessageDecoder; import com.netty.resolve.MessageEncoder; import com.netty.tcp.server.processor.ServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class NioSocketServer { private static NioSocketServer server;//声明server单例对象 private ServerBootstrap bootstrap; private EventLoopGroup boss; private EventLoopGroup worker; private ChannelFuture channelFuture; private NioSocketServer(int port) { bootstrap = new ServerBootstrap(); boss = new NioEventLoopGroup(); worker = new NioEventLoopGroup(); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024)//容许链接数 .localAddress(port) //server监听端口 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MessageEncoder()) .addLast(new MessageDecoder()) .addLast(new ServerHandler()); } }); } public static NioSocketServer getInstance(int port) { if(server == null) server = new NioSocketServer(port); return server; } //打开server public boolean bind() throws InterruptedException { channelFuture = bootstrap.bind().sync(); channelFuture.channel().closeFuture().sync(); return channelFuture.isSuccess(); } //关闭server public void close() throws InterruptedException { channelFuture.channel().close(); boss.shutdownGracefully(); worker.shutdownGracefully(); } }
Server Handler代码:
package com.netty.tcp.server.processor; import java.nio.charset.Charset; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @ProjectName:NettyNote * @ClassName:ServerHandler * @author lujiafa * @email lujiafayx@163.com * @date 2017年1月12日 * @Description: 服务端处理 */ public class ServerHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { //与客户端创建链接后 System.out.println("服务端-客户端链接通道激活!"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String str = null; if(msg instanceof ByteBuf) { ByteBuf bb = (ByteBuf)msg; str = bb.toString(Charset.forName("UTF-8")); }else if(msg != null){ str = msg.toString(); } System.out.println("服务端收到客户端消息:" + str); } /** * @description: 当读取不到消息后时触发(注:会受到粘包、断包等影响,因此未必是客户定义的一个数据包读取完成即调用) */ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("服务端读取客户端上送消息完毕!"); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("server has been received.", Charset.forName("UTF-8"))); } /** * @description: 异常捕捉方法 */ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务端通道异常,异常消息:" + cause.getMessage()); ctx.close(); } }
Netty Client代码:
package com.netty.tcp.client.core; import java.util.HashMap; import java.util.Map; import com.netty.resolve.MessageDecoder; import com.netty.resolve.MessageEncoder; import com.netty.tcp.client.processor.ClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NioSocketClient { //存放NIO客户端实例对象 private static Map<String, NioSocketClient> map = new HashMap<String, NioSocketClient>(); private String serverIp; private int serverPort; private Bootstrap bootstrap; private EventLoopGroup group; private ChannelFuture channelFuture; private NioSocketClient(String ip, int port) throws InterruptedException { this.serverIp = ip; this.serverPort = port; bootstrap = new Bootstrap(); group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MessageEncoder()) .addLast(new MessageDecoder()) .addLast(new ClientHandler()); } }); channelFuture = bootstrap.connect(serverIp,serverPort).sync(); } /** * @description ip和port惟一肯定对同一NIO Server同ip、port只建立一次 * @param ip * @param port * @return nio客户端对象 * @throws InterruptedException */ public static NioSocketClient getInstance(String ip, int port) throws InterruptedException { NioSocketClient niosocket = null; if(ip != null && ip.matches("^((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))") && String.valueOf(port).matches("^([0-9]|[1-9]\\d|[1-9]\\d{2}|[1-9]\\d{3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5])$")){ String key = ip + ":" + port; 
if(map.containsKey(key)){ niosocket = map.get(key); }else{ niosocket = new NioSocketClient(ip, port); map.put(key, niosocket); } } return niosocket; } public void write(Object obj) { if(!channelFuture.channel().isOpen() || !channelFuture.channel().isWritable() || obj == null) return; channelFuture.channel().writeAndFlush(obj); } public void close() throws InterruptedException { channelFuture.channel().closeFuture().sync(); group.shutdownGracefully(); map.remove(serverIp + ":" + serverPort); } }
Client Handler代码:
package com.netty.tcp.client.processor; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientHandler extends ChannelInboundHandlerAdapter{ public void channelActive(ChannelHandlerContext ctx) throws Exception { //与服务端创建链接后 System.out.println("客户端-服务端链接通道激活!"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String str = null; if(msg instanceof ByteBuf) { ByteBuf bb = (ByteBuf)msg; byte[] b = new byte[bb.readableBytes()]; bb.readBytes(b); str = new String(b, "UTF-8"); }else if(msg != null){ str = msg.toString(); } System.out.println("客户端收到消息:" + str); } /** * @description: 当读取不到消息后时触发(注:会受到粘包、断包等影响,因此未必是客户定义的一个数据包读取完成即调用) */ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("客服端读取服务端下发消息完毕!"); } /** * @description: 异常捕捉方法 */ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端通道异常,异常消息:" + cause.getMessage()); ctx.close(); } }
以下两个类为Netty粘包、断包处理核心类。
package com.netty.resolve; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.List; import com.netty.utils.FormatUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; /** * @ClassName:MessageEncoder * @author lujiafa * @email lujiafayx@163.com * @date 2017年1月12日 * @Description: 自定义报文编码器 */ public class MessageEncoder extends MessageToMessageEncoder<Object> { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if(msg == null) return; ByteBuf tb = null; if(msg instanceof byte[]) { tb = Unpooled.copiedBuffer((byte[])msg); }else if(msg instanceof ByteBuf) { tb = (ByteBuf) msg; }else if(msg instanceof ByteBuffer) { tb = Unpooled.copiedBuffer((ByteBuffer)msg); }else { String ostr = msg.toString(); tb = Unpooled.copiedBuffer(ostr, Charset.forName("UTF-8")); } byte[] pkg = new byte[4 + tb.readableBytes()];//数据包 byte[] header = FormatUtils.intToBytes(tb.readableBytes());//报文包头 byte[] body = new byte[tb.readableBytes()];//包体 tb.readBytes(body); System.arraycopy(header, 0, pkg, 0, header.length); System.arraycopy(body, 0, pkg, header.length, body.length); out.add(Unpooled.copiedBuffer(pkg)); } }
package com.netty.resolve; import java.util.List; import javax.xml.bind.DatatypeConverter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; /** * @ClassName:MessageDecoder * @author lujiafa * @email lujiafayx@163.com * @date 2017年1月12日 * @Description: 自定义报文解码器 */ public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> { private byte[] remainingBytes; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { ByteBuf currBB = null; if(remainingBytes == null) { currBB = msg; }else { byte[] tb = new byte[remainingBytes.length + msg.readableBytes()]; System.arraycopy(remainingBytes, 0, tb, 0, remainingBytes.length); byte[] vb = new byte[msg.readableBytes()]; msg.readBytes(vb); System.arraycopy(vb, 0, tb, remainingBytes.length, vb.length); currBB = Unpooled.copiedBuffer(tb); } while(currBB.readableBytes() > 0) { if(!doDecode(ctx, currBB, out)) { break; } } if(currBB.readableBytes() > 0) { remainingBytes = new byte[currBB.readableBytes()]; currBB.readBytes(remainingBytes); }else { remainingBytes = null; } } /** * @Title:doDecode * @Description: 此方法处理同mina中doDecode方法 * @param ctx * @param msg * @param out * @return boolean */ private boolean doDecode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { if(msg.readableBytes() < 4) return false; msg.markReaderIndex(); byte[] header = new byte[4]; msg.readBytes(header); int len = Integer.parseInt(DatatypeConverter.printHexBinary(header), 16); if(msg.readableBytes() < len) { msg.resetReaderIndex(); return false; } byte[] body = new byte[len]; msg.readBytes(body); out.add(Unpooled.copiedBuffer(body)); if(msg.readableBytes() > 0) return true; return false; } }