【愚公系列】2022年06月 .NET架构班 076-分布式中间件 ScheduleMaster的执行原理

举报
愚公搬代码 发表于 2022/06/30 20:01:38 2022/06/30
【摘要】 一、ScheduleMaster的执行原理 1.全局架构设计任务全局执行流程:客户端=>master=>work=>调用接口1、master节点主要做了四件事情选择work节点指定work执行任务对work节点进行健康检查对任务进行故障转移2、work节点主要做了四件事情取出任务配置信息使用Quartz根据配置运行任务使用反射调用程序集使用httpclient调用http 接口 2.数据库...

一、ScheduleMaster的执行原理

1.全局架构设计

在这里插入图片描述

任务全局执行流程:客户端=>master=>work=>调用接口
1、master节点主要做了四件事情

  • 选择work节点
  • 指定work执行任务
  • 对work节点进行健康检查
  • 对任务进行故障转移

2、work节点主要做了四件事情

  • 取出任务配置信息
  • 使用Quartz根据配置运行任务
  • 使用反射调用程序集
  • 使用httpclient调用http 接口

2.数据库设计

在这里插入图片描述
如图所示:

表结构设计为3大块组成

  1. 任务表 :任务表以schedules表为代表
  2. 节点表:节点表以servernodes表为代表
  3. 系统表:系统表以系systemusers为代表

这三个表为主表,这三个表在启动Hos.ScheduleMaster.Web项目的时候,会启动进行创建。记录了任务信息,节点信息,用户信息。

3.添加任务原理

1、进入ScheduleController控制器中,找到Create方法
在这里插入图片描述

/// <summary>
/// 创建任务
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult> Create(ScheduleInfo task)
{
    if (!ModelState.IsValid)
    {
        return this.JsonNet(false, "数据验证失败!");
    }
    var admin = CurrentAdmin;
    ScheduleEntity main = new ScheduleEntity
    {
        MetaType = task.MetaType,
        CronExpression = task.CronExpression,
        EndDate = task.EndDate,
        Remark = task.Remark,
        StartDate = task.StartDate,
        Title = task.Title,
        Status = (int)ScheduleStatus.Stop,
        CustomParamsJson = task.CustomParamsJson,
        RunLoop = task.RunLoop,
        TotalRunCount = 0,
        CreateUserName = admin.UserName
    };
    if (task.MetaType == (int)ScheduleMetaType.Assembly)
    {
        main.AssemblyName = task.AssemblyName;
        main.ClassName = task.ClassName;
    }
    ScheduleHttpOptionEntity httpOption = null;
    if (task.MetaType == (int)ScheduleMetaType.Http)
    {
        httpOption = new ScheduleHttpOptionEntity
        {
            RequestUrl = task.HttpRequestUrl,
            Method = task.HttpMethod,
            ContentType = task.HttpContentType,
            Headers = task.HttpHeaders,
            Body = task.HttpBody
        };
    }
    var result = _scheduleService.Add(main, httpOption, task.Keepers, task.Nexts, task.Executors);
    if (result.Status == ResultStatus.Success)
    {
        if (task.RunNow)
        {
            var start = await _scheduleService.Start(main);
            return this.JsonNet(true, "任务创建成功!启动状态为:" + (start.Status == ResultStatus.Success ? "成功" : "失败"), Url.Action("Index"));
        }
        return this.JsonNet(true, "任务创建成功!", Url.Action("Index"));
    }
    return this.JsonNet(false, "任务创建失败!");
}

2、进入ScheduleService类中,找到Add方法
在这里插入图片描述

/// <summary>
/// 添加一个任务
/// </summary>
/// <param name="model"></param>
/// <param name="httpOption"></param>
/// <param name="keepers"></param>
/// <param name="nexts"></param>
/// <param name="executors"></param>
/// <returns></returns>
public ServiceResponseMessage Add(ScheduleEntity model, ScheduleHttpOptionEntity httpOption, List<int> keepers, List<Guid> nexts, List<string> executors = null)
{
    if (executors == null || !executors.Any())
    {
        //没有指定worker就根据权重选择2个
        executors = _nodeService.GetAvaliableWorkerByPriority(null, 2).Select(x => x.NodeName).ToList();
    }
    if (!executors.Any())
    {
        return ServiceResult(ResultStatus.Failed, "没有可用节点!");
    }
    model.CreateTime = DateTime.Now;
    var user = _repositoryFactory.SystemUsers.FirstOrDefault(x => x.UserName == model.CreateUserName);
    if (user != null)
    {
        model.CreateUserId = user.Id;
    }
    //保存主信息
    _repositoryFactory.Schedules.Add(model);
    //创建并保存任务锁
    _repositoryFactory.ScheduleLocks.Add(new ScheduleLockEntity { ScheduleId = model.Id, Status = 0 });
    //保存http数据
    if (httpOption != null)
    {
        httpOption.ScheduleId = model.Id;
        _repositoryFactory.ScheduleHttpOptions.Add(httpOption);
    }
    //保存运行节点
    _repositoryFactory.ScheduleExecutors.AddRange(executors.Select(x => new ScheduleExecutorEntity
    {
        ScheduleId = model.Id,
        WorkerName = x
    }));
    //保存监护人
    if (keepers != null && keepers.Count > 0)
    {
        _repositoryFactory.ScheduleKeepers.AddRange(keepers.Select(x => new ScheduleKeeperEntity
        {
            ScheduleId = model.Id,
            UserId = x
        }));
    }
    //保存子任务
    if (nexts != null && nexts.Count > 0)
    {
        _repositoryFactory.ScheduleReferences.AddRange(nexts.Select(x => new ScheduleReferenceEntity
        {
            ScheduleId = model.Id,
            ChildId = x
        }));
    }
    //事务提交
    if (_unitOfWork.Commit() > 0)
    {
        return ServiceResult(ResultStatus.Success, "任务创建成功!", model.Id);
    }
    return ServiceResult(ResultStatus.Failed, "数据保存失败!");
}

