学习socket nio 之 mina实例

举报
chenyu 发表于 2021/07/27 00:34:47 2021/07/27
【摘要】 1:mina之浅谈             mina是apache基于socket nio的一套框架,可以从apache官网下载jar包和源码。试用起来非常方便,也非常强大。如果对socket nio还不是很了解的话:请看一下这两篇文章  &n...

1:mina之浅谈

            mina是apache基于socket nio的一套框架,可以从apache官网下载jar包和源码。试用起来非常方便,也非常强大。如果对socket nio还不是很了解的话:请看一下这两篇文章

             学习bytebuffersocket nio实例

            这里我简单的介绍一下mina的框架:

            IoService:这个接口是网络的入口,IoAcceptor和IoConnector都实现这个接口。从名字上我们可以看得出来IoAcceptor是接受链接的(服务端),而IoConnector是用来链接的(客户端)。

            IoFilter:过滤器。他是用来过滤消息的。从IoService(网络接口)出来的数据或者进入IoService(网络接口)的数据都会经过IoFilter的处理。最重要的就是日志和解码和编码。

           IoHandler:处理器。它是链接应用和IoFilter的桥梁,是进行业务处理的,从IoFilter出来的数据会发到IoHandler中处理。

           从一个图中我们来了解一下,这几个接口之间的关系:

 

看着这张图片,就应该明白mina中的数据时怎么传输的了吧。看起来很简单的样子?

2:mina实例

目标:

     不管客户端发送什么数据到服务端,服务端口返回Hello world。

服务端实现

IoHandler:业务处理层


   
  1. import org.apache.mina.core.service.IoHandlerAdapter;
  2. import org.apache.mina.core.session.IdleStatus;
  3. import org.apache.mina.core.session.IoSession;
  4. public class ServerHandler extends IoHandlerAdapter{
  5. @Override
  6. public void exceptionCaught(IoSession session, Throwable cause) throws Exception{
  7. System.out.println("报错啦............");
  8. cause.printStackTrace();
  9. }
  10. @Override
  11. public void messageReceived( IoSession session, Object message ) throws Exception{
  12. String str = message.toString();
  13. System.out.println("messageReceived:"+str);
  14. session.write("hello world");
  15. }
  16. @Override
  17. public void sessionIdle( IoSession session, IdleStatus status ) throws Exception{
  18. System.out.println( "IDLE " + session.getIdleCount( status ));
  19. }
  20. }
 

IoFilter:过滤器层

         这里我们做一个解码的编码的过滤层,这也是mina中最常用的。首先我们需要定义属于我们自己的协议,也就是数据包的格式:别以为这很复杂,其实很简单的。

        我们知道数据都是字节类型的,那么我们的协议格式如下:前两位表示数据包的长度(一个short类型正好两个字节),第三位是闲置位,后面的是数据。长度是闲置位和

数据长度的和。这样我们就可以根据前两位确定,我们的数据包到那里结束。那么我们循环这么读,就会取得所有的数据包。是不是很简单啊,这个格式就是我们的协议。

      为了更简单,这里我们客户端发往服务端的数据进行编码和解码,服务端发往客户端的就不编码了,客户端也就不用解码。服务端使用mina,客户端我们就使用基本的socket nio。

编码工厂类:


   
  1. public class CodecFactory extends DemuxingProtocolCodecFactory{
  2. public CodecFactory(){
  3. super.addMessageEncoder(String.class, Encoder.class);
  4. super.addMessageDecoder(Decoder.class);
  5. }
  6. }
