Netty 教程

举报
Amrf 发表于 2019/06/23 15:02:32 2019/06/23
【摘要】 原文:http://shengwangi.blogspot.com/2016/03/netty-tutorial-hello-world-example.htmlNetty is a NIO client server framework which enables quick and easy development of network applications. In this tut...

原文:http//shengwangi.blogspot.com/2016/03/netty-tutorial-hello-world-example.html

Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序。在本教程中,介绍了Netty的基本概念,以及一个hello world level示例。这个基于Netty 4的hello world示例有一个服务器和一个客户端,包括它们之间的心跳,以及POJO发送和接收。

1.概念

Netty的高性能依赖于NIO。Netty有几个重要的概念:通道,管道和入站/出站处理程序。(

通道  可以被认为是I / O请求将经历的隧道。每个频道都有自己的管道。在API级别,最常用的通道是  io.netty.channel。 用于套接字服务器和io.netty.channel的NioServerSocketChannelNioSocketChannel  用于套接字客户端。

管道  是Netty最重要的概念之一。您可以将管道视为双向队列。队列中填充了入站和出站处理程序。每个处理程序都像servlet过滤器一样工作。顾名思义,  “入站”处理程序  只处理读入I / O事件,  “OutBound”处理程序  只处理写出I / O事件,  “InOutbound”处理程序  处理两者。例如,配置有5个处理程序的管道看起来如下。

image_thumb4.png

此管道等效于以下逻辑。输入I / O事件由处理程序1-3-4-5处理。输出由handes 5-2处理。

image_thumb3.png

在实际项目中,第一个输入处理程序(上图中的处理程序1)通常是解码器。上图中的最后一个输出处理程序2处理程序通常是编码器。最后一个InOutboundHandler通常执行实际业务,处理输入数据对象并发送回复。 在实际使用中,最后一个业务逻辑处理程序通常在与I / O线程不同的线程中执行,因此I / O不会被任何耗时的任务阻塞。 (见下面的例子)

解码器  将读入的ByteBuf转换为在上面的业务逻辑中使用的数据结构。例如,将字节流传输到POJO。如果没有完全接收到帧,它将一直阻塞直到完成,因此下一个处理程序不需要面对部分帧。

编码器  将内部数据结构传输到最终将由套接字写出的ByteBuf。

事件如何流经所有处理程序?需要注意的一点是,  每个处理程序都可以将事件传播到下一个处理程序。一个处理程序需要显式调用ChannelHanderContext的方法   来触发下一个处理程序才能工作。那些方法包括:

入站事件传播方法:

  • ChannelHandlerContext.fireChannelRegistered()

  • ChannelHandlerContext.fireChannelActive()

  • ChannelHandlerContext.fireChannelRead(对象)

  • ChannelHandlerContext.fireChannelReadComplete()

  • ChannelHandlerContext.fireExceptionCaught(的Throwable)

  • ChannelHandlerContext.fireUserEventTriggered(对象)

  • ChannelHandlerContext.fireChannelWritabilityChanged()

  • ChannelHandlerContext.fireChannelInactive()

  • ChannelHandlerContext.fireChannelUnregistered()

出站事件传播方法:

  • ChannelHandlerContext.bind(SocketAddress,ChannelPromise)

  • ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise)

  • ChannelHandlerContext.write(Object,ChannelPromise)

  • ChannelHandlerContext.flush()

  • ChannelHandlerContext.read()

  • ChannelHandlerContext.disconnect(ChannelPromise)

  • ChannelHandlerContext.close(ChannelPromise)

  • ChannelHandlerContext.deregister(ChannelPromise)

本文中的演示使用在客户端和服务器之间设置心跳以保持长连接。Netty的  IdleStateHandler 用于空闲时的心跳。在此  IdleStateHandler中,   调用fireUserEventTriggered()以触发下一个处理程序的操作。

2.使用Netty 4的Hello world示例

此示例有1个服务器和1个客户端。长连接用于数据传输。如果每5秒钟之间没有数据,则会从服务器向客户端发送心跳消息。心跳消息具有发送时间的时间戳。当心跳加速时,客户端什么都不做,只需将其发送回服务器即可。服务器可以通过发送时间使用recv time substract打印出环回延迟。

此示例显示:

  • 如何在编码器/解码器的帮助下发送/ recv POJO

  • 如何为长连接添加心跳。

演示服务器的管道如下所示。

image_thumb6.png

IdleStateHandler位于最开头,因此即使输入流量处于错误的帧格式,它也可以判断是否存在流量。 

