源码角度了解Skywalking之建立连接与服务注册
源码角度了解Skywalking之建立连接与服务注册
这篇文章主要讲一下Agent与OAP建立连接并进行服务注册。
从SkyWalking的启动流程SkyWalkingAgent的premain()中我们了解到调用ServiceManager.INSTANCE.boot()
来启动插件服务,并分析了利用SPI机制加载配置的各种插件的BootService,然后依次遍历BootService实例,调用他们的prepare()准备方法、startup();启动方法和onComplete()完成方法。
ServiceManager.INSTANCE.boot()方法:
public void boot() {
bootedServices = loadAllServices();
prepare();
startup();
onComplete();
}
我们先看一下prepare()方法,方法中遍历配置的BootService实现类,依次调用他们的prepare()方法,其中就有一个GRPCChannelManager类
GRPCChannelManager
GRPCChannelManager从名字就可以看出来,它是对gprc协议管道的管理类,它只实现了boot()方法
boot()方法
boot()方法的关键代码:
public static long GRPC_CHANNEL_CHECK_INTERVAL = 30;
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
也就是开启了通过线程池开启了开启,每隔30s执行一次,那么这个线程主要是做什么工作的呢?
我们看一下GRPCChannelManager类的run()方法
run()方法
第一步:根据ip和端口与指定OAP实例建立连接
第二步:连接成功后调用notify()方法通知所有的GRPCChannelListener连接成功的状态,GRPCChannelListener会调用statusChanged()方法来改变连接状态的值
第三步:设置重新连接为false
也就是它建立网络连接并且通知其他监听者的类
ServiceAndEndpointRegisterClient
ServiceAndEndpointRegisterClient既实现了BootService接口,又实现了GRPCChannelListener接口,主要的工作就是完成服务和端点的注册
prepare()方法
@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
.replaceAll("-", "") : Config.Agent.INSTANCE_UUID;
SERVICE_INSTANCE_PROPERTIES = new ArrayList<KeyStringValuePair>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
.setKey(key).setValue(Config.Agent.INSTANCE_PROPERTIES.get(key)).build());
}
}
- 查找GRPCChannelManager实例添加ServiceAndEndpointRegisterClient实例作为监听
- 生成实例UUID,先从agent.config文件中读取,如果文件中没有定义生成UUID
- 读取agent.config文件中的配置信息添加到SERVICE_INSTANCE_PROPERTIES集合中
boot()方法
public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
boot()方法中会每3s执行一次run()
run()方法
run()方法的代码同样比较长,主要分为三个部分,
第一部分:如果Agent还没有完成了注册,也就是服务id为空
通过doServiceRegister()方法进行服务注册,返回服务注册映射关系,遍历关系集合得到服务id
第二部分:已经完成了注册,判断服务实例id是否为空,如果为空
通过ServiceId、实例uuid和时间戳等参数来调用doServiceInstanceRegister()方法,得到ServiceInstanceRegisterMapping映射对象,遍历映射信息得到服务实例id
第三部分:已经完成了注册,并且服务实例id不为空
这种情况下定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好。然后同步Endpoint的映射关系和同步网络地址的映射关系。
总结
这篇文章我们详细详解了Skywalking初始化过程中ServiceManager.INSTANCE.boot()
代码的具体逻辑,这里提及到了两个BootService接口,一个是GRPCChannelManager类,这个类每30s调用线程来通过ip和端口与OAP建立连接,连接成功后通知监听者状态已经改变,另一个类是ServiceAndEndpointRegisterClient,这个类在准备方法中建立与GRPCChannelManager的监听类,当GRPCChannelManager建立完连接后,通过这个连接创建两个stub客户端,启动方法中每3s开启线程进行服务注册并获取服务实例的id,然后向OAP发送心跳请求来表示自己的状态良好,同步Endpoint映射关系和网络地址映射关系。
- 点赞
- 收藏
- 关注作者
评论(0)