解码类:

   
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import org.apache.mina.core.buffer.IoBuffer;
  4. import org.apache.mina.core.session.IoSession;
  5. import org.apache.mina.filter.codec.ProtocolDecoderOutput;
  6. import org.apache.mina.filter.codec.demux.MessageDecoder;
  7. import org.apache.mina.filter.codec.demux.MessageDecoderResult;
  8. public class Decoder implements MessageDecoder {
  9. private byte[] r_curPkg = null;
  10. private int r_pos = -1; // 包计数器
  11. static private final int PKG_SIZE_BYTES = 2;//包长度
  12. public Decoder() { }
  13. @Override
  14. public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
  15. return MessageDecoderResult.OK;
  16. }
  17. @Override
  18. public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
  19. List<String> list = new ArrayList<String>();
  20. while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循环接收包,4为一个整型,表示包长度b, 如果上一个包未接收完成时,继续接收
  21. // 如果上个包已收完整,则创建新的包
  22. if (r_pos == -1) {
  23. //得到下一个包的长度,长度不包括前两位,即包的长度=压缩位长度+数据位长度
  24. int pkgLen = in.getShort();
  25. //如果包长度小于0,那么此包错误,解码失败,返回。
  26. if (pkgLen < 0) {
  27. return MessageDecoderResult.NOT_OK;
  28. }
  29. in.get();
  30. r_curPkg = new byte[pkgLen-1]; //数组长度为数据长度
  31. r_pos = 0;
  32. }
  33. int need = r_curPkg.length - r_pos; //需要读取的数据长度
  34. int length = in.remaining();//缓冲区中可读的数据长度
  35. if (length >= need) {// 可以把当前包读完整
  36. in.get(r_curPkg, r_pos, need); // 复制缓冲区中的数据到r_curPkg中
  37. // 处理接收到一个完整的包数据后,把包添加到池中,判断是否需要需要解压
  38. byte[] data = r_curPkg;
  39. String str = new String(data);
  40. list.add(str);
  41. r_curPkg = null;
  42. r_pos = -1;
  43. } else {
  44. // 如果剩下的字节数,不够一个包则
  45. int remainBytes = in.remaining();
  46. in.get(r_curPkg, r_pos, remainBytes);
  47. r_pos += remainBytes;
  48. return MessageDecoderResult.NEED_DATA;
  49. }
  50. }
  51. for (String protocol : list) {
  52. out.write(protocol);
  53. }
  54. return MessageDecoderResult.OK;
  55. }
  56. @Override
  57. public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
  58. }
  59. }
编码类:(没有进行编码,只进行了数据发送)

   
  1. import org.apache.mina.core.buffer.IoBuffer;
  2. import org.apache.mina.core.session.IoSession;
  3. import org.apache.mina.filter.codec.ProtocolEncoderOutput;
  4. import org.apache.mina.filter.codec.demux.MessageEncoder;
  5. public class Encoder implements MessageEncoder<String>{
  6. public Encoder(){
  7. }
  8. @Override
  9. public void encode(IoSession session, String message, ProtocolEncoderOutput out)
  10. throws Exception {
  11. System.out.println("encode..................");
  12. String value = (String) message;
  13. IoBuffer buf = IoBuffer.allocate(value.getBytes().length);
  14. buf.setAutoExpand(true);
  15. if (value != null){
  16. buf.put(value.trim().getBytes());
  17. }
  18. buf.flip();
  19. out.write(buf);
  20. out.flush();
  21. }
  22. }

IoService层:


   
  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import org.apache.mina.core.service.IoAcceptor;
  4. import org.apache.mina.core.session.IdleStatus;
  5. import org.apache.mina.filter.codec.ProtocolCodecFilter;
  6. import org.apache.mina.filter.logging.LoggingFilter;
  7. import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
  8. public class MinaServer {
  9. private static final int PORT = 9123;
  10. public static void main(String [] args) throws IOException{
  11. IoAcceptor acceptor = new NioSocketAcceptor();
  12. acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
  13. acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
  14. acceptor.setHandler(new ServerHandler());
  15. acceptor.getSessionConfig().setReadBufferSize( 3 );
  16. acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
  17. acceptor.bind( new InetSocketAddress(PORT) );
  18. }
  19. }

到这里我们的服务端代码就写完了,

客户端实现


   
  1. <span style="font-size:12px">public class SocketClient {
  2. public static void main(String...args)throws Exception{
  3. SocketChannel socketChannel = SocketChannel.open();
  4. socketChannel.connect(new InetSocketAddress("localhost",9123));
  5. byte [] bytes = "aaaa".getBytes();
  6. //对数据包进行编码
  7. ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
  8. buffer.putShort((short)(bytes.length+1)); //包长度
  9. buffer.put((byte)1);//闲置位
  10. buffer.put(bytes);//数据
  11. buffer.flip();
  12. socketChannel.write(buffer);
  13. socketChannel.socket().shutdownOutput();
  14. String obj = receive(socketChannel);
  15. System.out.println(obj);
  16. }
  17. private static String receive(SocketChannel socketChannel)throws Exception{
  18. ByteBuffer buffer = ByteBuffer.allocate(1024);
  19. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  20. int size = 0;
  21. byte [] bytes = null;
  22. while((size = socketChannel.read(buffer))>=0){
  23. buffer.flip();
  24. bytes = new byte[size];
  25. buffer.get(bytes);
  26. baos.write(bytes);
  27. buffer.clear();
  28. }
  29. bytes = baos.toByteArray();
  30. baos.close();
  31. return new String(bytes);
  32. }
  33. }
  34. </span>

所有的代码都写完了,先启动服务端的MinaServer,然后再启动客户端,我们就会看到结果。






 

 

文章来源: chenyu.blog.csdn.net,作者:chen.yu,版权归原作者所有,如需转载,请联系作者。

原文链接:chenyu.blog.csdn.net/article/details/49328927

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。