Netty 粘包、半包解决

Netty和Mina同样是NIO通信工具框架,有必定区别也有必定类似之处。Netty和Mina在数据包处理、传输时都有可能会出现粘包和断包的状况,下图对粘包、断包进行描述。
在这里插入图片描述
如下是网上对粘包的解决方式:
一、消息定长,例如每一个报文的大小为固定长度200字节,若是不够,空位补空格。
二、在包尾增长回车换行符进行分割,例如FTP协议。
三、将消息分为消息头和消息体,消息头中包含消息长度的字段,一般设计思路为消息头的第一个字段使用int32来表示消息的总长。java

根据自身业务状况可选择具体适用的处理方式。Netty自身提供了部分处理方式,但仍没法知足全部状况,而网上多未对断包进行具体处理,这里我经过对上述第三种方式进行扩展加强实现Netty数据传输粘包、断包问题解决(注:对于较大数据包,如:文件数据传送 须要具体分析具体设计)。ios

代码中中Netty版本为4.1.6,如下为实际代码。
Netty Server代码:web

package com.netty.tcp.server.core;
 
import com.netty.resolve.MessageDecoder;
import com.netty.resolve.MessageEncoder;
import com.netty.tcp.server.processor.ServerHandler;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
public class NioSocketServer {
	
	private static NioSocketServer server;//声明server单例对象
	
	private ServerBootstrap bootstrap;
	private EventLoopGroup boss;
	private EventLoopGroup worker;
	private ChannelFuture channelFuture;
	
	private NioSocketServer(int port) {
		bootstrap = new ServerBootstrap();
		boss = new NioEventLoopGroup();
		worker = new NioEventLoopGroup();
		bootstrap.group(boss, worker)
		.channel(NioServerSocketChannel.class)
		.option(ChannelOption.SO_BACKLOG, 1024)//容许链接数
		.localAddress(port) //server监听端口
		.handler(new LoggingHandler(LogLevel.INFO))
		.childHandler(new ChannelInitializer<SocketChannel>() {
			protected void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline()
	            	.addLast(new MessageEncoder())
	            	.addLast(new MessageDecoder())
					.addLast(new ServerHandler());
			}
		});
	}
	
	public static NioSocketServer getInstance(int port) {
		if(server == null)
			server = new NioSocketServer(port);
		return server;
	}
	
	//打开server
	public boolean bind() throws InterruptedException {
		channelFuture = bootstrap.bind().sync();
		channelFuture.channel().closeFuture().sync();
		return channelFuture.isSuccess();
	}
	
	//关闭server
	public void close() throws InterruptedException {
		channelFuture.channel().close();
		boss.shutdownGracefully();
		worker.shutdownGracefully();
	}
 
}

Server Handler代码:bootstrap

package com.netty.tcp.server.processor;
 
import java.nio.charset.Charset;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
/**
 * @ProjectName:NettyNote
 * @ClassName:ServerHandler
 * @author lujiafa
 * @email lujiafayx@163.com
 * @date 2017年1月12日
 * @Description: 服务端处理
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
	
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //与客户端创建链接后
        System.out.println("服务端-客户端链接通道激活!");
    }
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String str = null;
		if(msg instanceof ByteBuf) {
			ByteBuf bb = (ByteBuf)msg;
			str = bb.toString(Charset.forName("UTF-8"));
		}else if(msg != null){
			str = msg.toString();
		}
		System.out.println("服务端收到客户端消息:" + str);
	}
	
	/**
	 * @description: 当读取不到消息后时触发(注:会受到粘包、断包等影响,因此未必是客户定义的一个数据包读取完成即调用)
	 */
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("服务端读取客户端上送消息完毕!");
		ctx.channel().writeAndFlush(Unpooled.copiedBuffer("server has been received.", Charset.forName("UTF-8")));
	}
	
	/**
	 * @description: 异常捕捉方法
	 */
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("服务端通道异常,异常消息:"  + cause.getMessage());
		ctx.close();
	}
 
}

Netty Client代码:框架

package com.netty.tcp.client.core;
 
import java.util.HashMap;
import java.util.Map;
 
import com.netty.resolve.MessageDecoder;
import com.netty.resolve.MessageEncoder;
import com.netty.tcp.client.processor.ClientHandler;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
public class NioSocketClient {
	
	//存放NIO客户端实例对象
	private static Map<String, NioSocketClient> map = new HashMap<String, NioSocketClient>();
	
	private String serverIp;
	private int serverPort;
	private Bootstrap bootstrap;
	private EventLoopGroup group;
	private ChannelFuture channelFuture;
	
