节选自 Netty权威指南 第二版java
TCP是个“流”协议,所谓流,就是没有界面的一串数据。你们能够想象河里的流水,bootstrap
它们是连成一片的,其间并无分界线。TCP底层并不了解上层业务数据的具体含义,它安全
会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会服务器
被TCP拆分红多个包进行发送,也有可能把多个小小的包封装成一个大的数据包发送,这框架
就是所谓的TCP粘包和拆包问题。socket
咱们能够经过图解对TCP粘包和拆包问题进行说明,以下图ide
假设客户端分别发送了两个数据包D1和D2给服务端,因为服务端一次读取到的字节数是不肯定的,故可能存在如下4种状况。oop
1) 服务端分两次取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包。编码
2) 服务端一次接收到了两个数据包,D1和D2粘合在一块儿,被称为TCP粘包。spa
3) 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部份内容,第二次取到了D2包的剩余内容,这被称为TCP粘包。
4) 服务端分两次读取到了两个数据包,第一次读取到D1包的部份内容D1_1,第二次读取到了D1包的剩余内容和D2包的整包。
若是此时服务端TCP接受滑窗很是小,而数据包D1和D2比较大,颇有可能会发生第5种可能,即服务端分屡次才能将D1和D2包接收彻底,期间发生屡次拆包。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class TimeServer { private final static int PORT = 8080; public static void main(String[] args) { start(); } private static void start() { final TimeServerHandler serverHandler = new TimeServerHandler(); // 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 建立EventLoopGroup ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) //指定所使用的NIO传输Channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(PORT)) // 添加一个EchoServerHandler到Channle的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //EchoServerHandler被标注为@shareable,因此咱们能够老是使用一样的案例 socketChannel.pipeline().addLast(serverHandler); } }); try { ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }2. TimeServerHandler.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; @Sharable public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); System.out.println( "The time server receive order: " + body+"; the counter is "+ ++counter ); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }3. TimeClient.java
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class TimeClient { private static final String HOST = "localhost"; private static final int PORT = 8080; public static void main(String[] args) { start(); } private static void start() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(HOST, PORT)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeClientHandler()); } }); try { ChannelFuture future = bootstrap.connect().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } }4. TimeClientHandler.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class TimeClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int counter; private byte[] req; private int req_times = 100; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; for (int i=0; i<req_times; i++){ message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { byte[] req = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println( "Now is : "+ body +" ; the counter is :" + ++counter ); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }客户端跟服务端链路创建成功以后,循环发送100条消息,每发送一条就刷新一次,保证每条消息都会被写入Channel中。按照咱们的设计,服务端应该接收到100条查询时间指令的请求消息。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import java.net.InetSocketAddress; public class TimeServer { private final static int PORT = 8080; public static void main(String[] args) { start(); } private static void start() { final TimeServerHandler serverHandler = new TimeServerHandler(); // 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 建立EventLoopGroup ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) //指定所使用的NIO传输Channel .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(PORT)) // 添加一个EchoServerHandler到Channle的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //EchoServerHandler被标注为@shareable,因此咱们能够老是使用一样的案例 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(serverHandler); } }); try { ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; @Sharable public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println( "The time server receive order: " + body+"; the counter is "+ ++counter ); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }改进的 TimeClient
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import java.net.InetSocketAddress; public class TimeClient { private static final String HOST = "localhost"; private static final int PORT = 8080; public static void main(String[] args) { start(); } private static void start() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(HOST, PORT)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new TimeClientHandler()); } }); try { ChannelFuture future = bootstrap.connect().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } }改进的TimeClientHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private int counter; private byte[] req; private int req_times = 100; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; for (int i=0; i<req_times; i++){ message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println( "Now is " + body + " ; the counter is "+ ++counter ); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }分别运行TimeServer和TimeClient,执行结果以下。