源码角度了解Skywalking之服务端OAP对JVM监控的处理
源码角度了解Skywalking之服务端OAP对JVM监控的处理
这篇文章带你从Skywalking源码的角度分析一下服务端OAP对JVM监控信息是怎么处理的。
Skywalking 的Agent对JVM信息收集发送给OAP后,由OAP的JVMMetricReportServiceHandler的collect()方法进行处理
客户端对应的逻辑在apm-agent-core模块的jvm包下的 JVMService类中
服务端的逻辑在skywalking-jvm-receiver-plugin模块的JVMMetricReportServiceHandler类中,接收Agent发送的JVMMetric信息。
接收JVMMetric信息
JVMMetricReportServiceHandler的collect()方法
JVMMetricReportServiceHandler的collect()方法:
@Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
int serviceInstanceId = request.getServiceInstanceId();
if (logger.isDebugEnabled()) {
logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
}
request.getMetricsList().forEach(metrics -> {
long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
});
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
- 获取请求的服务实例ID
- 遍历JVMMetric数据,调用JVMSourceDispatcher的sendMetric()方法发送数据
JVMSourceDispatcher的sendMetric()方法
JVMSourceDispatcher的sendMetric()方法:
void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
int serviceId;
if (Objects.nonNull(serviceInstanceInventory)) {
serviceId = serviceInstanceInventory.getServiceId();
} else {
logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
return;
}
this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
}
- 根据服务实例Id获取ServiceInstanceInventory信息,
- 如果为空就返回
- 如果不为空发送CPU、内存、内存池、GC信息,这四个方法都会调用SourceReceiverImpl的receive()方法,具体由DispatcherManager来选择对应的转发器进行转发,也就是SourceDispatcher接口的实现类,默认实现类是ServiceDispatcher,这个类是可观察性分析语言(OAL)自动生成的,这个类中最终交由MetricsStreamProcessor来处理数据放入缓存中
消费处理
MetricsStreamProcessor作为生产者,把JVMMetric数据放入缓存队列中,ConsumerThread是消费者线程,run()中执行了MetricsAggregateWorker的consume()方法来消费缓存队列中的JVMMetric信息
涉及到的相关Worker是由MetricsStreamProcessor的create()方法中定义的
MetricsAggregateWorker会对数据进行聚合,写入缓存中,然后发送给下一个Worker进行处理。
MetricsRemoteWorker会发送Metrics数据到远端的节点
MetricsTransWorker的in()方法中按照小时、天和月调整时间窗口的粒度来调用下一个Worker
MetricsPersistentWorker对Metrics数据进行持久化
总结
这篇文章主要讲了Skywalking的Agent发送JVM数据请求后服务端的处理逻辑。服务端通过JVMMetricReportServiceHandler来接收Agent的请求数据,按照CPU、内存、内存池和GC数据调用方法进行数据,最后通过MetricsStreamProcessor定义的一个个Workder处理数据,持久化到ES中。文中若有不当之处欢迎批评指正,谢谢大家的阅读。
- 点赞
- 收藏
- 关注作者
评论(0)