源码角度了解Skywalking之Skywalking是怎么收集Trace信息的

举报
周杰伦本人 发表于 2022/10/30 18:07:01 2022/10/30
【摘要】 源码角度了解Skywalking之Skywalking是怎么收集Trace信息的 Trace信息收集 Trace信息的发送 总结 源码角度了解Skywalking之Skywalking是怎么收集Trace信息的上篇文章我们了解到SKywalking的Trace信息的生成和采样,今天我们分析一下Skywalking是怎么收集这些Trace信息发送给OAP的。 Trace信息收集Trace信息...

源码角度了解Skywalking之Skywalking是怎么收集Trace信息的

上篇文章我们了解到SKywalking的Trace信息的生成和采样,今天我们分析一下Skywalking是怎么收集这些Trace信息发送给OAP的。

Trace信息收集

Trace信息的收集涉及到的一个类是TraceSegmentServiceClient,它同样是Boot Service的实现类,在prepare()方法中作为监听器添加到GRPCChannelManager的管道中,在onComplete()方法中将实例添加到TracingContext.ListenerManager中

插件的拦截器在afterMethod()方法中一般会调用ContextManager的stopSpan()方法来停止span,具体就是设置span的结束时间,同时调用TracingContext的finish()方法,这个方法中有个关键代码:TracingContext.ListenerManager.notifyFinish(finishedSegment),这个方法中遍历TracingContext.ListenerManager集合afterFinished()方法。TraceSegmentServiceClient实现了TracingContextListener接口,并放入到了这个集合中,它作为其中一个元素,同样也会执行afterFinished()方法

TraceSegmentServiceClient的afterFinished()方法:

public void afterFinished(TraceSegment traceSegment) {
        if (traceSegment.isIgnore()) {
            return;
        }
        if (!carrier.produce(traceSegment)) {
            if (logger.isDebugEnable()) {
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");
            }
        }
    }

这里调用了DataCarrier的produce()方法来生成TraceSegment

DataCarrier是apm-commons模块的apm-datacarrier子模块的类,它底层调用Channels的save()方法来保存TraceSegment信息,它是一个缓存队列,当队列满的时候有BLOCKING和IF_POSSIBLE两种缓存策略,如果是BLOCKING策略,创建ArrayBlockingQueueBuffer队列,如果是IF_POSSIBLE策略创建自己实现的环形队列Buffer对象

总之是把TraceSegment信息放入队列中。

Trace信息的发送

TraceSegmentServiceClient的boot()方法调用DataCarrier的consume()方法进行消费,跟踪这个方法会到ConsumeDriver的begin()方法这里

ConsumeDriver的begin()方法:

ConsumeDriver

@Override
    public void begin(Channels channels) {
        if (running) {
            return;
        }
        try {
            lock.lock();
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }

ConsumeDriver是消费者的线程池,这里循环consumerThreads消费线程数组,开启线程

ConsumerThread的run()方法中会调用IConsumer的consume()方法,而TraceSegmentServiceClient正好实现了IConsumer接口,你说巧不巧?

TraceSegmentServiceClient的consume()方法:

  1. 判断连接状态是否为已连接,如果是创建GRPCStreamServiceStatus对象
  2. 创建StreamObserver对象,定义接受者,出现错误的时候发送错误信息给GRPCChannelManager对象,完成后调用GRPCStreamServiceStatus的finished()方法
  3. 将TraceSegment对象转换为UpstreamSegment对象
  4. 自旋等待,直到TraceSegment上传完成,同时上传次数
  5. 如果连接状态不好,就放弃发送
  6. 最后打印哪些trace segments信息发送到collector了,哪些trace segments信息没有发送到collector

总结

这篇文章主要将了Skywalking是如何进行收集Trace信息和发送Trace信息给OAP的,trace信息的收集其实是执行插件拦截器的afterMethod()的时候通知TracingContextListener的实现类执行afterFinished()方法,这里就调用了DataCarrier的produce()方法将trace信息放入到了缓存队列中,而trace信息的发送是TraceSegmentServiceClient的boot()中调用了DataCarrier的consume()方法创建ConsumerThread线程,执行ConsumerThread 的run()方法,run()中执行了TraceSegmentServiceClient的consume()方法来消费缓存队列中的trace信息,具体是将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给Collector。

这整体是一个生产者消费者模式。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。