c# 服务中间件开发

举报
王阿毛 发表于 2020/04/16 18:27:02 2020/04/16
【摘要】 最近由于产品需求,要求自己实现一个中间件,用来转发客户端和服务端中间的数据,并对数据做一些逻辑处理。考虑后期可能会有功能扩展及协议的扩展,就搭了一个框架。该项目共分为两部分,主程序负责,对客户端和服务端数据进行接收并根据配置调用不同协议的数据读取。协议插件只负责对转发的数据进行解析并返回给主程序代码层级如下Simulator.Business层:负责事务性逻辑的实现,包括了后期的功能扩展,转...

最近由于产品需求,要求自己实现一个中间件,用来转发客户端和服务端中间的数据,并对数据做一些逻辑处理。考虑后期可能会有功能扩展及协议的扩展,就搭了一个框架。

该项目共分为两部分,主程序负责,对客户端和服务端数据进行接收并根据配置调用不同协议的数据读取。协议插件只负责对转发的数据进行解析并返回给主程序

代码层级如下

Simulator.Business层:

负责事务性逻辑的实现,包括了后期的功能扩展,转发的数据修改保存等

Simulator.Common层:

是对所有公共方法的抽取,包括了常用的文件读取写入

Simulator.IDAL层

接口层,包括定义了协议插件、功能插件需要实现的方法及通用方法的抽取和定义

Simulator.Model层

定义了各模块中的实体类

SimulatorPreconsole层

表现层,并将与客户端服务端数据通讯的方法实现集成到了该层

通讯过程中使用的框架有:Autofac,Dotnetty

部分功能性代码如下:

加载协议插件,并将方法注册到builder上

ProtocolDic.TryGetValue(this.comboBox_Protocol.SelectedItem.ToString(), out  Path);
                var ass = System.Reflection.Assembly.LoadFile(Path);
                if (ass != null)
                {
                    ContainerBuilder builder = new ContainerBuilder();
                    builder.RegisterAssemblyTypes(ass).Where(t =>  t.Name.Equals("Protocol",  StringComparison.Ordinal)).AsImplementedInterfaces();
                    builder.RegisterType<BytesbufferBusiness>().As<IBytebufferBusiness>();     
                    Container = builder.Build();
                    return true;
                }
                else
                {
                    return true;
                }

由于Dotnetty要求数据头部占用数据长度的位数只允许有1,2,3,4,8位(The length of the prepended length field. Only 1, 2, 3, 4, and 8 are allowed.),我们接受的报文恰好占6位,导致只能自己处理这部分所以定义了线程变量来存储分次接收报文时的缓存数据。

       private ThreadLocal<byte[]> clientBytes = new ThreadLocal<byte[]>(() =>  Array.Empty<byte>(), true);

       private ThreadLocal<int> clientBytesLength = new ThreadLocal<int>(() => 0,  true);

       public override void ChannelRead(IChannelHandlerContext context, object  message)

       {

         var buffer = message as IByteBuffer;

            serverctx = context;

            if (buffer != null)

            {

                byte[] tempbytes = new byte[buffer.ReadableBytes];

                buffer.GetBytes(buffer.ReaderIndex, tempbytes);

                ExecuteBusinessesAsync(context, message, tempbytes);

            }

        }

        private async Task ExecuteBusinessesAsync(IChannelHandlerContext context,  object message, byte[] tempbytes)

        {

            clientBytes.Value = clientBytes.Value.Concat(tempbytes).ToArray();

           if (clientBytesLength.Value <= clientBytes.Value.Length)

            {

                switch (typemodel)

                {

                    case 0:

                        await Transfer(context,  clientBytes.Value).ConfigureAwait(true);   //启动客户端

                        break;

                    case 1:

                        ProxyModel(context, tempbytes);

                        break;

                    default:

                        break;

                }

                clientBytes.Value = Array.Empty<byte>();

                clientBytesLength.Value = 0;

                ReferenceCountUtil.Release(message);

            }

        }

        private async Task Transfer(IChannelHandlerContext context, byte[]  byteBuffer)

        {

             await DotNettyBusiness.StartClient(MainWindow._IP, MainWindow._Port,  byteBuffer).ConfigureAwait(true);

        }


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。