Zipkin适配GaussDB开源开发

举报
韦文满 发表于 2024/12/04 17:53:22 2024/12/04
【摘要】 zipkin是知名的分布式追踪系统,被广泛应用于各种规模的分布式系统中,尤其是在微服务架构下。本任务的主要目的是让Zipkin支持华为云,包括ECS、CCE、GaussDB,帮助拓展华为云的影响力,也方便Zipkin的用户能够轻松上云。

Zipkin 描述

Zipkin是知名的分布式追踪系统,被广泛应用于各种规模的分布式系统中,尤其是在微服务架构下。本任务的主要目的是让Zipkin支持华为云,包括ECS、CCE、GaussDB,帮助拓展华为云的影响力,也方便Zipkin的用户能够轻松上云。

GaussDB 描述

GaussDB是基于华为20余年战略投入、软硬全栈协同所创新研发的分布式关系型数据库,具备高可用、高性能、高安全、高弹性、高智能、易部署、易迁移等关键能力,是企业核心业务数字化转型升级的坚实数据底座。

开发前准备工作

参考文档

开发过程

第一步 fork zipkin代码,并初始化工程

1、clone代码

wmwei@DESKTOP-7490:~$ git clone https://github.com/openzipkin/zipkin.git

2、初始化前端页面

root@DESKTOP-7490:~$ cd zipkin
root@DESKTOP-7490:~$ npm install
root@DESKTOP-7490:~$ cd zipkin-less
root@DESKTOP-7490:~$ npm run build

3、后端初始化

root@DESKTOP-7490:~$ cd zipkin
root@DESKTOP-7490:~$ ./mvnw -q --batch-mode -DskipTests --also-make -pl zipkin-server clean install
root@DESKTOP-7490:~$ java -jar .\zipkin-server\target\zipkin-server-3.4.3-SNAPSHOT-exec.jar

4、SQL初始化,{zipkin根路径}/zipkin-storage/gaussdb-v2/src/main/resources/gaussdb.sql

DROP TABLE zipkin_spans;
CREATE TABLE IF NOT EXISTS zipkin_spans (
  "span_id" serial8 NOT NULL,
  "id" VARCHAR(32) NOT NULL,
  "trace_id" VARCHAR(32) NOT NULL,
  "kind" VARCHAR(32) NOT NULL,
  "name" VARCHAR(255) NOT NULL,
  "parent_id" VARCHAR(32),
  "debug" BOOLEAN,
  "share" BOOLEAN,
  "start_ts" int8 ,
  "duration" int ,
  primary key ("span_id"),
  constraint KEY_zipkin_spans_id_trace_kind unique("id", "trace_id", "kind")
  ) WITH (OIDS=FALSE);

comment on column zipkin_spans.trace_id is 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit';
comment on column zipkin_spans.start_ts is 'Span.timestamp(): epoch micros used for endTs query and to implement TTL';
comment on column zipkin_spans.duration is 'Span.duration(): micros used for minDuration and maxDuration query';

CREATE INDEX zipkin_spans_index_trace_id_kind_id ON zipkin_spans ("trace_id", "id", "kind");


DROP TABLE zipkin_annotations;
CREATE TABLE IF NOT EXISTS zipkin_annotations (
  "span_id" int8 NOT NULL,
  "a_key" VARCHAR(255) NOT NULL,
  "a_value" VARCHAR(2000),
  "a_type" INT NOT NULL,
  "a_timestamp" int8,
  "ipv4" VARCHAR(255) ,
  "ipv6" VARCHAR(255) ,
  "port" SMALLINT ,
  "service_name" VARCHAR(255),
  constraint KEY_zipkin_annotations_span_id_a_key unique("span_id", "a_key", "a_timestamp")
  ) WITH (OIDS=FALSE);

