【愚公系列】2022年06月 .NET架构班 079-分布式中间件 ScheduleMaster的集群原理
【摘要】 一、ScheduleMaster的集群原理当worker1宕机,任务会自动切换到worker2上面运行,当worker2宕机,任务会自动切换到worker1上面运行,主要依靠什么什么做到的?核心:健康检测,故障转移1、进入到Hos.ScheduleMaster.Web项目中,找到SystemSchedulerRegistry类2、然后SystemSchedulerRegistry找到Wor...
一、ScheduleMaster的集群原理
当worker1宕机,任务会自动切换到worker2上面运行,
当worker2宕机,任务会自动切换到worker1上面运行,主要依靠什么什么做到的?核心:健康检测,故障转移
1、进入到Hos.ScheduleMaster.Web项目中,找到SystemSchedulerRegistry类
2、然后SystemSchedulerRegistry找到WorkerCheckJob类
internal class WorkerCheckJob : IJob
{
/// <summary>
/// 执行计划
/// </summary>
public void Execute()
{
using (var scope = ConfigurationCache.RootServiceProvider.CreateScope())
{
Core.Interface.INodeService service = scope.ServiceProvider.GetService<Core.Interface.INodeService>();
AutowiredServiceProvider provider = new AutowiredServiceProvider();
provider.PropertyActivate(service, scope.ServiceProvider);
service.WorkerHealthCheck();
}
}
}
3、然后在WorkerCheckJob类中找到NodeService,进入到Hos.ScheduleMaster.Core项目
4、然后进入到NodeService类中,找到WorkerHealthCheck方法
/// <summary>
/// worker健康检查
/// </summary>
public void WorkerHealthCheck()
{
var workers = _repositoryFactory.ServerNodes.Where(x => x.NodeType == "worker" && x.Status != 0).ToList();
if (!workers.Any())
{
return;
}
//允许最大失败次数
int allowMaxFailed = ConfigurationCache.GetField<int>("System_WorkerUnHealthTimes");
if (allowMaxFailed <= 0) allowMaxFailed = 3;
//遍历处理
workers.ForEach(async (w) =>
{
using (var scope = new Core.ScopeDbContext())
{
var db = scope.GetDbContext();
_serverClient.Server = w;
//初始化计数器
ConfigurationCache.WorkerUnHealthCounter.TryAdd(w.NodeName, 0);
var success = await _serverClient.HealthCheck();
if (success)
{
w.LastUpdateTime = DateTime.Now;
db.ServerNodes.Update(w);
await db.SaveChangesAsync();
ConfigurationCache.WorkerUnHealthCounter[w.NodeName] = 0;
}
else
{
//获取已失败次数
int failedCount = ConfigurationCache.WorkerUnHealthCounter[w.NodeName];
System.Threading.Interlocked.Increment(ref failedCount);
if (failedCount >= allowMaxFailed)
{
w.Status = 0;//标记下线,实际上可能存在因为网络抖动等原因导致检查失败但worker进程还在运行的情况
db.ServerNodes.Update(w);
//释放该节点占据的锁
var locks = db.ScheduleLocks.Where(x => x.LockedNode == w.NodeName && x.Status == 1).ToList();
locks.ForEach(x =>
{
x.Status = 0;
x.LockedNode = null;
x.LockedTime = null;
});
db.ScheduleLocks.UpdateRange(locks);
await db.SaveChangesAsync();
//重置计数器
ConfigurationCache.WorkerUnHealthCounter[w.NodeName] = 0;
}
else
{
ConfigurationCache.WorkerUnHealthCounter[w.NodeName] = failedCount;
}
}
}
});
}
5、然后在WorkerHealthCheck方法中,找到ServerClient类
6、然后在ServerClient类中,找到HealthCheck方法
public async Task<bool> HealthCheck()
{
HttpClient client = CreateClient();
try
{
var response = await client.GetAsync("/health");
return await ClientResponse(response);
}
catch (Exception ex)
{
Log.LogHelper.Warn($"请求地址:{client.BaseAddress.ToString()}/health,响应消息:{(ex.InnerException ?? ex).Message}");
return false;
}
}
7、在Hos.ScheduleMaster.QuartzHost项目中,找到Startup类
然后在Startup类中,进入ConfigureServices方法中,找到AddHealthChecks方法
然后在Startup类中,进入Configure方法中,找到MapHealthChecks方法
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)