源码角度了解Skywalking之Skywalking是怎么收集Trace信息的
源码角度了解Skywalking之Skywalking是怎么收集Trace信息的
上篇文章我们了解到SKywalking的Trace信息的生成和采样,今天我们分析一下Skywalking是怎么收集这些Trace信息发送给OAP的。
Trace信息收集
Trace信息的收集涉及到的一个类是TraceSegmentServiceClient,它同样是Boot Service的实现类,在prepare()方法中作为监听器添加到GRPCChannelManager的管道中,在onComplete()方法中将实例添加到TracingContext.ListenerManager中
插件的拦截器在afterMethod()方法中一般会调用ContextManager的stopSpan()方法来停止span,具体就是设置span的结束时间,同时调用TracingContext的finish()方法,这个方法中有个关键代码:TracingContext.ListenerManager.notifyFinish(finishedSegment),这个方法中遍历TracingContext.ListenerManager集合afterFinished()方法。TraceSegmentServiceClient实现了TracingContextListener接口,并放入到了这个集合中,它作为其中一个元素,同样也会执行afterFinished()方法
TraceSegmentServiceClient的afterFinished()方法:
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
return;
}
if (!carrier.produce(traceSegment)) {
if (logger.isDebugEnable()) {
logger.debug("One trace segment has been abandoned, cause by buffer is full.");
}
}
}
这里调用了DataCarrier的produce()方法来生成TraceSegment
DataCarrier是apm-commons模块的apm-datacarrier子模块的类,它底层调用Channels的save()方法来保存TraceSegment信息,它是一个缓存队列,当队列满的时候有BLOCKING和IF_POSSIBLE两种缓存策略,如果是BLOCKING策略,创建ArrayBlockingQueueBuffer队列,如果是IF_POSSIBLE策略创建自己实现的环形队列Buffer对象
总之是把TraceSegment信息放入队列中。
Trace信息的发送
TraceSegmentServiceClient的boot()方法调用DataCarrier的consume()方法进行消费,跟踪这个方法会到ConsumeDriver的begin()方法这里
ConsumeDriver的begin()方法:
ConsumeDriver
@Override
public void begin(Channels channels) {
if (running) {
return;
}
try {
lock.lock();
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
}
running = true;
} finally {
lock.unlock();
}
}
ConsumeDriver是消费者的线程池,这里循环consumerThreads消费线程数组,开启线程
ConsumerThread的run()方法中会调用IConsumer的consume()方法,而TraceSegmentServiceClient正好实现了IConsumer接口,你说巧不巧?
TraceSegmentServiceClient的consume()方法:
- 判断连接状态是否为已连接,如果是创建GRPCStreamServiceStatus对象
- 创建StreamObserver对象,定义接受者,出现错误的时候发送错误信息给GRPCChannelManager对象,完成后调用GRPCStreamServiceStatus的finished()方法
- 将TraceSegment对象转换为UpstreamSegment对象
- 自旋等待,直到TraceSegment上传完成,同时上传次数
- 如果连接状态不好,就放弃发送
- 最后打印哪些trace segments信息发送到collector了,哪些trace segments信息没有发送到collector
总结
这篇文章主要将了Skywalking是如何进行收集Trace信息和发送Trace信息给OAP的,trace信息的收集其实是执行插件拦截器的afterMethod()的时候通知TracingContextListener的实现类执行afterFinished()方法,这里就调用了DataCarrier的produce()方法将trace信息放入到了缓存队列中,而trace信息的发送是TraceSegmentServiceClient的boot()中调用了DataCarrier的consume()方法创建ConsumerThread线程,执行ConsumerThread 的run()方法,run()中执行了TraceSegmentServiceClient的consume()方法来消费缓存队列中的trace信息,具体是将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给Collector。
这整体是一个生产者消费者模式。
- 点赞
- 收藏
- 关注作者
评论(0)