comment on column zipkin_annotations.span_id is 'coincides with zipkin_spans.id';
comment on column zipkin_annotations.a_key is 'BinaryAnnotation.key or Annotation.value if type == -1';
comment on column zipkin_annotations.a_value is 'BinaryAnnotation.value(), which must be smaller than 64KB';
comment on column zipkin_annotations.a_type is 'BinaryAnnotation.type() or -1 if Annotation';
comment on column zipkin_annotations.a_timestamp is 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp';
comment on column zipkin_annotations.ipv4 is 'Null when Binary/Annotation.endpoint is null';
comment on column zipkin_annotations.ipv6 is 'Null when Binary/Annotation.endpoint is null, or no IPv6 address';
comment on column zipkin_annotations.port is 'Null when Binary/Annotation.endpoint is null';
comment on column zipkin_annotations.service_name is 'Null when Binary/Annotation.endpoint is null';

CREATE INDEX zipkin_annotations_index_span_id_a_key_id ON zipkin_annotations ("span_id", "a_key", "a_timestamp");


DROP TABLE zipkin_dependencies;
CREATE TABLE IF NOT EXISTS zipkin_dependencies (
  "day" DATE NOT NULL,
  "parent" VARCHAR(32) NOT NULL,
  "child" VARCHAR(32) NOT NULL,
  "call_count" int8,
  "error_count" int8,
  constraint KEY_zipkin_dependencies_day unique("day", "parent", "child")
  ) WITH (OIDS=FALSE);

初始化完成后 启动 zipkin ,通过浏览器访问:http://localhost:3000/zipkin

第二步 开始适配开发

1、在zipkin-storage工程下创建gaussdb-v2模块,pom.xml部分配置如下:

 <dependencies>
    <dependency>
      <groupId>org.jooq</groupId>
      <artifactId>jooq</artifactId>
      <version>${jooq.version}</version>
    </dependency>
    <dependency>
      <groupId>org.opengauss</groupId>
      <artifactId>opengauss-jdbc</artifactId>
      <version>5.0.0-htrunk3.csi.gaussdb_kernel.opengaussjdbc.r3</version>
    </dependency>
    <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.jooq</groupId>
          <artifactId>jooq-codegen-maven</artifactId>
          <version>${jooq.version}</version>
          <executions>
            <execution>
              <goals>
                <goal>generate</goal>
              </goals>
            </execution>
          </executions>
          <dependencies>
            <dependency>
              <groupId>org.jooq</groupId>
              <artifactId>jooq</artifactId>
              <version>${jooq.version}</version>
            </dependency>
          </dependencies>
          <configuration>
            <jdbc>
                <driver>org.postgresql.Driver</driver>
                <url>jdbc:postgresql://localhost:8000/test</url>
                <user>username</user>
                <password>password</password>
            </jdbc>
            <generator>
              <generate>
                <relations>false</relations>
                <deprecated>false</deprecated>
                <records>false</records>
                <!-- Prevents Intellij from conflating source level with api level -->
                <generatedAnnotationType>JAVAX_ANNOTATION_GENERATED</generatedAnnotationType>
              </generate>
              <database>
                <inputSchema>public</inputSchema>
                <includes>zipkin_.*</includes>
                <excludes />
              </database>
              <target>
                <packageName>zipkin2.storage.gaussdb.v2.internal.generated</packageName>
                <directory>src/main/java</directory>
              </target>
            </generator>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  </dependencies>

2、修改zipkin-server-shared.yml配置文件,增加GaussDB JDBC配置信息

spring:
  storage:
    gaussdb:
      jdbc-url: ${GAUSSDB_JDBC_URL:}
      host: ${GAUSSDB_HOST:localhost}
      port: ${GAUSSDB_TCP_PORT:8000}
      username: ${GAUSSDB_USER:root}
      password: ${GAUSSDB_PASS:root}
      db: ${GAUSSDB_DB:test}
      schema: ${GAUSSDB_SCHEMA:public}
      driver-class-name: ${GAUSSDB_DRIVER:org.postgresql.Driver}

3、创建GaussDB配置文件

  • 在zipkin-server工程下创建package:zipkin2.server.internal.gaussdb,并创建如下配置文件:
ZipkinGaussDBStorageConfiguration.java
ZipkinGaussDBStorageProperties.java
ZipkinSelfTracingGaussDBStorageConfiguration.java