演示客户端的管道如下所示。

image_thumb.png

2.1添加netty依赖

1
2
3
4
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.0.34.Final</version>
</dependency>

如果使用maven,则将netty添加到pom.xml。

2.2定义公共类

服务器和客户端都使用了3个类。 发送和接收的POJO类  LoopBackTimeStamp.java,编码器类  TimeStampEncoder.java  和解码器类  TimeStampDecoder.java 

首先是LoopBackTimeStamp.java

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.shengwang.demo;
 
import java.nio.ByteBuffer;
 
public class LoopBackTimeStamp {
  private long sendTimeStamp;
  private long recvTimeStamp;
 
  public LoopBackTimeStamp() {
    this.sendTimeStamp = System.nanoTime();
  }
 
  public long timeLapseInNanoSecond() {
    return recvTimeStamp - sendTimeStamp;
  }
 
  /**
   * Transfer 2 long number to a 16 byte-long byte[], every 8 bytes represent a long number.
   * @return
   */
  public byte[] toByteArray() {
 
    final int byteOfLong = Long.SIZE / Byte.SIZE;
    byte[] ba = new byte[byteOfLong * 2];
    byte[] t1 = ByteBuffer.allocate(byteOfLong).putLong(sendTimeStamp).array();
    byte[] t2 = ByteBuffer.allocate(byteOfLong).putLong(recvTimeStamp).array();
 
    for (int i = 0; i < byteOfLong; i++) {
      ba[i] = t1[i];
    }
 
    for (int i = 0; i < byteOfLong; i++) {
      ba[i + byteOfLong] = t2[i];
    }
    return ba;
  }
 
  /**
   * Transfer a 16 byte-long byte[] to 2 long numbers, every 8 bytes represent a long number.
   * @param content
   */
  public void fromByteArray(byte[] content) {
    int len = content.length;
    final int byteOfLong = Long.SIZE / Byte.SIZE;
    if (len != byteOfLong * 2) {
      System.out.println("Error on content length");
      return;
    }
    ByteBuffer buf1 = ByteBuffer.allocate(byteOfLong).put(content, 0, byteOfLong);
    ByteBuffer buf2 = ByteBuffer.allocate(byteOfLong).put(content, byteOfLong, byteOfLong);
    buf1.rewind();
    buf2.rewind();
    this.sendTimeStamp = buf1.getLong();
    this.recvTimeStamp = buf2.getLong();
  }
   
  // getter/setter ignored
}

该  LoopBackTimeStamp  类有2个长的数字。它还有2个方法,  toByteArray()  用于将内部2长数转换为16字节的字节数组。 fromByteArray()  反向工作,将16字节数组更改回2个长数字。

然后是编码器和解码器。编码器  TimeStampEncoder  将LoopBackTimeStamp对象传输到可以发送出去的字节数组中。 

1
2
3
4
6
7
8
9
10
11
12
13
14
package com.shengwang.demo.codec;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
 
import com.shengwang.demo.LoopBackTimeStamp;
 
public class TimeStampEncoder extendsMessageToByteEncoder<LoopBackTimeStamp> {
  @Override
  protected void encode(ChannelHandlerContext ctx, LoopBackTimeStamp msg, ByteBuf out) throws Exception {
    out.writeBytes(msg.toByteArray());
  }
}

解码器将从套接字接收的字节传输到  LoopBackTimeStamp  对象,以便业务处理程序进行处理。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.shengwang.demo.codec;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
 
import java.util.List;
 
import com.shengwang.demo.LoopBackTimeStamp;
 
public class TimeStampDecoder extends ByteToMessageDecoder {
 
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    final int messageLength = Long.SIZE/Byte.SIZE *2;
    if (in.readableBytes() < messageLength) {
      return;
    }
     
    byte [] ba = new byte[messageLength];
    in.readBytes(ba, 0, messageLength);  // block until read 16 bytes from sockets
    LoopBackTimeStamp loopBackTimeStamp = new LoopBackTimeStamp();
    loopBackTimeStamp.fromByteArray(ba);
    out.add(loopBackTimeStamp);
  }
}

解码器尝试整个读取16个字节,然后从这个16字节的数组中创建一个LoopBackTimeStamp对象。如果收到的字节少于16个字节,则会阻塞,直到收到完整的帧。

2.3定义服务器类

除了上面3个常用类之外,服务器和客户端都分别有2个自己的类,Main +一个用于实际逻辑的Handler。服务器ServerHandler.java的逻辑处理程序如下所示。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
package com.shengwang.demo;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
 
