(精华)2020年10月1日 微服务 事件总线

举报
愚公搬代码 发表于 2021/10/19 00:25:01 2021/10/19
【摘要】 事件总线 什么是事物 例如:事物 所有看到的一切都是事物,不能看到的也是事物 例如:团队微服务,成员微服务,聚合微服务,网关api,认证中心等等包括类,对象 所有的事件都是事物变化的结果 大家接...

事件总线

什么是事物

例如:事物 所有看到的一切都是事物,不能看到的也是事物

例如:团队微服务,成员微服务,聚合微服务,网关api,认证中心等等包括类,对象

所有的事件都是事物变化的结果

大家接触事件最早就是在js 或者是c#高级特性。大家对于事件不默认,但是对于事件不是很好理解

什么是事件

事件就是指事物状态的变化,每一次事物变化的结果都称作为事件

什么是事件总线

就是用来管理所有的事件的一种机制就称作为事件总线

包括事件发布,事件存储,事件订阅,事件处理的统称

作用:

事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

为什么要使用事件总线

将微服务系统各组件之间进行解耦

使用业务的发展来说

事件总线框架

CAP

masstransit

CAP内部概念

事件 : 就是一些状态信息

发布者:发布事件的角色 cap

订阅者:订阅消费事件的角色 cap

消息传输器:传输事件

消息存储器:存储事件

CAP存储事件消息队列类型Transport

Azure

rabbitmq

kafaka

In Memory Queue

CAP存储事件持久化类型

SQL Server

MySQL

PostgreSQL

MongoDB

InMemoryStorage

CAP事件监控

Dashboard

微服务系统中如何使用CAP

条件

1、微服务系统

2、RabbitMQ

3、SqlServer

4、CAP

步骤

1、微服务系统准备

​ 微服务系统全部准备完毕

2、RabbitMQ准备

​ 2.1 环境准备

​ Erlang下载地址:https://www.erlang.org/downloads

​ RabbitMQ下载地址:https://www.rabbitmq.com/download.html

​ 2.2 RabbitMQ 启动

​ 1、在安装目录下添加可视化插件

rabbitmq-plugins enable rabbitmq_management

  
 
  • 1

​ 2、在安装目录下启动

	rabbitmq-server 

  
 
  • 1

​ 3、查看rabbitmq状态

    rabbitmqctl status

  
 
  • 1

​ 4、在浏览器输入http://127.0.0.1:15672

​ 访问rabbitmq后台系统

3、SqlServer准备

​ SqlServer启动,安装

4、CAP准备

​ 4.1 CAP环境

​ CAP官网地址:https://cap.dotnetcore.xyz/user-guide/zh/monitoring/dashboard/

​ 4.2 CPA配置

​ 1、在RuanMou.MicroService.Core项目中添加依赖

CAP Nuget DotNetCore.CAP
CAP传输器Nuget DotNetCore.CAP.RabbitMQ
CAP持久化DotNetCore.CAP.SqlServer

  
 
  • 1
  • 2
  • 3

​ 2、在RuanMou.MicroService.AggregateService服务中startup.cs中添加

 // 8、添加事件总线cap
            services.AddCap(x => {
                // 8.1 使用内存存储消息(消息发送失败处理)
                x.UseInMemoryStorage();

                // 8.2 使用RabbitMQ进行事件中心处理
                x.UseRabbitMQ(rb => {
                    rb.HostName = "localhost";
                    rb.UserName = "guest";
                    rb.Password = "guest";
                    rb.Port = 5672;
                    rb.VirtualHost = "/";
                });
            });

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

​ 2.1 在AggregateController.cs中注入ICapPublisher

    private readonly ICapPublisher capPublisher;
public TeamsController(ICapPublisher capPublisher)
    {
        this.capPublisher = capPublisher;
    }	

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

​ 3、在RuanMou.MicroService.VideoService服务startup.cs中添加

 // 8、添加事件总线cap
            services.AddCap(x => {
                // 8.1 使用RabbitMQ进行事件中心处理
                x.UseRabbitMQ(rb => {
                    rb.HostName = "localhost";
                    rb.UserName = "guest";
                    rb.Password = "guest";
                    rb.Port = 5672;
                    rb.VirtualHost = "/";
                });
            });		

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

​ 3.1 在VideoController.cs 中方法上添加特性[CapSubscribe]

       /// <summary>
        /// 视频添加(异步添加)
        /// </summary>
        /// <param name="Video"></param>
        /// <returns></returns>
        [NonAction]
        [CapSubscribe("tontcap")]
        public ActionResult<Video> PostVideo(Video Video)
        {
            videoService.Create(Video);
       return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
    }	

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

