源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理

举报
周杰伦本人 发表于 2022/10/30 18:18:42 2022/10/30
【摘要】 源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理 服务端对心跳请求的处理 服务端对同步网络地址的映射关系的处理 服务端同步Endpoint的映射关系的处理 总结 源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstan...

源码角度了解Skywalking之服务端OAP对心跳请求、同步网络地址、同步端点的处理

在服务已经完成了注册和服务实例注册之后,Agent会定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好,同时进行网络地址的同步和端点同步。

我们看一下服务端对心跳请求的处理

服务端对心跳请求的处理

skywalking-register-receiver-plugin模块的RegisterModuleProvider的start()方法中会在GRPCServer中添加处理类ServiceInstancePingServiceHandler,

ServiceInstancePingServiceHandler的doPing()方法:

@Override 
public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();
        long heartBeatTime = request.getTime();
        serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);

        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
            responseObserver.onNext(Commands.getDefaultInstance());
        } else {
            logger.warn("Can't find service by service instance id from cache," +
                " service instance id is: {}, will send a reset command to agent side", serviceInstanceId);

            final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID());
            final Command command = resetCommand.serialize().build();
            final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
            responseObserver.onNext(nextCommands);
        }

        responseObserver.onCompleted();
    }
  1. 从请求中获取服务实例ID和心跳发送时间
  2. 调用ServiceInstanceInventoryRegister的heartbeat()更新服务实例的心跳时间,也就是对应ES中的service_instance_inventory索引的相应Document的heartbeat_time字段
  3. 根据服务实例ID从缓存中获取服务实例清单,从而获取服务的ID
  4. 调用ServiceInventoryRegister的heartbeat()更新服务的心跳时间,也就是对应ES中的service_inventory索引的相应Document的heartbeat_time字段

整体的逻辑还是比较简单的,服务端做的事情就是根据Agent请求的数据修改ES数据库中的服务的心跳时间的数据和服务实例心跳时间的数据

服务端对同步网络地址的映射关系的处理

对应的是RegisterServiceHandler的doNetworkAddressRegister()方法:

@Override
    public void doNetworkAddressRegister(NetAddresses request, StreamObserver<NetAddressMapping> responseObserver) {
        NetAddressMapping.Builder builder = NetAddressMapping.newBuilder();

        request.getAddressesList().forEach(networkAddress -> {
            int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null);

            if (addressId != Const.NONE) {
                builder.addAddressIds(KeyIntValuePair.newBuilder().setKey(networkAddress).setValue(addressId));
            }
        });

        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }

这个方法比较简单就是对请求的地址集合进行遍历,调用NetworkAddressInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是network_address_inventory索引,getOrCreate()方法会填充addressId字段,还会建立网络地址与服务的关联和网络地址与服务实例的关联。

服务端同步Endpoint的映射关系的处理

服务端对应的逻辑是RegisterServiceHandler的doEndpointRegister()方法

@Override public void doEndpointRegister(Endpoints request, StreamObserver<EndpointMapping> responseObserver) {
        EndpointMapping.Builder builder = EndpointMapping.newBuilder();

        request.getEndpointsList().forEach(endpoint -> {
            int serviceId = endpoint.getServiceId();
            String endpointName = endpoint.getEndpointName();

            DetectPoint detectPoint = DetectPoint.fromNetworkProtocolDetectPoint(endpoint.getFrom());
            if (DetectPoint.SERVER.equals(detectPoint)) {
                int endpointId = inventoryService.getOrCreate(serviceId, endpointName, detectPoint);

                if (endpointId != Const.NONE) {
                    builder.addElements(EndpointMappingElement.newBuilder()
                        .setServiceId(serviceId)
                        .setEndpointName(endpointName)
                        .setEndpointId(endpointId)
                        .setFrom(endpoint.getFrom()));
                }
            } else {
                logger.warn("Unexpected endpoint register, endpoint isn't detected from server side. {}", request);
            }
        });

        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }

这个方法和同步网络地址的方法逻辑差不多,也是对请求的地址集合进行遍历,获取请求中的端点的服务ID和端点名,根据网络端口探测是客户端还是服务端,如果是服务端调用EndpointInventoryRegister的getOrCreate()方法将数据放入es数据库中,对应的是endpoint_inventory索引

总结

这篇文章主要讲了Agent初始化的过程中调用服务注册服务实例注册后,服务进行心跳请求和网络地址同步和端点同步,服务端是如何处理心跳检测、同步网络地址和同步端点的。这三个方法的逻辑都差不多,都是通过InventoryStreamProcessor来操作ES数据库,ServiceInstancePingServiceHandler处理心跳请求更新服务和服务实例的心跳时间,RegisterServiceHandler在网络地址同步方法中通过NetworkAddressInventoryRegister填充 network_address_inventory的addressId字段并创建网络地址与服务的关联和网络地址与服务实例的关联,RegisterServiceHandler在端点同步中通过EndpointInventoryRegister构建endpoint_inventory索引的数据。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。