高性能网络通信框架Netty一套就够(下集)
Netty进阶
粘包和半包/拆包问题
粘包问题演示
服务器端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
客户端:
private static final Logger log= LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LoggingHandler());
}
}).connect("localhost", 8080);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
ByteBuf byteBuf = channel.alloc().buffer(16);
for (int i=0;i<10;i++){
byteBuf.retain();
byteBuf.writeBytes(("hello").getBytes("utf-8"));
channel.writeAndFlush(byteBuf);
byteBuf.clear();
}
channel.close();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
eventLoopGroup.shutdownGracefully();
}
});
}
});
}
服务器端输出结果:
16:00:37.869 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa191631f, L:/127.0.0.1:8080 - R:/127.0.0.1:53693] READ: 50B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 |hellohellohelloh|
|00000010| 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 |ellohellohellohe|
|00000020| 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c |llohellohellohel|
|00000030| 6c 6f |lo |
+--------+-------------------------------------------------+----------------+
可以看出原来我们是在客户端分10次发送,而服务器端却一下把10次的数据都粘在一起了,这就是粘包问题。
半包问题展示
服务器端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
// 半包问题:例如,发送方发送100字节数据,而接收方最多只能接收30字节数据,这就是半包问题
//option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小(滑动窗口)
.option(ChannelOption.SO_RCVBUF,10)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline()
.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.info("msg={}", msg);
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
只需使用这个方法即可
- option(ChannelOption.SO_RCVBUF,10)
option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小。由于接收缓存区的大小<发送方发送的数据大小,所以产生了半包问题。
现象分析
粘包:
- 产生现象
- 第一次发送abc,第二次发送def,接收到的是一整个abcdef
- 原因
- Netty层
- 接收方的接收缓冲区太大,Netty的接收缓冲区默认是1024字节
- 网络层
- TCP滑动窗口:假如发送方发送100字节数据,而滑动窗口缓冲区可容纳>100字节数据,这时候就会出现粘包问题。
- Nagle 算法:会造成粘包
- Netty层
半包/拆包:
- 产生现象
- 发送abcdef数据,接收方第一次收到ab,第二次收到cd,第三次收到ef
- 原因
- Netty层
- 接收方的接收缓冲区太小,发送方的数据过大,导致接收方无法一次接收下所有数据,就会半包/拆包
- 网络层
- 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包
- Netty层
发送这些问题的本质:因为 TCP 是流式协议,消息无边界
粘包和半包/拆包解决方案
短连接
短连接:即每发送一条数据就重新连接再次发送,反复此操作。
短连接的缺点是显而易见的,每次发送一条数据都要重新连接,这样会大大的浪费时间,因为连接是需要时间的。
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。
这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。
但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
采用短连接解决粘包代码
服务端:
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline()
.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已成功连接服务器");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg="+msg);
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
客户端:
private static final Logger log= LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) {
//采用短连接解决“”粘包“”问题,无法解决半包问题
for (int i = 0; i < 10; i++) {
sendMessage("hello");
}
}
public static void sendMessage(String msg){
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(eentLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
//不进行加解密不然展示不出粘包效果
// ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(msg.getBytes("utf-8"));
ch.writeAndFlush(buffer);
ch.close();
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
eventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost", 8080);
}
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度。
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码
行解码器(推荐)
对于其他解码器,我还是更喜欢行解码器。行解码器主要是靠分隔符\n来判断行进行解码,不过需要进行限制长度,以免服务器一直搜索\n造成卡死。
改造前的粘包代码
服务端:
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
客户端:
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
for(int i=0;i<10;i++){
buffer.retain();
buffer.writeBytes("hello world".getBytes("utf-8"));
ctx.channel().writeAndFlush(buffer);
}
ch.close();//关闭Channel
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
结果:
13:37:15.286 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x36ce6c5f, L:/127.0.0.1:8080 - R:/127.0.0.1:64550] READ: 110B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f |hello worldhello|
|00000010| 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c | worldhello worl|
|00000020| 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c |dhello worldhell|
|00000030| 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 |o worldhello wor|
|00000040| 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c |ldhello worldhel|
|00000050| 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f |lo worldhello wo|
|00000060| 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 |rldhello world |
+--------+-------------------------------------------------+----------------+
接收方使用行解码器改造后
服务端:
private static final Logger log= LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(6);
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("msg={}",msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
客户端:
//把消息加工成可以被行解码器识别的消息
private static String getMsg(String oldMsg){
oldMsg+='\n';
return oldMsg;
}
public static void main(String[] args) {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(16);
for(int i=0;i<10;i++){
buffer.retain();
String msg = getMsg("hello world");
buffer.writeBytes(msg.getBytes("utf-8"));
ctx.channel().writeAndFlush(buffer);
//清理缓存,防止数据堆叠
buffer.clear();
}
ch.close();//关闭Channel
ChannelFuture closeFuture = ch.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
nioEventLoopGroup.shutdownGracefully();
}
});
}
});
}
}).connect("localhost",8080);
}catch (Exception e){
e.printStackTrace();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)