Netty 教程
原文:http://shengwangi.blogspot.com/2016/03/netty-tutorial-hello-world-example.html
Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序。在本教程中,介绍了Netty的基本概念,以及一个hello world level示例。这个基于Netty 4的hello world示例有一个服务器和一个客户端,包括它们之间的心跳,以及POJO发送和接收。
1.概念
Netty的高性能依赖于NIO。Netty有几个重要的概念:通道,管道和入站/出站处理程序。(
通道 可以被认为是I / O请求将经历的隧道。每个频道都有自己的管道。在API级别,最常用的通道是 io.netty.channel。 用于套接字服务器和io.netty.channel的NioServerSocketChannel。NioSocketChannel 用于套接字客户端。
管道 是Netty最重要的概念之一。您可以将管道视为双向队列。队列中填充了入站和出站处理程序。每个处理程序都像servlet过滤器一样工作。顾名思义, “入站”处理程序 只处理读入I / O事件, “OutBound”处理程序 只处理写出I / O事件, “InOutbound”处理程序 处理两者。例如,配置有5个处理程序的管道看起来如下。
此管道等效于以下逻辑。输入I / O事件由处理程序1-3-4-5处理。输出由handes 5-2处理。
在实际项目中,第一个输入处理程序(上图中的处理程序1)通常是解码器。上图中的最后一个输出处理程序2处理程序通常是编码器。最后一个InOutboundHandler通常执行实际业务,处理输入数据对象并发送回复。 在实际使用中,最后一个业务逻辑处理程序通常在与I / O线程不同的线程中执行,因此I / O不会被任何耗时的任务阻塞。 (见下面的例子)
解码器 将读入的ByteBuf转换为在上面的业务逻辑中使用的数据结构。例如,将字节流传输到POJO。如果没有完全接收到帧,它将一直阻塞直到完成,因此下一个处理程序不需要面对部分帧。
编码器 将内部数据结构传输到最终将由套接字写出的ByteBuf。
事件如何流经所有处理程序?需要注意的一点是, 每个处理程序都可以将事件传播到下一个处理程序。一个处理程序需要显式调用ChannelHanderContext的方法 来触发下一个处理程序才能工作。那些方法包括:
入站事件传播方法:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(对象)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(的Throwable)
ChannelHandlerContext.fireUserEventTriggered(对象)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
出站事件传播方法:
ChannelHandlerContext.bind(SocketAddress,ChannelPromise)
ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise)
ChannelHandlerContext.write(Object,ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
本文中的演示使用在客户端和服务器之间设置心跳以保持长连接。Netty的 IdleStateHandler 用于空闲时的心跳。在此 IdleStateHandler中, 调用fireUserEventTriggered()以触发下一个处理程序的操作。
2.使用Netty 4的Hello world示例
此示例有1个服务器和1个客户端。长连接用于数据传输。如果每5秒钟之间没有数据,则会从服务器向客户端发送心跳消息。心跳消息具有发送时间的时间戳。当心跳加速时,客户端什么都不做,只需将其发送回服务器即可。服务器可以通过发送时间使用recv time substract打印出环回延迟。
此示例显示:
如何在编码器/解码器的帮助下发送/ recv POJO
如何为长连接添加心跳。
演示服务器的管道如下所示。
IdleStateHandler位于最开头,因此即使输入流量处于错误的帧格式,它也可以判断是否存在流量。
演示客户端的管道如下所示。
2.1添加netty依赖
1 2 3 4 五 | < dependency > < groupId >io.netty</ groupId > < artifactId >netty-all</ artifactId > < version >4.0.34.Final</ version > </ dependency > |
如果使用maven,则将netty添加到pom.xml。
2.2定义公共类
服务器和客户端都使用了3个类。 发送和接收的POJO类 LoopBackTimeStamp.java,编码器类 TimeStampEncoder.java 和解码器类 TimeStampDecoder.java
首先是LoopBackTimeStamp.java
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | package com.shengwang.demo; import java.nio.ByteBuffer; public class LoopBackTimeStamp { private long sendTimeStamp; private long recvTimeStamp; public LoopBackTimeStamp() { this .sendTimeStamp = System.nanoTime(); } public long timeLapseInNanoSecond() { return recvTimeStamp - sendTimeStamp; } /** * Transfer 2 long number to a 16 byte-long byte[], every 8 bytes represent a long number. * @return */ public byte [] toByteArray() { final int byteOfLong = Long.SIZE / Byte.SIZE; byte [] ba = new byte [byteOfLong * 2 ]; byte [] t1 = ByteBuffer.allocate(byteOfLong).putLong(sendTimeStamp).array(); byte [] t2 = ByteBuffer.allocate(byteOfLong).putLong(recvTimeStamp).array(); for ( int i = 0 ; i < byteOfLong; i++) { ba[i] = t1[i]; } for ( int i = 0 ; i < byteOfLong; i++) { ba[i + byteOfLong] = t2[i]; } return ba; } /** * Transfer a 16 byte-long byte[] to 2 long numbers, every 8 bytes represent a long number. * @param content */ public void fromByteArray( byte [] content) { int len = content.length; final int byteOfLong = Long.SIZE / Byte.SIZE; if (len != byteOfLong * 2 ) { System.out.println( "Error on content length" ); return ; } ByteBuffer buf1 = ByteBuffer.allocate(byteOfLong).put(content, 0 , byteOfLong); ByteBuffer buf2 = ByteBuffer.allocate(byteOfLong).put(content, byteOfLong, byteOfLong); buf1.rewind(); buf2.rewind(); this .sendTimeStamp = buf1.getLong(); this .recvTimeStamp = buf2.getLong(); } // getter/setter ignored } |
该 LoopBackTimeStamp 类有2个长的数字。它还有2个方法, toByteArray() 用于将内部2长数转换为16字节的字节数组。 fromByteArray() 反向工作,将16字节数组更改回2个长数字。
然后是编码器和解码器。编码器 TimeStampEncoder 将LoopBackTimeStamp对象传输到可以发送出去的字节数组中。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 | package com.shengwang.demo.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import com.shengwang.demo.LoopBackTimeStamp; public class TimeStampEncoder extends MessageToByteEncoder<LoopBackTimeStamp> { @Override protected void encode(ChannelHandlerContext ctx, LoopBackTimeStamp msg, ByteBuf out) throws Exception { out.writeBytes(msg.toByteArray()); } } |
解码器将从套接字接收的字节传输到 LoopBackTimeStamp 对象,以便业务处理程序进行处理。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | package com.shengwang.demo.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import com.shengwang.demo.LoopBackTimeStamp; public class TimeStampDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { final int messageLength = Long.SIZE/Byte.SIZE * 2 ; if (in.readableBytes() < messageLength) { return ; } byte [] ba = new byte [messageLength]; in.readBytes(ba, 0 , messageLength); // block until read 16 bytes from sockets LoopBackTimeStamp loopBackTimeStamp = new LoopBackTimeStamp(); loopBackTimeStamp.fromByteArray(ba); out.add(loopBackTimeStamp); } } |
解码器尝试整个读取16个字节,然后从这个16字节的数组中创建一个LoopBackTimeStamp对象。如果收到的字节少于16个字节,则会阻塞,直到收到完整的帧。
2.3定义服务器类
除了上面3个常用类之外,服务器和客户端都分别有2个自己的类,Main +一个用于实际逻辑的Handler。服务器ServerHandler.java的逻辑处理程序如下所示。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十 31 32 33 34 | package com.shengwang.demo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LoopBackTimeStamp ts = (LoopBackTimeStamp) msg; ts.setRecvTimeStamp(System.nanoTime()); System.out.println( "loop delay in ms : " + 1.0 * ts.timeLapseInNanoSecond() / 1000000L); } // Here is how we send out heart beat for idle to long @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write ctx.writeAndFlush( new LoopBackTimeStamp()); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } } |
这三种方法都是重写方法。第一个 channelRead() 读取循环返回消息并打印出行程所花费的时间。第二种方法处理IdleStateHandler 触发的事件 (您可能需要向上滚动以查看服务器管道的配置方式)。当空闲时间过长时, LoopBackTimeStamp 对象将作为心跳发送出去。
服务器的另一个类是主类NettyServer.java。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | package com.shengwang.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import com.shengwang.demo.codec.TimeStampDecoder; import com.shengwang.demo.codec.TimeStampEncoder; public class NettyServer { public static void main(String[] args) throws IOException, InterruptedException { NioEventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boosGroup, workerGroup); bootstrap.channel(NioServerSocketChannel. class ); // =========================================================== // 1. define a separate thread pool to execute handlers with // slow business logic. e.g database operation // =========================================================== final EventExecutorGroup group = new DefaultEventExecutorGroup( 1500 ); //thread pool of 1500 bootstrap.childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "idleStateHandler" , new IdleStateHandler( 0 , 0 , 5 )); // add with name pipeline.addLast( new TimeStampEncoder()); // add without name, name auto generated pipeline.addLast( new TimeStampDecoder()); // add without name, name auto generated //=========================================================== // 2. run handler with slow business logic // in separate thread from I/O thread //=========================================================== pipeline.addLast(group, "serverHandler" , new ServerHandler()); } }); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true ); bootstrap.bind( 19000 ).sync(); } } |
大多数主要代码都是如何初始化netty服务器的样板,注意如何将thoese处理程序添加到管道以及如何在分离的线程中运行业务逻辑处理程序。
2.4定义客户端类
客户端和服务器一样,也有2个类。主+一个处理程序。ClientHandler类与ServerHandler类一样,也是一个“入站处理程序”,只有进程收入消息。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | package com.shengwang.demo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LoopBackTimeStamp ts = (LoopBackTimeStamp) msg; ctx.writeAndFlush(ts); //recieved message sent back directly } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } } |
客户端读取消息并直接将其发回以进行环回。
客户端NettyClient.java的主要类如下所示。
1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十 | package com.shengwang.demo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import com.shengwang.demo.codec.TimeStampDecoder; import com.shengwang.demo.codec.TimeStampEncoder; public class NettyClient { public static void main(String[] args) { NioEventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel. class ); b.handler( new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new TimeStampEncoder(), new TimeStampDecoder(), new ClientHandler()); } }); String serverIp = "192.168.203.156" ; b.connect(serverIp, 19000 ); } } |
演示客户端连接到硬代码ip和端口。
最后,项目层次结构如下:
3.运行它
首先让我们运行服务器,然后打开另一个窗口来运行客户端。连接客户端后,每隔5秒钟会看到一个回送行程消息在屏幕上打印出来。
此外,该演示还用于粗略估计我们项目中的硬件需求,以支持服务器支持大型长连接客户端。在具有2个CPU(Xeon E5-2650 2.0GHZ,20M高速缓存,8个内核,16个线程)和32G RAM的服务器上运行NettyServer时。工作负载如下所示,有264,000个连接。
6台主机用作运行NettyClient的客户端。所以每个主机都有大约40,000个连接。同一客户端主机上的连接同时触发心跳,因此cpu使用大致会减少此工作负载。如果心跳可以分散一点,cpu工作量就会明显下降。
- 点赞
- 收藏
- 关注作者
评论(0)