源码角度了解Skywalking之服务端OAP对JVM监控的处理

举报
周杰伦本人 发表于 2022/10/30 18:23:59 2022/10/30
【摘要】 源码角度了解Skywalking之服务端OAP对JVM监控的处理 接收JVMMetric信息 JVMMetricReportServiceHandler的collect()方法 JVMSourceDispatcher的sendMetric()方法 消费处理 总结 源码角度了解Skywalking之服务端OAP对JVM监控的处理这篇文章带你从Skywalking源码的角度分析一下服务端OAP...

源码角度了解Skywalking之服务端OAP对JVM监控的处理

这篇文章带你从Skywalking源码的角度分析一下服务端OAP对JVM监控信息是怎么处理的。

Skywalking 的Agent对JVM信息收集发送给OAP后,由OAP的JVMMetricReportServiceHandler的collect()方法进行处理

客户端对应的逻辑在apm-agent-core模块的jvm包下的 JVMService类中

服务端的逻辑在skywalking-jvm-receiver-plugin模块的JVMMetricReportServiceHandler类中,接收Agent发送的JVMMetric信息。

接收JVMMetric信息

JVMMetricReportServiceHandler的collect()方法

JVMMetricReportServiceHandler的collect()方法:

@Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
    int serviceInstanceId = request.getServiceInstanceId();

    if (logger.isDebugEnabled()) {
        logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
    }

    request.getMetricsList().forEach(metrics -> {
        long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
        jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
    });

    responseObserver.onNext(Commands.newBuilder().build());
    responseObserver.onCompleted();
}

  1. 获取请求的服务实例ID
  2. 遍历JVMMetric数据,调用JVMSourceDispatcher的sendMetric()方法发送数据

JVMSourceDispatcher的sendMetric()方法

JVMSourceDispatcher的sendMetric()方法:

void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
    ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
    int serviceId;
    if (Objects.nonNull(serviceInstanceInventory)) {
        serviceId = serviceInstanceInventory.getServiceId();
    } else {
        logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
        return;
    }

    this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
    this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
    this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
    this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
}
  1. 根据服务实例Id获取ServiceInstanceInventory信息,
  2. 如果为空就返回
  3. 如果不为空发送CPU、内存、内存池、GC信息,这四个方法都会调用SourceReceiverImpl的receive()方法,具体由DispatcherManager来选择对应的转发器进行转发,也就是SourceDispatcher接口的实现类,默认实现类是ServiceDispatcher,这个类是可观察性分析语言(OAL)自动生成的,这个类中最终交由MetricsStreamProcessor来处理数据放入缓存中

消费处理

MetricsStreamProcessor作为生产者,把JVMMetric数据放入缓存队列中,ConsumerThread是消费者线程,run()中执行了MetricsAggregateWorker的consume()方法来消费缓存队列中的JVMMetric信息

涉及到的相关Worker是由MetricsStreamProcessor的create()方法中定义的

MetricsAggregateWorker会对数据进行聚合,写入缓存中,然后发送给下一个Worker进行处理。

MetricsRemoteWorker会发送Metrics数据到远端的节点

MetricsTransWorker的in()方法中按照小时、天和月调整时间窗口的粒度来调用下一个Worker

MetricsPersistentWorker对Metrics数据进行持久化

总结

这篇文章主要讲了Skywalking的Agent发送JVM数据请求后服务端的处理逻辑。服务端通过JVMMetricReportServiceHandler来接收Agent的请求数据,按照CPU、内存、内存池和GC数据调用方法进行数据,最后通过MetricsStreamProcessor定义的一个个Workder处理数据,持久化到ES中。文中若有不当之处欢迎批评指正,谢谢大家的阅读。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。