源码角度了解Skywalking之服务端OAP对Trace的处理

举报
周杰伦本人 发表于 2022/10/30 18:26:20 2022/10/30
【摘要】 源码角度了解Skywalking之服务端OAP对Trace的处理 接收Agent数据 预构建 通知监听器者构建 总结 源码角度了解Skywalking之服务端OAP对Trace的处理从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywa...

源码角度了解Skywalking之服务端OAP对Trace的处理

从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywalking-trace-receiver-plugin模块

TraceModuleProvider向GRPCHandlerRegister中添加处理器TraceSegmentReportServiceHandler

接收Agent数据

TraceSegmentReportServiceHandler的collect()方法接收Agent的数据,调用SegmentParseV2.Producer的send()方法发送

SegmentParseV2.Producer的send()方法:

public void send(UpstreamSegment segment, SegmentSource source) {
    SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
    segmentParse.setStandardizationWorker(standardizationWorker);
    segmentParse.parse(new BufferData<>(segment), source);
}
  1. 创建SegmentParseV2解析器
  2. 调用SegmentParseV2的parse()方法

SegmentParseV2的parse()方法:

  1. 创建SpanListener集合
  2. 获取UpstreamSegment对象
  3. 获取UpstreamSegment对象中关联的所有的TraceID
  4. 如果UpstreamSegment中的SegmentObject实例为空,就解析UpstreamSegment实例得到SegmentObject对象进行填充
  5. 重新检查段信息是否来自文件缓冲区,如果缓存中不存这个段信息对应的服务实例Id,然后返回true
  6. 把SegmentObject对象封装成SegmentDecorator对象,这里是装饰者模式的体现
  7. 调用preBuild()方法进行预构建操作,预构建不成功写入缓存文件中,构建成功会通知具体的监听器来进行构建

预构建

SegmentParseV2的preBuild()方法:

  1. 构建SegmentCoreInfo对象中的segmentId
  2. 调用notifyGlobalsListener()方法通知这个TraceSegment所关联的TraceId对应的监听器进行解析TraceId,需要采样的进行采样
  3. 将Segment信息填充到SegmentCoreInfo对象中
  4. 遍历TraceSegment的所有span,如果是TraceSegment的第一个span,调用notifyFirstListener()方法解析第一个span,将SegmentCoreInfo对象的属性添加到Segment对象中,记录firstEndpointId和firstEndpointName,其实就是对应的请求URL,根据Span类型通知不同的监听类

通知监听器者构建

这个方法遍历所有的span调用SegmentSpanListener的build()方法,设置Segment信息的端点id和端点名后调用SourceReceiverImpl的receive()方法,最终调用SegmentDispatcher的dispatch()方法

SegmentDispatcher的dispatch()方法:

@Override public void dispatch(Segment source) {
    SegmentRecord segment = new SegmentRecord();
    segment.setSegmentId(source.getSegmentId());
    segment.setTraceId(source.getTraceId());
    segment.setServiceId(source.getServiceId());
    segment.setServiceInstanceId(source.getServiceInstanceId());
    segment.setEndpointName(source.getEndpointName());
    segment.setEndpointId(source.getEndpointId());
    segment.setStartTime(source.getStartTime());
    segment.setEndTime(source.getEndTime());
    segment.setLatency(source.getLatency());
    segment.setIsError(source.getIsError());
    segment.setDataBinary(source.getDataBinary());
    segment.setTimeBucket(source.getTimeBucket());
    segment.setVersion(source.getVersion());

    RecordStreamProcessor.getInstance().in(segment);
}

组装SegmentRecord对象,通过RecordStreamProcessor创建实例,in()方法中调用RecordPersistentWorker来批量异步插入ES数据库中。

总结

这篇文章主要讲解了Skywalking的OAP接收到Agent发来的Trace信息的处理逻辑,入口是TraceSegmentReportServiceHandler的collect()方法,会对Agent封装的UpstreamSegment对象进行反序列化,构建Segment、Span等信息,最终由RecordStreamProcessor来批量异步把SegmentRecord写入ES数据库中

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。