​ 4、效果展示​

RabbitMQ宕机情况

步骤

1、将RabbitMQ直接关闭

事件消息无法发送,存储到内存缓存中

2、当将RabbitMQ启动后,消息正常发送

​ 内部使用定时器轮询机制实现

AggregateService宕机情况

AggregateService 执行业务成功,发送消息前宕机

使用本地消息表解决(思想:持久化操作)

条件

1、本地消息表

步骤

1、在RuanMou.MicroService.AggregateService服务中

​ 1.1 创建Context文件,然后在Context文件夹内创建AggregateContext

/// <summary>
    /// Aggregate服务上下文
    /// </summary>
    public class AggregateContext : DbContext
    {
        public AggregateContext(DbContextOptions<AggregateContext> options) : base(options)
        {
        }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

​ 1.2 在appsettings.json中添加

{
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=.;Initial Catalog=aggregateservice;Persist Security Info=True;User ID=sa;Password=tony"
  }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

​ 1.3 在startup.cs中添加消息持久化

         // 9、注册上下文到IOC容器
            services.AddDbContext<AggregateContext>(options =>
            {
                options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            });
        // 8、添加事件总线cap
        services.AddCap(x => {
            // 8.1 使用EntityFramework进行存储操作
            x.UseEntityFramework<AggregateContext>();
            // 8.2 使用sqlserver进行事务处理
            x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
        // 8.2 使用RabbitMQ进行事件中心处理
        x.UseRabbitMQ(rb => {
            rb.HostName = "localhost";
            rb.UserName = "guest";
            rb.Password = "guest";
            rb.Port = 5672;
            rb.VirtualHost = "/";
        });
    });

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

​ 1.4测试演示效果

​ 数据库中多了两张表

在这里插入图片描述

​ 当业务执行成功,发送消息时,聚合微服务宕机,消息被持久化到数据库

​ 当重启聚合微服务时,消息发送成功,被成功消费

​ 1.5 原理

​ 1、定时器 消息重试

​ 2、幂等性 一个函数每次都是相同的结果,状态只有一个

VideoService宕机情况

VideoService接受消息失败

当VideoService直接宕机的时候接受消息失败,

然后重启VideoService消息消费成功

VideoService接受消息成功执行失败

条件

1、本地消息表

步骤

1、在RuanMou.MicroService.Core项目中

​ 1.1 安装SqlServer

Nuget DotNetCore.CAP.SqlServer

  
 
  • 1

2、在RuanMou.MicroService.VideoService项目中

​ 2.1 在startup.cs中添加消息持久化

    // 8、添加事件总线cap
    services.AddCap(x => {
        // 8.1 使用EntityFramework进行存储操作
        x.UseEntityFramework<AggregateContext>();
        // 8.2 使用sqlserver进行事务处理
        x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
    // 8.2 使用RabbitMQ进行事件中心处理
    x.UseRabbitMQ(rb => {
        rb.HostName = "localhost";
        rb.UserName = "guest";
        rb.Password = "guest";
        rb.Port = 5672;
        rb.VirtualHost = "/";
    });
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

​ 2.2 效果展示

​ 数据库多了两张表

在这里插入图片描述

​ 2.3 原理

​ 1、定时器 消息重试

​ 2、幂等性 一个函数每次都是相同的结果,状态只有一个

消息重试完还是消费失败情况

使用人工干预实现

条件

1、Dashboard – 后台管理页面

步骤

1、在RuanMou.MicroService.Core项目中

​ 1.1 安装Dashboard

Nuget DotNetCore.CAP.Dashboard

  
 
  • 1

2、在RuanMou.MicroService.VideoService项目中

​ 2.1 在startup.cs中添加Dashboard

// 8、添加事件总线cap
services.AddCap(x => {
    // 8.1 使用EntityFramework进行存储操作
    x.UseEntityFramework<AggregateContext>();
    // 8.2 使用sqlserver进行事务处理
    x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
// 8.3 使用RabbitMQ进行事件中心处理
x.UseRabbitMQ(rb => {
    rb.HostName = "localhost";
    rb.UserName = "guest";
    rb.Password = "guest";
    rb.Port = 5672;
    rb.VirtualHost = "/";
});

// 8.4添加cap后台监控页面
    x.UseDashboard();
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

​ 2.2 运行打开cap后台监控页面

​ http://localhost:5007/cap

​ 对于发送失败的消息进行重复发送

​ 对于消费失败的消息进行重复消费

文章来源: codeboy.blog.csdn.net,作者:愚公搬代码,版权归原作者所有,如需转载,请联系作者。

原文链接:codeboy.blog.csdn.net/article/details/108437402

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。