4、重点代码展示

  1. GaussDBSpanConsumer.java,主要是将接收到的span信息插入到数据库表中
    public Call<Void> accept(List<Span> spans) {
    if (spans.isEmpty()) return Call.create(null);
    try (Connection conn = dataSource.getConnection()) {
      DSLContext ctx = context.get(conn);
      for (Span span : spans) {
        Record record = ctx.selectFrom(ZIPKIN_SPANS).where(
          ZIPKIN_SPANS.TRACE_ID.eq(span.traceId())
            .and(ZIPKIN_SPANS.TRACE_ID.eq(span.traceId()))
            .and(ZIPKIN_SPANS.KIND.eq(span.kind().name()))
        ).fetchOne();
        Long spanId;
        if (Objects.isNull(record)) {
          spanId = Objects.requireNonNull(ctx.insertInto(ZIPKIN_SPANS)
              .set(ZIPKIN_SPANS.TRACE_ID, span.traceId())
              .set(ZIPKIN_SPANS.ID, span.id())
              .set(ZIPKIN_SPANS.PARENT_ID, span.parentId())
              .set(ZIPKIN_SPANS.KIND, span.kind().name())
              .set(ZIPKIN_SPANS.NAME, span.name())
              .set(ZIPKIN_SPANS.DEBUG, span.debug())
              .set(ZIPKIN_SPANS.SHARE, span.shared())
              .set(ZIPKIN_SPANS.START_TS, span.timestamp())
              .set(ZIPKIN_SPANS.DURATION, span.duration() == null ? 0 : span.duration().intValue())
              .returning(ZIPKIN_SPANS.SPAN_ID)
              .fetchOne())
            .getValue(ZIPKIN_SPANS.SPAN_ID);
          LOG.info("begin accept trace data:" + span);
        } else {
          spanId = record.getValue(ZIPKIN_SPANS.SPAN_ID);
        }

        if (Objects.nonNull(span.parentId())) {
          String sql = "insert into zipkin_dependencies values(?,?,?,?,?) on duplicate key update nothing";
          int rs = ctx.execute(sql,
            LocalDate.now(),
            span.parentId(),
            span.traceId(),
            span.duration(), 0L
          );
        }

        for (Annotation annotation : span.annotations()) {
          Result<Record> records1 = ctx.selectFrom(ZIPKIN_ANNOTATIONS).where(
            ZIPKIN_ANNOTATIONS.SPAN_ID.eq(spanId)
              .and(ZIPKIN_ANNOTATIONS.A_KEY.eq(annotation.value()))
              .and(ZIPKIN_ANNOTATIONS.A_TIMESTAMP.eq(annotation.timestamp()))
          ).fetch();
          if (records1.isEmpty()) {
            InsertSetMoreStep<Record> ann = ctx.insertInto(ZIPKIN_ANNOTATIONS)
              .set(ZIPKIN_ANNOTATIONS.SPAN_ID, spanId)
              .set(ZIPKIN_ANNOTATIONS.A_KEY, annotation.value())
              .set(ZIPKIN_ANNOTATIONS.A_TYPE, -1)
              .set(ZIPKIN_ANNOTATIONS.A_TIMESTAMP, annotation.timestamp());

            if (span.localEndpoint() != null) {
              ann = ann.set(ZIPKIN_ANNOTATIONS.SERVICE_NAME, span.localEndpoint().serviceName())
                .set(ZIPKIN_ANNOTATIONS.IPV4, span.localEndpoint().ipv4());
              if (schema.hasIpv6 && Objects.nonNull(span.localEndpoint().ipv6())) {
                ann = ann.set(ZIPKIN_ANNOTATIONS.IPV6, span.localEndpoint().ipv6());
              }
              if (span.localEndpoint().port() != null) {
                ann = ann.set(ZIPKIN_ANNOTATIONS.PORT, span.localEndpoint().port().shortValue());
              }
            }
            ann.execute();
          }
        }
        for (String key : span.tags().keySet()) {
          Result<Record> records1 = ctx.selectFrom(ZIPKIN_ANNOTATIONS).where(
            ZIPKIN_ANNOTATIONS.SPAN_ID.eq(spanId)
              .and(ZIPKIN_ANNOTATIONS.A_KEY.eq(key))
              .and(ZIPKIN_ANNOTATIONS.A_TIMESTAMP.eq(span.timestamp()))
          ).fetch();
          if (records1.isEmpty()) {
            InsertSetMoreStep<Record> anno = ctx.insertInto(ZIPKIN_ANNOTATIONS)
              .set(ZIPKIN_ANNOTATIONS.SPAN_ID, spanId)
              .set(ZIPKIN_ANNOTATIONS.A_KEY, key)
              .set(ZIPKIN_ANNOTATIONS.A_TYPE, 0)
              .set(ZIPKIN_ANNOTATIONS.A_VALUE, span.tags().get(key))
              .set(ZIPKIN_ANNOTATIONS.A_TIMESTAMP, span.timestamp());

            if (span.localEndpoint() != null) {
              anno = anno.set(ZIPKIN_ANNOTATIONS.SERVICE_NAME, span.localEndpoint().serviceName())
                .set(ZIPKIN_ANNOTATIONS.IPV4, span.localEndpoint().ipv4());
              if (schema.hasIpv6) {
                anno = anno.set(ZIPKIN_ANNOTATIONS.IPV6, span.localEndpoint().ipv6());
              }
              if (span.localEndpoint().port() != null) {
                anno = anno.set(ZIPKIN_ANNOTATIONS.PORT, span.localEndpoint().port().shortValue());
              }
            }
            anno.execute();
          }
        }
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return Call.create(null);
  }

  1. SelectSpansAndAnnotations.java,主要是从数据库读取span信息并返回给前端页面展示
   public List<Span> apply(DSLContext context) {
    List<Span> spans = new ArrayList<>();
    Result<Record> records = context
      .select(schema.spanFields)
      .from(ZIPKIN_SPANS)
      .where(traceIdCondition(context)).fetch();

    for (Record r : records) {
      Long spanId = r.get(ZIPKIN_SPANS.SPAN_ID);
      Span.Builder builder = Span.newBuilder()
        .traceId(r.getValue(ZIPKIN_SPANS.TRACE_ID))
        .name(r.getValue(ZIPKIN_SPANS.NAME))
        .id(r.getValue(ZIPKIN_SPANS.ID))
        .parentId(maybeGet(r, ZIPKIN_SPANS.PARENT_ID, "0"))
        .timestamp(maybeGet(r, ZIPKIN_SPANS.START_TS, 0L))
        .duration(maybeGet(r, ZIPKIN_SPANS.DURATION, 0))
        .debug(r.getValue(ZIPKIN_SPANS.DEBUG));
      Result<Record> records1 = context.selectFrom(ZIPKIN_ANNOTATIONS).where(ZIPKIN_ANNOTATIONS.SPAN_ID.eq(spanId)).fetch();
      if (records1.isNotEmpty()) {
        Record record1 = records1.get(0);
        Endpoint endpoint = Endpoint.newBuilder().serviceName(record1.get(ZIPKIN_ANNOTATIONS.SERVICE_NAME))
          .ip(record1.get(ZIPKIN_ANNOTATIONS.IPV4)).build();
        builder.localEndpoint(endpoint);
      }
      spans.add(builder.build());
    }
    return spans;
  }

编译打包代码

root@DESKTOP-7490:~$ ./mvnw -q --batch-mode -DskipTests --also-make -pl zipkin-server clean install

创建dockerfile

root@DESKTOP-7490:~$ cat dockerfile
FROM openjdk:17-alpine
LABEL ZIPKIN_VERSION=3.4.3
RUN mkdir -p /opt
COPY ./zipkin-server/target/zipkin-server-3.4.3-SNAPSHOT-exec.jar /opt
WORKDIR /opt
RUN mv zipkin-server-3.4.3-SNAPSHOT-exec.jar zipkin-server.jar
EXPOSE 9098
ENTRYPOINT ["java", "-jar","zipkin-server.jar"]

打包image并上传华为镜像云服务

root@DESKTOP-7490:~$ docker build -t swr.cn-north-4.myhuaweicloud.com/xxx/zipkin-server:0.0.1 .
root@DESKTOP-7490:~$ docker push swr.cn-north-4.myhuaweicloud.com/xxx/zipkin-server:0.0.1

创建k8s部署文件

root@DESKTOP-7490:~$ cat zipkin-server.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zipkin-server
  labels:
    app: zipkin-server-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zipkin-server-pods
  template:
    metadata:
      name: zipkin-server
      labels:
        app: zipkin-server-pods
    spec:
      containers:
        - name: zipkin-server-container
          image: swr.cn-north-4.myhuaweicloud.com/xxx/zipkin-server:0.0.1
          env:
            - name: ZIPKIN-SERVER
              value: zipkin-server
            - name: STORAGE_TYPE
              value: gaussdb
            - name: DB_PASSWORD
              valueFrom:
                configMapKeyRef:
                  key: DB_PASSWORD
                  name: opengauss
            - name: DB_URL
              valueFrom:
                configMapKeyRef:
                  key: DB_URL
                  name: opengauss
            - name: DB_USERNAME
              valueFrom:
                configMapKeyRef:
                  key: DB_USERNAME
                  name: opengauss
            - name: DB_DRIVER
              valueFrom:
                configMapKeyRef:
                  key: DB_DRIVER
                  name: opengauss
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9411
              protocol: TCP
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "256Mi"
              cpu: "250m"
      imagePullSecrets:
        - name: default-secret
      restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
  name: zipkin-server
spec:
  selector:
    app: zipkin-server-pods
  ports:
    - protocol: TCP
      port: 9441
      targetPort: 9441
  type: NodePort

demo设计基于service-comb微服务

  1. pom.xml引入zipkin在依赖
    <dependency>
      <groupId>org.apache.servicecomb</groupId>
      <artifactId>handler-tracing-zipkin</artifactId>
      <version>3.2.0</version>
    </dependency>
  1. application.yaml新增zipkin配置
root@DESKTOP-7490:~$ cat applicaion.yml
servicecomb:
  tracing:
    enabled: true
    reporter:
      log.enabled: true
      zipkin.enabled: true
    collector:
      apiVersion: v2
      address: http://${ZIPKIN-SERVER:localhost}:9411/

开发过程遇到的问题

  1. 使用jooq带的onConflictDoNothing时,数据插入GaussDB时遇到""insert into on conflict错误,GaussDB不支持该语法,通过查阅GaussDB官方文档,采用手写sql,后面接上on duplicate key update nothing,最终问题解决,参考代码如下:
String sql = "insert into zipkin_dependencies values(?,?,?,?,?) on duplicate key update nothing";
int rs = ctx.execute(sql,
  LocalDate.now(),
  span.parentId(),
  span.traceId(),
  span.duration(), 0L);
  1. 开始是参考mysql-v1来写,遇到版本对不上的问题,新版本zipkin采用v2接口,v2 span数据结构和span有差异,最后通过调整sql数据库表,适配v2版本数据,解决了数据插入和查询的问题。v2版本span数据结构如下
{
  "traceId": "7a41359e2f07d273",
  "id": "7a41359e2f07d273",
  "kind": "CLIENT",
  "name": "consumer rest admin-service.scb-management.health",
  "timestamp": 1733301792016891,
  "duration": 2405,
  "localEndpoint": {
    "serviceName": "admin-service",
    "ipv4": "192.168.185.222"
  },
  "tags": {
    "http.method": "GET",
    "http.path": "/admin/scb/management/health?instanceId=94c74b45-d4c1-42a5-a948-561206f2f7ed&registryName=zookeeper-registry",
    "http.route": "rest://172.24.112.1:9096?urlPrefix=%2Fadmin",
    "http.status_code": "200"
  }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。