	private NioSocketClient(String ip, int port) throws InterruptedException {
		this.serverIp = ip;
		this.serverPort = port;
        bootstrap = new Bootstrap();
        group = new NioEventLoopGroup();
        bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                	.addLast(new MessageEncoder())
                	.addLast(new MessageDecoder())
                	.addLast(new ClientHandler());
            }
        });
        channelFuture = bootstrap.connect(serverIp,serverPort).sync();
	}
 
	/**
	 * @description ip和port惟一肯定对同一NIO Server同ip、port只建立一次
	 * @param ip
	 * @param port
	 * @return nio客户端对象
	 * @throws InterruptedException 
	 */
	public static NioSocketClient getInstance(String ip, int port) throws InterruptedException {
		NioSocketClient niosocket = null;
		if(ip != null
				&& ip.matches("^((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))")
				&& String.valueOf(port).matches("^([0-9]|[1-9]\\d|[1-9]\\d{2}|[1-9]\\d{3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5])$")){
			String key = ip + ":" + port;
			if(map.containsKey(key)){
				niosocket = map.get(key);
			}else{
				niosocket = new NioSocketClient(ip, port);
				map.put(key, niosocket);
			}
		}
		return niosocket;
	}
	
	public void write(Object obj) {
		if(!channelFuture.channel().isOpen()
				|| !channelFuture.channel().isWritable()
				|| obj == null)
			return;
		channelFuture.channel().writeAndFlush(obj);
	}
	
	public void close() throws InterruptedException {
		channelFuture.channel().closeFuture().sync();
		group.shutdownGracefully();
		map.remove(serverIp + ":" + serverPort);
	}
 
}

Client Handler代码:socket

package com.netty.tcp.client.processor;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class ClientHandler extends ChannelInboundHandlerAdapter{
	
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //与服务端创建链接后
        System.out.println("客户端-服务端链接通道激活!");
    }
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String str = null;
		if(msg instanceof ByteBuf) {
			ByteBuf bb = (ByteBuf)msg;
			byte[] b = new byte[bb.readableBytes()];
			bb.readBytes(b);
			str = new String(b, "UTF-8");
		}else if(msg != null){
			str = msg.toString();
		}
		System.out.println("客户端收到消息:" + str);
	}
	
	/**
	 * @description: 当读取不到消息后时触发(注:会受到粘包、断包等影响,因此未必是客户定义的一个数据包读取完成即调用)
	 */
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("客服端读取服务端下发消息完毕!");
	}
	
	/**
	 * @description: 异常捕捉方法
	 */
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("客户端通道异常,异常消息:"  + cause.getMessage());
		ctx.close();
	}
 
}

如下两个类为Netty粘包、断包处理核心类。tcp

package com.netty.resolve;
 
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
 
import com.netty.utils.FormatUtils;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
 
/**
 * @ClassName:MessageEncoder
 * @author lujiafa
 * @email lujiafayx@163.com
 * @date 2017年1月12日
 * @Description: 自定义报文编码器
 */
public class MessageEncoder extends MessageToMessageEncoder<Object> {
 
	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
		if(msg == null)
			return;
		ByteBuf tb = null;
		if(msg instanceof byte[]) {
			tb = Unpooled.copiedBuffer((byte[])msg);
		}else if(msg instanceof ByteBuf) {
			tb = (ByteBuf) msg;
		}else if(msg instanceof ByteBuffer) {
			tb = Unpooled.copiedBuffer((ByteBuffer)msg);
		}else {
			String ostr = msg.toString();
			tb = Unpooled.copiedBuffer(ostr, Charset.forName("UTF-8"));
		}
		byte[] pkg = new byte[4 + tb.readableBytes()];//数据包
		byte[] header = FormatUtils.intToBytes(tb.readableBytes());//报文包头
		byte[] body = new byte[tb.readableBytes()];//包体
		tb.readBytes(body);
		System.arraycopy(header, 0, pkg, 0, header.length);
		System.arraycopy(body, 0, pkg, header.length, body.length);
		out.add(Unpooled.copiedBuffer(pkg));
	}
 
}
package com.netty.resolve;
 
import java.util.List;
 
import javax.xml.bind.DatatypeConverter;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
 
/**
 * @ClassName:MessageDecoder
 * @author lujiafa
 * @email lujiafayx@163.com
 * @date 2017年1月12日
 * @Description: 自定义报文解码器
 */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
	
	private byte[] remainingBytes;
 
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		ByteBuf currBB = null;
		if(remainingBytes == null) {
			currBB = msg;
		}else {
			byte[] tb = new byte[remainingBytes.length + msg.readableBytes()];
			System.arraycopy(remainingBytes, 0, tb, 0, remainingBytes.length);
			byte[] vb = new byte[msg.readableBytes()];
			msg.readBytes(vb);
			System.arraycopy(vb, 0, tb, remainingBytes.length, vb.length);
			currBB = Unpooled.copiedBuffer(tb);
		}
		while(currBB.readableBytes() > 0) {
			if(!doDecode(ctx, currBB, out)) {
				break;
			}
		}
		if(currBB.readableBytes() > 0) {
			remainingBytes = new byte[currBB.readableBytes()];
			currBB.readBytes(remainingBytes);
		}else {
			remainingBytes = null;
		}
	}
	
	/**
	 * @Title:doDecode
	 * @Description: 此方法处理同mina中doDecode方法
	 * @param ctx
	 * @param msg
	 * @param out
	 * @return boolean
	 */
	private boolean doDecode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
		if(msg.readableBytes() < 4)
			return false;
		msg.markReaderIndex();
		byte[] header = new byte[4];
		msg.readBytes(header);
		int len = Integer.parseInt(DatatypeConverter.printHexBinary(header), 16);
		if(msg.readableBytes() < len) {
			msg.resetReaderIndex();
			return false;
		}
		byte[] body = new byte[len];
		msg.readBytes(body);
		out.add(Unpooled.copiedBuffer(body));
		if(msg.readableBytes() > 0)
			return true;
		return false;
	}
	
}