源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理
源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理
在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好,同时进行网络地址的同步和端点同步。
我们看一下服务端对心跳请求的处理
服务端对心跳请求的处理
skywalking-register-receiver-plugin模块的RegisterModuleProvider的start()方法中会在GRPCServer中添加处理类ServiceInstancePingServiceHandler,
ServiceInstancePingServiceHandler的doPing()方法:
@Override
public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
int serviceInstanceId = request.getServiceInstanceId();
long heartBeatTime = request.getTime();
serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
responseObserver.onNext(Commands.getDefaultInstance());
} else {
logger.warn("Can't find service by service instance id from cache," +
" service instance id is: {}, will send a reset command to agent side", serviceInstanceId);
final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID());
final Command command = resetCommand.serialize().build();
final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
responseObserver.onNext(nextCommands);
}
responseObserver.onCompleted();
}
- 从请求中获取服务实例ID和心跳发送时间
- 调用ServiceInstanceInventoryRegister的heartbeat()更新服务实例的心跳时间,也就是对应ES中的service_instance_inventory索引的相应Document的heartbeat_time字段
- 根据服务实例ID从缓存中获取服务实例清单,从而获取服务的ID
- 调用ServiceInventoryRegister的heartbeat()更新服务的心跳时间,也就是对应ES中的service_inventory索引的相应Document的heartbeat_time字段
整体的逻辑还是比较简单的,服务端做的事情就是根据Agent请求的数据修改ES数据库中的服务的心跳时间的数据和服务实例心跳时间的数据
服务端对同步网络地址的映射关系的处理
对应的是RegisterServiceHandler的doNetworkAddressRegister()方法:
@Override
public void doNetworkAddressRegister(NetAddresses request, StreamObserver<NetAddressMapping> responseObserver) {
NetAddressMapping.Builder builder = NetAddressMapping.newBuilder();
request.getAddressesList().forEach(networkAddress -> {
int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null);
if (addressId != Const.NONE) {
builder.addAddressIds(KeyIntValuePair.newBuilder().setKey(networkAddress).setValue(addressId));
}
});
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
这个方法比较简单就是对请求的地址集合进行遍历,调用NetworkAddressInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是network_address_inventory索引,getOrCreate()方法会填充addressId字段,还会建立网络地址与服务的关联和网络地址与服务实例的关联。
服务端同步Endpoint的映射关系的处理
服务端对应的逻辑是RegisterServiceHandler的doEndpointRegister()方法
@Override public void doEndpointRegister(Endpoints request, StreamObserver<EndpointMapping> responseObserver) {
EndpointMapping.Builder builder = EndpointMapping.newBuilder();
request.getEndpointsList().forEach(endpoint -> {
int serviceId = endpoint.getServiceId();
String endpointName = endpoint.getEndpointName();
DetectPoint detectPoint = DetectPoint.fromNetworkProtocolDetectPoint(endpoint.getFrom());
if (DetectPoint.SERVER.equals(detectPoint)) {
int endpointId = inventoryService.getOrCreate(serviceId, endpointName, detectPoint);
if (endpointId != Const.NONE) {
builder.addElements(EndpointMappingElement.newBuilder()
.setServiceId(serviceId)
.setEndpointName(endpointName)
.setEndpointId(endpointId)
.setFrom(endpoint.getFrom()));
}
} else {
logger.warn("Unexpected endpoint register, endpoint isn't detected from server side. {}", request);
}
});
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
这个方法和同步网络地址的方法逻辑差不多,也是对请求的地址集合进行遍历,获取请求中的端点的服务ID和端点名,根据网络端口探测是客户端还是服务端,如果是服务端调用EndpointInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是endpoint_inventory索引
总结
这篇文章主要讲了Agent初始化的过程中调用服务注册服务实例注册后,服务进行心跳请求和网络地址同步和端点同步,服务端是如何处理心跳检测、同步网络地址和同步端点的。这三个方法的逻辑都差不多,都是通过InventoryStreamProcessor来操作ES数据库,ServiceInstancePingServiceHandler处理心跳请求更新服务和服务实例的心跳时间,RegisterServiceHandler在网络地址同步方法中通过NetworkAddressInventoryRegister填充 network_address_inventory的addressId字段并创建网络地址与服务的关联和网络地址与服务实例的关联,RegisterServiceHandler在端点同步中通过EndpointInventoryRegister构建endpoint_inventory索引的数据。
- 点赞
- 收藏
- 关注作者
评论(0)