public class ServerHandler extends ChannelInboundHandlerAdapter {
 
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
    LoopBackTimeStamp ts = (LoopBackTimeStamp) msg;
    ts.setRecvTimeStamp(System.nanoTime());
    System.out.println("loop delay in ms : " 1.0 * ts.timeLapseInNanoSecond() / 1000000L);
  }
 
  // Here is how we send out heart beat for idle to long
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent event = (IdleStateEvent) evt;
      if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write
        ctx.writeAndFlush(new LoopBackTimeStamp());
      }
    }
  }
 
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // Close the connection when an exception is raised.
    cause.printStackTrace();
    ctx.close();
  }
}

这三种方法都是重写方法。第一个  channelRead()  读取循环返回消息并打印出行程所花费的时间。第二种方法处理IdleStateHandler 触发的事件  (您可能需要向上滚动以查看服务器管道的配置方式)。当空闲时间过长时,  LoopBackTimeStamp  对象将作为心跳发送出去。

服务器的另一个类是主类NettyServer.java。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.shengwang.demo;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import com.shengwang.demo.codec.TimeStampDecoder;
import com.shengwang.demo.codec.TimeStampEncoder;
 
public class NettyServer {
 
  public static void main(String[] args) throws IOException, InterruptedException {
    NioEventLoopGroup boosGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boosGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
     
    // ===========================================================
    // 1. define a separate thread pool to execute handlers with
    //    slow business logic. e.g database operation
    // ===========================================================
    final EventExecutorGroup group = newDefaultEventExecutorGroup(1500); //thread pool of 1500
     
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("idleStateHandler",newIdleStateHandler(0,0,5)); // add with name
        pipeline.addLast(new TimeStampEncoder()); // add without name, name auto generated
        pipeline.addLast(new TimeStampDecoder()); // add without name, name auto generated
         
        //===========================================================
        // 2. run handler with slow business logic
        //    in separate thread from I/O thread
        //===========================================================
        pipeline.addLast(group,"serverHandler",newServerHandler());
      }
    });
     
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.bind(19000).sync();
  }
}

大多数主要代码都是如何初始化netty服务器的样板,注意如何将thoese处理程序添加到管道以及如何在分离的线程中运行业务逻辑处理程序。

2.4定义客户端类

客户端和服务器一样,也有2个类。主+一个处理程序。ClientHandler类与ServerHandler类一样,也是一个“入站处理程序”,只有进程收入消息。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.shengwang.demo;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class ClientHandler extends ChannelInboundHandlerAdapter {
 
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
    LoopBackTimeStamp ts = (LoopBackTimeStamp) msg;
    ctx.writeAndFlush(ts); //recieved message sent back directly
  }
 
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // Close the connection when an exception is raised.
    cause.printStackTrace();
    ctx.close();
  }
}

客户端读取消息并直接将其发回以进行环回。

客户端NettyClient.java的主要类如下所示。

1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
package com.shengwang.demo;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
import com.shengwang.demo.codec.TimeStampDecoder;
import com.shengwang.demo.codec.TimeStampEncoder;
 
public class NettyClient {
 
  public static void main(String[] args) {
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    Bootstrap b = new Bootstrap();
    b.group(workerGroup);
    b.channel(NioSocketChannel.class);
 
    b.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeStampEncoder(),newTimeStampDecoder(),new ClientHandler());
      }
    });
     
    String serverIp = "192.168.203.156";
    b.connect(serverIp, 19000);
  }
}

演示客户端连接到硬代码ip和端口。  

最后,项目层次结构如下:

image_thumb [1] .PNG

3.运行它

首先让我们运行服务器,然后打开另一个窗口来运行客户端。连接客户端后,每隔5秒钟会看到一个回送行程消息在屏幕上打印出来。

image_thumb [2] .PNG

此外,该演示还用于粗略估计我们项目中的硬件需求,以支持服务器支持大型长连接客户端。在具有2个CPU(Xeon E5-2650 2.0GHZ,20M高速缓存,8个内核,16个线程)和32G RAM的服务器上运行NettyServer时。工作负载如下所示,有264,000个连接。

image_thumb [1](1).png

6台主机用作运行NettyClient的客户端。所以每个主机都有大约40,000个连接。同一客户端主机上的连接同时触发心跳,因此cpu使用大致会减少此工作负载。如果心跳可以分散一点,cpu工作量就会明显下降。


【版权声明】本文为华为云社区用户翻译文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容, 举报邮箱:cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。