Nacos心跳机制解读(含简单源码分析)
心跳机制概述
心跳机制是一种用于监测和管理微服务可用性的机制,它用来维护注册中心和服务提供者之间的连接状态,并及时更新服务实例的状态信息。
心跳机制包括两个主要组件:心跳发送方(客户端)和心跳接收方(服务端)。
每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内没有收到客户端信息则视客户端断开。发包方可以是客户也可以是服务端
-
心跳发送方(Heartbeat Sender):每个微服务都会定期发送称为心跳消息的请求到一个中央位置(例如注册中心或负载均衡器)。这个心跳消息包含有关该微服务的健康信息,如服务是否正常运行、负载情况、资源消耗等。心跳消息的频率可以根据需求进行配置,通常是以固定的时间间隔发送。
-
心跳接收方(Heartbeat Receiver):中央位置上的组件(如注册中心或负载均衡器)负责接收并处理微服务发送的心跳消息。它会记录每个微服务的心跳,并根据心跳消息的到达情况和内容来判断微服务的可用性。如果心跳消息超过一定时间没有到达,或者心跳消息中报告了错误状态,中央位置可以采取相应的措施,如将该微服务标记为不可用、重新分配负载或发送警报通知等。
Nacos中 的 2 种健康检查机制
客户端主动上报机制:
-
客户端通过心跳上报方式告知服务端(nacos注册中心)健康状态;
-
默认心跳间隔5秒;
-
nacos会在超过15秒未收到心跳后将实例设置为不健康状态;
-
超过30秒将实例删除
服务器端反向探测机制:
-
nacos主动探知客户端健康状态,默认间隔为20秒;
-
健康检查失败后实例会被标记为不健康,不会被立即删除。
Nacos 中的健康检查机制不能主动设置,但健康检查机制是和 Nacos 的服务实例类型强相关的。 也就是说 Nacos 中的两种服务实例分别对应了两种健康检查机制:
-
临时实例(也可以叫做非持久化实例):对应的是客户端主动上报机制。
-
永久实例(也可以叫做持久化实例):服务端反向探测机制。
临时实例 配置
spring:
application:
name: order-service
cloud:
nacos:
discovery:
ephemeral: false # 设置实例为永久实例。true:临时; false:永久
server-addr: 192.168.137.2:8845
如果是临时实例,则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。在被摘除后如果又开始上报心跳,则会重新将这个实例注册。持久化实例则会持久化被 Nacos 服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康。
客户端发送心跳简单源码解析
心跳请求接口
Nacos提供的心跳的API接口为:
接口描述:发送某个实例的心跳
请求类型:PUT
请求路径:/nacos/v1/ns/instance/beat
请求参数:
名称 | 类型 | 是否必选 | 描述 |
---|---|---|---|
serviceName | 字符串 | 是 | 服务名 |
groupName | 字符串 | 否 | 分组名 |
ephemeral | boolean | 否 | 是否临时实例 |
beat | JSON格式字符串 | 是 | 实例心跳内容 |
错误编码:
错误代码 | 描述 | 语义 |
---|---|---|
400 | Bad Request | 客户端请求中的语法错误 |
403 | Forbidden | 没有权限 |
404 | Not Found | 无法找到资源 |
500 | Internal Server Error | 服务器内部错误 |
200 | OK | 正常 |
NacosNamingService
NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否是临时实例。
if (instance.isEphemeral()) {
// 如果是临时实例,则构建心跳信息BeatInfo
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳任务
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
BeatInfo
BeanInfo就包含心跳需要的各种信息
BeatReactor
BeatReactor
这个类则维护了一个线程池:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod()
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
BeatTask
跳的任务封装在BeatTask
这个类中,是一个Runnable,其run方法如下 :
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
// 获取心跳周期
long nextTime = beatInfo.getPeriod();
try {
// 发送心跳
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
// 判断心跳结果
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
// 如果失败,则需要 重新注册实例
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
发送心跳
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
// 组织请求参数
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
// 发送请求,这个地址就是:/v1/ns/instance/beat
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
基本流程总结
-
总的来说就是在注册这个实例的时候,客户端就会创建一个心跳的实例,一起发送到这个服务端,
-
这个时候服务端会开启一个线程去执行这个客户端给服务端发送心跳的的这个延迟队列线程。
-
客户端注册到这个服务端之后,会开启一个延迟的线程池任务,在注册成功5s之后再发送这个心跳给服务端。
-
- 点赞
- 收藏
- 关注作者
评论(0)