3、进入RepositoryFactory类中,找到
在这里插入图片描述

在这里插入图片描述

4.任务启动原理

1、进入ScheduleController控制器中,找到_scheduleService.Add()方法
在这里插入图片描述

2、然后进入到IScheduleService中,找到Start方法

/// <summary>
/// 启动一个任务
/// </summary>
/// <param name="model"></param>
/// <returns></returns>
public async Task<ServiceResponseMessage> Start(ScheduleEntity model)
{
    if (model == null) return ServiceResult(ResultStatus.Failed, "任务信息不能为空!");
    if (model.Status != (int)ScheduleStatus.Stop)
    {
        return ServiceResult(ResultStatus.Failed, "任务在停止状态下才能启动!");
    }
    if (model.EndDate.HasValue && model.EndDate < DateTime.Now)
    {
        return ServiceResult(ResultStatus.Failed, "任务结束时间不能小于当前时间!");
    }
    return await InnerStart(model.Id);
}

3、然后在Start方法中,找到InnerStart方法

private async Task<ServiceResponseMessage> InnerStart(Guid sid)
{
    //启动任务
    bool success = await _workerDispatcher.ScheduleStart(sid);
    if (success)
    {
        //启动成功后更新任务状态为运行中
        _repositoryFactory.Schedules.UpdateBy(m => m.Id == sid, m => new ScheduleEntity
        {
            Status = (int)ScheduleStatus.Running
        });
        if (await _unitOfWork.CommitAsync() > 0)
        {
            return ServiceResult(ResultStatus.Success, "任务启动成功!");
        }
        return ServiceResult(ResultStatus.Failed, "更新任务状态失败!");
    }
    else
    {
        await _workerDispatcher.ScheduleStop(sid);
        _repositoryFactory.Schedules.UpdateBy(m => m.Id == sid, m => new ScheduleEntity
        {
            Status = (int)ScheduleStatus.Stop,
            NextRunTime = null
        });
        await _unitOfWork.CommitAsync();
        return ServiceResult(ResultStatus.Failed, "任务启动失败!");
    }
}

4、然后在InnerStart方法中,找到WorkerDispatcher类
在这里插入图片描述
​ 5、然后在WorkerDispatcher类中,找到ScheduleStart方法

public async Task<bool> ScheduleStart(Guid sid)
{
    return await DispatcherHandler(sid, async (ServerNodeEntity node) =>
     {
         _scheduleClient.Server = node;
         return await _scheduleClient.Start(sid);
     });
}

6、然后在ScheduleStart方法中,找到DispatcherHandler

private async Task<bool> DispatcherHandler(Guid sid, RequestDelegate func)
{
    var nodeList = _nodeService.GetAvaliableWorkerForSchedule(sid);
    if (nodeList.Any())
    {
        foreach (var item in nodeList)
        {
            if (!await func(item))
            {
                return false;
            }
        }
        return true;
    }
    throw new InvalidOperationException("running worker not found.");
}

7、然后在ScheduleStart方法中,找到ScheduleServiceClient类
在这里插入图片描述
8、然后在ScheduleServiceClient类中,找到Start方法

public async Task<bool> Start(Guid sid)
        {
            return await PostRequest("/api/quartz/start", sid);
        }

9、进入到Hos.ScheduleMaster.QuartzHost项目中,找到QuartzController类
在这里插入图片描述
10、进入到QuartzController类中,找到Start方法

[HttpPost]
public async Task<IActionResult> Start([FromForm]Guid sid)
{
    bool success = await QuartzManager.StartWithRetry(sid);
    if (success) return Ok();
    return BadRequest();
}

