源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理
源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理
我们了解到Skywalking的agent在进行启动初始化的时候会对服务进行注册,对应的逻辑是ServiceAndEndpointRegisterClient的run()方法
服务注册
ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceRegister()方法进行注册
RegisterServiceHandler的doServiceRegister()方法:
@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver) {
ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
request.getServicesList().forEach(service -> {
String serviceName = service.getServiceName();
if (logger.isDebugEnabled()) {
logger.debug("Register service, service code: {}", serviceName);
}
int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);
if (serviceId != Const.NONE) {
KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
builder.addServices(value);
}
});
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
- 从请求中获取服务名
- 调用serviceInventoryRegister的getOrCreate()方法根据服务名获取服务的id
- 将服务id返回给Agent
获取服务ID
serviceInventoryRegister的getOrCreate()
@Override public int getOrCreate(String serviceName, JsonObject properties) {
int serviceId = getServiceInventoryCache().getServiceId(serviceName);
if (serviceId == Const.NONE) {
ServiceInventory serviceInventory = new ServiceInventory();
serviceInventory.setName(serviceName);
serviceInventory.setAddressId(Const.NONE);
serviceInventory.setIsAddress(BooleanUtils.FALSE);
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingServiceId(Const.NONE);
serviceInventory.setLastUpdateTime(now);
serviceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
return serviceId;
}
- 获取ServiceInventoryCache缓存实例,调用ServiceInventoryCache对象的getServiceId()方法获取serviceId,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInventoryCacheEsDAO的get()方法,也就是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
- 如果serviceId为空,创建ServiceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成serviceId中,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建serviceId。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
从ServiceAndEndpointRegisterClient的run()方法的逻辑我们知道如果serviceId返回0的话,会创建线程不断重试,直到服务Id都不会空
服务实例注册
ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceInstanceRegister()方法进行服务实例注册
RegisterServiceHandler的doServiceInstanceRegister()方法:
- 遍历请求的服务实例的集合,根据请求中的服务ID从缓存中获取ServiceInventory对象,这个对象是服务的相关信息
- 整合request中的host_name、os_name、ip地址、语言、进程ID等参数,放入JsonObject对象中
- 根据服务名、进程ID和主机名构建服务实例名称
- 根据请求参数调用ServiceInstanceInventoryRegister的getOrCreate()方法获取服务实例的id
- 将服务实例Id返回给Agent
获取服务实例的ID
ServiceInstanceInventoryRegister的getOrCreate()方法:
@Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime,
JsonObject properties) {
if (logger.isDebugEnabled()) {
logger.debug("Get or create service instance by service instance name, service id: {}, service instance name: {},uuid: {}, registerTime: {}", serviceId, serviceInstanceName, uuid, registerTime);
}
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, uuid);
if (serviceInstanceId == Const.NONE) {
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
serviceInstanceInventory.setServiceId(serviceId);
serviceInstanceInventory.setName(serviceInstanceName);
serviceInstanceInventory.setInstanceUUID(uuid);
serviceInstanceInventory.setIsAddress(BooleanUtils.FALSE);
serviceInstanceInventory.setAddressId(Const.NONE);
serviceInstanceInventory.setMappingServiceInstanceId(Const.NONE);
serviceInstanceInventory.setRegisterTime(registerTime);
serviceInstanceInventory.setHeartbeatTime(registerTime);
serviceInstanceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
}
return serviceInstanceId;
}
- 获取ServiceInstanceInventoryCache缓存实例,调用ServiceInstanceInventoryCache对象的getServiceInstanceId()方法获取服务实例ID,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInstanceInventoryCacheDAO的get()方法,通过是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
- 如果serviceId为空,创建ServiceInstanceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成服务实例ID,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建服务实例ID。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
同样的,我们看ServiceAndEndpointRegisterClient的run()方法中的逻辑,如果服务实例ID为空的话会一直进行重试,直到服务实例ID不为空
总结
这篇文章主要介绍了服务端对服务注册和服务实例注册的处理逻辑,这两个方法的整体逻辑是非相似,获取服务Id或者服务实例ID都是先从缓存中获取,缓存中没有就从es数据库中查询,查询到放入缓存,如果es数据库中也没有就会创建异步操作的InventoryStreamProcessor创建服务ID或服务实例ID
- 点赞
- 收藏
- 关注作者
评论(0)