11、进入到Start方法中,找到QuartzManager类
在这里插入图片描述
12、进入到QuartzManager类,找到StartWithRetry方法

/// <summary>
        /// 启动一个任务,带重试机制
        /// </summary>
        /// <param name="task"></param>
        /// <returns></returns>
        public static async Task<bool> StartWithRetry(Guid sid)
        {
            var jk = new JobKey(sid.ToString().ToLower());
            if (await _scheduler.CheckExists(jk))
            {
                return true;
            }
            ScheduleContext context = GetScheduleContext(sid);
            IHosSchedule schedule = await HosScheduleFactory.GetHosSchedule(context);
            try
            {
                for (int i = 0; i < 3; i++)
                {
                    try
                    {
                        await Start(schedule);
                        return true;
                    }
                    catch (SchedulerException sexp)
                    {
                        LogHelper.Error($"任务启动失败!开始第{i + 1}次重试...", sexp, context.Schedule.Id);
                    }
                }
                //最后一次尝试
                await Start(schedule);
                return true;
            }
            catch (SchedulerException sexp)
            {
                LogHelper.Error($"任务所有重试都失败了,已放弃启动!", sexp, context.Schedule.Id);
                return false;
            }
            catch (Exception exp)
            {
                LogHelper.Error($"任务启动失败!", exp, context.Schedule.Id);
                return false;
            }
        }

13、进入到StartWithRetry方法中,找到HosScheduleFactory类
在这里插入图片描述
14、进入到HosScheduleFactory类中,找到GetHosSchedule方法

public static async Task<IHosSchedule> GetHosSchedule(ScheduleContext context)
        {
            IHosSchedule result;
            switch ((ScheduleMetaType)context.Schedule.MetaType)
            {
                case ScheduleMetaType.Assembly:
                    {
                        result = new AssemblySchedule();
                        await LoadPluginFile(context.Schedule);
                        break;
                    }
                case ScheduleMetaType.Http:
                    {
                        result = new HttpSchedule();
                        break;
                    }
                default: throw new InvalidOperationException("unknown schedule type.");
            }
            result.Main = context.Schedule;
            result.CustomParams = ConvertParamsJson(context.Schedule.CustomParamsJson);
            result.Keepers = context.Keepers;
            result.Children = context.Children;
            result.CancellationTokenSource = new System.Threading.CancellationTokenSource();
            result.CreateRunnableInstance(context);
            result.RunnableInstance.TaskId = context.Schedule.Id;
            result.RunnableInstance.CancellationToken = result.CancellationTokenSource.Token;
            result.RunnableInstance.Initialize();
            return result;
        }	

15、进入到QuartzManager类,找到Start方法

private static async Task Start(IHosSchedule schedule)
        {
            JobDataMap map = new JobDataMap
            {
                new KeyValuePair<string, object> ("instance",schedule),
            };
            string jobKey = schedule.Main.Id.ToString();
            try
            {
                IJobDetail job = JobBuilder.Create().OfType(schedule.GetQuartzJobType()).WithIdentity(jobKey).UsingJobData(map).Build();
			//添加监听器
            var listener = new JobRunListener(jobKey);
            listener.OnSuccess += StartedEvent;
            _scheduler.ListenerManager.AddJobListener(listener, KeyMatcher<JobKey>.KeyEquals(new JobKey(jobKey)));

            ITrigger trigger = GetTrigger(schedule.Main);
            await _scheduler.ScheduleJob(job, trigger, schedule.CancellationTokenSource.Token);

            using (var scope = new Core.ScopeDbContext())
            {
                var db = scope.GetDbContext();
                var task = db.Schedules.FirstOrDefault(x => x.Id == schedule.Main.Id);
                if (task != null)
                {
                    task.NextRunTime = TimeZoneInfo.ConvertTimeFromUtc(trigger.GetNextFireTimeUtc().Value.UtcDateTime, TimeZoneInfo.Local);
                    await db.SaveChangesAsync();
                }
            }
        }
        catch (Exception ex)
        {
            throw new SchedulerException(ex);
        }
        LogHelper.Info($"任务[{schedule.Main.Title}]启动成功!", schedule.Main.Id);

        _ = Task.Run(async () =>
          {
              while (true)
              {
                  if (schedule.RunnableInstance == null) break;
                  var log = schedule.RunnableInstance.ReadLog();
                  if (log != null)
                  {
                      LogManager.Queue.Write(new SystemLogEntity
                      {
                          Category = log.Category,
                          Message = log.Message,
                          ScheduleId = log.ScheduleId,
                          Node = log.Node,
                          StackTrace = log.StackTrace,
                          TraceId = log.TraceId,
                          CreateTime = log.CreateTime
                      });
                  }
                  else
                  {
                      await Task.Delay(3000);
                  }
              }
          });
    }

16、Start方法为最核心方法。使用Quartz框架进行任务调度
在这里插入图片描述

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。