Time-Series Database High Cardinality, Part 2: A Java + InfluxDB Solution

Yeats_Liao · published 2025/11/27 09:02:09
[Abstract] While building an IoT monitoring project, we ran into a classic time-series database problem: high cardinality. Too many data tags caused database performance to degrade sharply. This article records the complete process of how our team solved it with Java + InfluxDB, including the pitfalls we hit and the final solution.

While building an IoT monitoring project recently, we ran into the classic time-series database problem of high cardinality: too many tag combinations caused performance to degrade sharply. This article records the complete process of how our team solved it with Java + InfluxDB, including the pitfalls we hit along the way and the final solution.

1. Project Architecture

1.1 Overall Architecture

┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│  Data Collection │    │ Data Processing  │    │ Storage Tuning   │
│                  │    │                  │    │                  │
│ • IoT sensors    │───▶│ • Spring Boot    │───▶│ • InfluxDB       │
│ • Micrometer     │    │ • MQTT Broker    │    │ • Smart sharding │
│ • Custom agents  │    │ • Preprocessing  │    │ • Adaptive index │
└──────────────────┘    └──────────────────┘    └──────────────────┘
                                                         │
                                                         ▼
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│ Monitoring/Alert │    │  Query Services  │    │ Cardinality Mgmt │
│                  │    │                  │    │                  │
│ • Grafana        │◀───│ • GraphQL API    │◀───│ • Live monitoring│
│ • Smart alerting │    │ • REST API       │    │ • Predictive opt.│
│ • Notifications  │    │ • Query tuning   │    │ • Dynamic policy │
└──────────────────┘    └──────────────────┘    └──────────────────┘

Design rationale:

  1. Why MQTT instead of Kafka

    • The MQTT protocol is lighter: its fixed header is only 2 bytes, versus tens of bytes per Kafka record
    • Lower latency: millisecond-level delivery
    • Friendlier to IoT devices: built-in reconnect support
    • Small memory footprint: a single connection costs only a few KB
  2. Data sharding strategy

    • Small scale (<10k time series): a single node is enough
    • Medium scale (10k–100k): shard by data type
    • Large scale (>100k): shard by tag hash
  3. Automatic index optimization

    • Mostly point lookups: use a hash index
    • Mostly range scans: use a B+ tree index
    • Mixed workloads: use a composite index
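
The tiered sharding rules above can be sketched as a small selector. This is an illustrative sketch only: `ShardStrategySelector`, the enum names, and the exact thresholds mirror the article's rules of thumb but are not part of the project code.

```java
// Illustrative sketch of the tiered sharding decision described above.
// Thresholds follow the article's rules of thumb; names are hypothetical.
public class ShardStrategySelector {

    enum Strategy { SINGLE_NODE, BY_MEASUREMENT_TYPE, BY_TAG_HASH }

    // seriesCount = number of active time series (timelines)
    static Strategy select(long seriesCount) {
        if (seriesCount < 10_000) {
            return Strategy.SINGLE_NODE;          // < 10k series: one node suffices
        } else if (seriesCount <= 100_000) {
            return Strategy.BY_MEASUREMENT_TYPE;  // 10k-100k: shard by data type
        }
        return Strategy.BY_TAG_HASH;              // > 100k: hash tags across shards
    }

    public static void main(String[] args) {
        System.out.println(select(5_000));    // SINGLE_NODE
        System.out.println(select(50_000));   // BY_MEASUREMENT_TYPE
        System.out.println(select(500_000));  // BY_TAG_HASH
    }
}
```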

1.2 Technology Stack

  • Backend framework: Spring Boot 2.7+
  • Time-series database: InfluxDB 2.x
  • Message queue: Eclipse Mosquitto (MQTT broker)
  • MQTT client: Eclipse Paho MQTT
  • Monitoring: Micrometer + Prometheus
  • Visualization: Grafana
  • Build tool: Maven
  • JDK: OpenJDK 11+

Why this stack:

  1. MQTT vs. Kafka: we chose MQTT

    Criterion            MQTT              Kafka
    ─────────────────────────────────────────────
    Protocol overhead    2-byte header     20-100 bytes
    Concurrent conns     millions          tens of thousands
    Message latency      <10ms             10-100ms
    Resource usage       very low          moderate
    IoT device support   native            needs adaptation
    Data persistence     optional          mandatory

  2. What InfluxDB 2.x buys us

    • A unified API, so there are fewer interfaces to remember
    • The Flux query language is far more expressive than the older InfluxQL
    • A built-in management UI, so no extra tooling to install
    • Storage compression roughly 30% better than 1.x in our tests

2. Smart Data Sharding

2.1 Project Layout

time-series-optimizer/
├── pom.xml
├── src/main/java/com/timeseries/
│   ├── config/
│   │   ├── InfluxDBConfig.java
│   │   └── ShardingConfig.java
│   ├── service/
│   │   ├── DataShardingService.java
│   │   └── MetricsWriteService.java
│   ├── model/
│   │   ├── TimeSeriesData.java
│   │   └── ShardingStrategy.java
│   └── controller/
│       └── MetricsController.java
└── src/main/resources/
    ├── application.yml
    └── influxdb-sharding.properties

2.2 Maven Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.timeseries</groupId>
    <artifactId>time-series-optimizer</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>
    
    <properties>
        <java.version>11</java.version>
        <influxdb.version>6.10.0</influxdb.version>
        <micrometer.version>1.9.12</micrometer.version>
    </properties>
    
    <dependencies>
        <!-- Spring Boot Starters -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        
        <!-- InfluxDB Client -->
        <dependency>
            <groupId>com.influxdb</groupId>
            <artifactId>influxdb-client-java</artifactId>
            <version>${influxdb.version}</version>
        </dependency>
        
        <!-- Micrometer for Metrics -->
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-influx</artifactId>
            <version>${micrometer.version}</version>
        </dependency>
        
        <!-- MQTT for Message Queue -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        
        <!-- Validation -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Test Dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.3 InfluxDB Configuration Class

2.3.1 What this class does

InfluxDBConfig manages the database connections. Its responsibilities:

  1. Multiple databases: connect to several InfluxDB instances at once and spread data across them
  2. Connection pooling: reuse clients instead of reconnecting for every write
  3. Dynamic sharding: create shards from configuration, so adding one is a config change
  4. Resource management: clients are closed automatically, avoiding leaks
  5. Failure handling: if a shard is down, fall back to the primary instance

2.3.2 Design decisions

  • Separate primary and shards: the primary provides stability, the shards provide scale
  • Configuration-driven: adjust sharding by editing config, not code
  • Lazy connections: clients are created when needed, so startup is faster
  • Graceful shutdown: connections are cleaned up when the application stops
package com.timeseries.config;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.QueryApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;

/**
 * Multi-instance InfluxDB configuration.
 * Supports sharding data across several database instances.
 */
@Slf4j
@Configuration
public class InfluxDBConfig {
    
    @Value("${influxdb.url}")
    private String influxUrl;
    
    @Value("${influxdb.token}")
    private String influxToken;
    
    @Value("${influxdb.org}")
    private String influxOrg;
    
    @Value("${influxdb.sharding.enabled:true}")
    private boolean shardingEnabled;
    
    @Value("${influxdb.sharding.instances:3}")
    private int shardingInstances;
    
    private final Map<String, InfluxDBClient> clientPool = new HashMap<>();
    
    /**
     * Primary InfluxDB client
     */
    @Bean
    @Primary
    public InfluxDBClient primaryInfluxDBClient() {
        InfluxDBClient client = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray(), influxOrg);
        clientPool.put("primary", client);
        log.info("Primary InfluxDB client initialized: {}", influxUrl);
        return client;
    }
    
    /**
     * Sharded InfluxDB client pool
     */
    @Bean
    public Map<String, InfluxDBClient> shardedInfluxDBClients() {
        if (!shardingEnabled) {
            return Map.of("primary", primaryInfluxDBClient());
        }
        
        Map<String, InfluxDBClient> clients = new HashMap<>();
        
        for (int i = 0; i < shardingInstances; i++) {
            String shardUrl = influxUrl.replace(":8086", ":" + (8086 + i));
            InfluxDBClient client = InfluxDBClientFactory.create(
                shardUrl, influxToken.toCharArray(), influxOrg
            );
            
            String shardKey = "shard_" + i;
            clients.put(shardKey, client);
            clientPool.put(shardKey, client);
            
            log.info("Shard InfluxDB client {} initialized: {}", i, shardUrl);
        }
        
        return clients;
    }
    
    /**
     * Write API bean
     */
    @Bean
    public WriteApi primaryWriteApi(InfluxDBClient primaryClient) {
        return primaryClient.getWriteApi();
    }
    
    /**
     * Query API bean
     */
    @Bean
    public QueryApi primaryQueryApi(InfluxDBClient primaryClient) {
        return primaryClient.getQueryApi();
    }
    
    /**
     * Get the client for a given shard (falls back to primary)
     */
    public InfluxDBClient getShardClient(String shardKey) {
        return clientPool.getOrDefault(shardKey, clientPool.get("primary"));
    }
    
    /**
     * Expose the org name for services that write directly
     */
    public String getInfluxOrg() {
        return influxOrg;
    }
    
    /**
     * Resource cleanup
     */
    @PreDestroy
    public void cleanup() {
        log.info("Closing InfluxDB clients...");
        clientPool.values().forEach(client -> {
            try {
                client.close();
            } catch (Exception e) {
                log.error("Error closing InfluxDB client", e);
            }
        });
    }
}
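
For reference, a hypothetical `application.yml` matching the `@Value` keys used above and in the services below (the concrete URL, org, and token are placeholders, not from the project):

```yaml
# Hypothetical configuration; keys mirror the @Value annotations in the code
influxdb:
  url: http://localhost:8086
  token: ${INFLUXDB_TOKEN}
  org: my-org
  sharding:
    enabled: true
    instances: 3
    bucket: monitoring
    cardinality-threshold: 50000
  monitoring:
    cardinality-threshold: 50000
    growth-rate-threshold: 0.1
```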

2.4 MQTT Message Queue Configuration

2.4.1 The MQTT configuration class

MqttConfig configures the MQTT messaging layer. Its responsibilities:

  1. Connection management: MQTT connection parameters, with username/password and SSL support
  2. Message channels: channels that let messages be processed asynchronously
  3. Quality of service: different QoS levels depending on how important a message is
  4. Automatic reconnect: reconnect after network drops so messages are not lost
  5. Topic routing: separate topics so messages are dispatched by type

2.4.2 Why MQTT for time-series data

  • Fast: publish-subscribe delivery, typically under 10 ms
  • Lightweight protocol: a 2-byte fixed header suits high-frequency data
  • IoT friendly: native support for reconnects, last-will messages, and more
  • Resource efficient: a single connection uses a few KB of memory, allowing millions of connections
  • Tunable reliability: three QoS levels trade performance against delivery guarantees
package com.timeseries.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * MQTT configuration.
 *
 * Advantages of MQTT over Kafka here:
 * 1. Lightweight protocol: minimal overhead, suited to IoT devices and high-frequency data
 * 2. Low latency: publish-subscribe delivery with very small delays
 * 3. Simple deployment: no cluster setup needed; a single node covers most cases
 * 4. QoS guarantees: three service levels for reliable delivery
 * 5. Persistent sessions: clients resume message delivery after reconnecting
 */
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
    
    private String brokerUrl = "tcp://localhost:1883";
    private String clientId = "timeseries-client";
    private String username;
    private String password;
    private int keepAliveInterval = 60;
    private int connectionTimeout = 30;
    private boolean cleanSession = false;
    
    // topic configuration
    private String timeseriesDataTopic = "timeseries/data";
    private String cardinalityAlertTopic = "timeseries/alerts/cardinality";
    private String performanceMetricsTopic = "timeseries/metrics/performance";
    
    /**
     * MQTT client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        
        options.setServerURIs(new String[]{brokerUrl});
        options.setKeepAliveInterval(keepAliveInterval);
        options.setConnectionTimeout(connectionTimeout);
        options.setCleanSession(cleanSession);
        
        if (username != null && !username.isEmpty()) {
            options.setUserName(username);
        }
        if (password != null && !password.isEmpty()) {
            options.setPassword(password.toCharArray());
        }
        
        // automatic reconnect
        options.setAutomaticReconnect(true);
        options.setMaxReconnectDelay(30000);
        
        factory.setConnectionOptions(options);
        return factory;
    }
    
    /**
     * Inbound message channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    
    /**
     * Outbound message channel
     */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
    
    /**
     * MQTT message producer (inbound adapter)
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                    clientId + "-inbound", 
                    mqttClientFactory(),
                    timeseriesDataTopic,
                    cardinalityAlertTopic
                );
        
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1); // at-least-once delivery
        adapter.setOutputChannel(mqttInputChannel());
        
        return adapter;
    }
    
    /**
     * MQTT message handler (outbound adapter)
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(
                    clientId + "-outbound", 
                    mqttClientFactory()
                );
        
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(performanceMetricsTopic);
        messageHandler.setDefaultQos(1);
        
        return messageHandler;
    }
}
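
Since the class binds properties with `@ConfigurationProperties(prefix = "mqtt")`, the broker settings would live in `application.yml` along these lines (values are placeholders; Spring's relaxed binding maps `broker-url` to `brokerUrl`, and so on):

```yaml
# Hypothetical configuration bound to MqttConfig via the "mqtt" prefix
mqtt:
  broker-url: tcp://localhost:1883
  client-id: timeseries-client
  username: iot-user
  password: ${MQTT_PASSWORD}
  keep-alive-interval: 60
  connection-timeout: 30
  clean-session: false
  timeseries-data-topic: timeseries/data
```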

2.4.3 The MQTT message service

MqttMessageService is the core business component for MQTT message handling. It routes and processes time-series data:

  1. Message routing: dispatch messages by MQTT topic
  2. Data parsing: deserialize JSON time-series payloads into Java objects
  3. Strategy selection: pick the best sharding strategy from data characteristics
  4. Asynchronous processing: non-blocking message handling via Spring Integration
  5. Alert publishing: publish cardinality alerts and performance metrics in real time

How the sharding strategy is chosen:

The system picks a strategy based on the following data characteristics:

  • Cardinality estimate: the data's expected contribution to time-series cardinality
  • Tag complexity: the number of tags and the distribution of their values
  • Write frequency: how often and in what pattern data arrives
  • Query patterns: historical query patterns the sharding should serve

Message processing pipeline:

receive MQTT message → identify topic → parse data → select strategy → write to shard → report status
package com.timeseries.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.timeseries.model.TimeSeriesData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

/**
 * MQTT message service.
 *
 * Handles MQTT delivery of time-series data:
 * 1. Receives time-series data from IoT devices
 * 2. Publishes cardinality alerts and performance metrics
 * 3. Implements reliable delivery and error handling
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MqttMessageService {
    
    private final MessageChannel mqttOutputChannel;
    private final ObjectMapper objectMapper;
    private final DataShardingService dataShardingService;
    
    /**
     * Handle inbound time-series data messages
     */
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleTimeSeriesData(Message<String> message) {
        try {
            String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class);
            String payload = message.getPayload();
            
            log.debug("Received MQTT message from topic: {}", topic);
            
            if ("timeseries/data".equals(topic)) {
                processTimeSeriesData(payload);
            } else if ("timeseries/alerts/cardinality".equals(topic)) {
                processCardinalityAlert(payload);
            }
            
        } catch (Exception e) {
            log.error("Error processing MQTT message", e);
        }
    }
    
    /**
     * Process time-series data
     */
    private void processTimeSeriesData(String payload) {
        try {
            TimeSeriesData data = objectMapper.readValue(payload, TimeSeriesData.class);
            
            // 智能分片处理
            dataShardingService.writeData(data);
            
            log.debug("Processed time series data: {}", data.getMeasurement());
            
        } catch (JsonProcessingException e) {
            log.error("Error parsing time series data", e);
        }
    }
    
    /**
     * Process a cardinality alert
     */
    private void processCardinalityAlert(String payload) {
        log.warn("Cardinality alert received: {}", payload);
        // alert-handling logic can be plugged in here
    }
    
    /**
     * Publish performance metrics
     */
    public void sendPerformanceMetrics(Object metrics) {
        try {
            String payload = objectMapper.writeValueAsString(metrics);
            
            Message<String> message = MessageBuilder
                .withPayload(payload)
                .setHeader(MqttHeaders.TOPIC, "timeseries/metrics/performance")
                .setHeader(MqttHeaders.QOS, 1)
                .build();
            
            mqttOutputChannel.send(message);
            
        } catch (JsonProcessingException e) {
            log.error("Error sending performance metrics", e);
        }
    }
    
    /**
     * Publish a cardinality alert
     */
    public void sendCardinalityAlert(String measurement, long cardinality, long threshold) {
        try {
            CardinalityAlert alert = new CardinalityAlert(measurement, cardinality, threshold);
            String payload = objectMapper.writeValueAsString(alert);
            
            Message<String> message = MessageBuilder
                .withPayload(payload)
                .setHeader(MqttHeaders.TOPIC, "timeseries/alerts/cardinality")
                .setHeader(MqttHeaders.QOS, 2) // exactly-once, so the alert is not lost
                .build();
            
            mqttOutputChannel.send(message);
            
        } catch (JsonProcessingException e) {
            log.error("Error sending cardinality alert", e);
        }
    }
    
    /**
     * Cardinality alert payload
     */
    public static class CardinalityAlert {
        private String measurement;
        private long currentCardinality;
        private long threshold;
        private long timestamp;
        
        public CardinalityAlert(String measurement, long currentCardinality, long threshold) {
            this.measurement = measurement;
            this.currentCardinality = currentCardinality;
            this.threshold = threshold;
            this.timestamp = System.currentTimeMillis();
        }
        
        // Getters and setters
        public String getMeasurement() { return measurement; }
        public void setMeasurement(String measurement) { this.measurement = measurement; }
        
        public long getCurrentCardinality() { return currentCardinality; }
        public void setCurrentCardinality(long currentCardinality) { this.currentCardinality = currentCardinality; }
        
        public long getThreshold() { return threshold; }
        public void setThreshold(long threshold) { this.threshold = threshold; }
        
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }
}
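
To make the inbound path concrete, here is the JSON shape a device might publish on the `timeseries/data` topic, matching the TimeSeriesData model below (measurement/tags/fields/timestamp). The concrete measurement name, tag keys, and values are invented for illustration; the payload is built with plain strings to stay dependency-free.

```java
// Sketch: a sample payload for the "timeseries/data" topic.
// Field names follow the TimeSeriesData model; values are illustrative.
public class SamplePayload {

    static String devicePayload(String deviceType, String region, double temperature) {
        return "{"
            + "\"measurement\":\"sensor_metrics\","
            + "\"tags\":{\"device_type\":\"" + deviceType + "\",\"region\":\"" + region + "\"},"
            + "\"fields\":{\"temperature\":" + temperature + "},"
            + "\"timestamp\":\"2025-11-27T09:00:00Z\""
            + "}";
    }

    public static void main(String[] args) {
        // what MqttMessageService.processTimeSeriesData would receive and parse
        System.out.println(devicePayload("thermometer", "cn-east", 23.5));
    }
}
```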

3. Data Model and Sharding Strategy

3.1 Time-Series Data Model

package com.timeseries.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.Map;

/**
 * Time-series data model.
 *
 * Design principles:
 * 1. Tag count control: avoid high-cardinality tag combinations
 * 2. Data type choice: use appropriate types to reduce storage overhead
 * 3. Time precision: choose the precision the business actually needs
 * 4. Field validation: ensure data quality and consistency
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TimeSeriesData {
    
    /**
     * Metric name (measurement).
     * Prefer meaningful names; avoid overly fine-grained classification.
     */
    @NotBlank(message = "Measurement cannot be blank")
    private String measurement;
    
    /**
     * Tags.
     * Note: the number of tag combinations directly drives cardinality.
     * Guidelines:
     * - Tag values should be bounded and predictable
     * - Avoid high-cardinality tags such as user IDs or session IDs
     * - Consider grouping or hashing tag values
     */
    private Map<String, String> tags;
    
    /**
     * Fields.
     * Hold the actual measured values.
     */
    @NotNull(message = "Fields cannot be null")
    private Map<String, Object> fields;
    
    /**
     * Timestamp.
     * Instant ensures correct time-zone handling.
     */
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    private Instant timestamp;
    
    /**
     * Data source identifier.
     * Used for sharding decisions and data lineage.
     */
    private String source;
    
    /**
     * Data priority.
     * Determines the write strategy and storage location.
     */
    @Builder.Default
    private DataPriority priority = DataPriority.NORMAL;
    
    /**
     * Estimated cardinality contribution.
     * Used for smart sharding decisions.
     */
    private Long estimatedCardinality;
    
    /**
     * Build the time-series key.
     * Used for cardinality accounting and shard selection.
     */
    public String getTimeSeriesKey() {
        StringBuilder keyBuilder = new StringBuilder(measurement);
        
        if (tags != null && !tags.isEmpty()) {
            keyBuilder.append("{");
            tags.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .forEach(entry -> keyBuilder
                    .append(entry.getKey())
                    .append("=")
                    .append(entry.getValue())
                    .append(","));
            
            // drop the trailing comma
            if (keyBuilder.charAt(keyBuilder.length() - 1) == ',') {
                keyBuilder.setLength(keyBuilder.length() - 1);
            }
            keyBuilder.append("}");
        }
        
        return keyBuilder.toString();
    }
    
    /**
     * Estimate this point's tag cardinality contribution.
     * A rough heuristic only: it multiplies per-tag scores (capped at 100),
     * using value length as a stand-in for value diversity; it is not the
     * true distinct-combination count.
     */
    public int calculateTagCardinality() {
        if (tags == null || tags.isEmpty()) {
            return 1;
        }
        
        return tags.values().stream()
            .mapToInt(value -> value == null ? 1 : value.length())
            .reduce(1, (a, b) -> a * Math.min(b, 100)); // cap each tag's contribution
    }
    
    /**
     * Data priority levels
     */
    public enum DataPriority {
        LOW,     // low priority: delay and loss are tolerable
        NORMAL,  // normal priority: standard handling
        HIGH,    // high priority: prioritized, written promptly
        CRITICAL // critical: highest priority, must never be lost
    }
}
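
The tag guidelines above come down to one fact: series count is bounded by the product of distinct values per tag key, so a single high-cardinality tag such as a user ID dominates everything else. A minimal sketch (the tag names and counts are illustrative):

```java
import java.util.Map;

// Sketch: why tag combinations explode cardinality. The upper bound on
// series count is the product of distinct values per tag key.
public class CardinalityEstimate {

    static long upperBound(Map<String, Integer> distinctValuesPerTag) {
        long total = 1;
        for (int n : distinctValuesPerTag.values()) {
            total *= n; // each tag key multiplies the series count
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> bounded = Map.of("region", 10, "device_type", 5);
        Map<String, Integer> unbounded = Map.of("region", 10, "device_type", 5, "user_id", 1_000_000);
        System.out.println(upperBound(bounded));   // 50
        System.out.println(upperBound(unbounded)); // 50000000
    }
}
```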

3.2 Sharding Strategy Implementation

package com.timeseries.service;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.timeseries.config.InfluxDBConfig;
import com.timeseries.model.TimeSeriesData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Data sharding service.
 *
 * Core responsibilities:
 * 1. Smart shard-strategy selection
 * 2. Cardinality monitoring and alerting
 * 3. Dynamic load balancing
 * 4. Performance optimization
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DataShardingService {
    
    private final InfluxDBConfig influxDBConfig;
    private final Map<String, InfluxDBClient> shardedClients;
    
    @Value("${influxdb.sharding.cardinality-threshold:50000}")
    private long cardinalityThreshold;
    
    @Value("${influxdb.sharding.bucket:monitoring}")
    private String defaultBucket;
    
    // cardinality statistics
    private final Map<String, AtomicLong> measurementCardinality = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> shardWriteCount = new ConcurrentHashMap<>();
    
    /**
     * Write a time-series data point.
     * Chooses a database instance according to the sharding strategy.
     */
    public void writeData(TimeSeriesData data) {
        try {
            // pick a shard
            String shardKey = selectShard(data);
            InfluxDBClient client = getShardClient(shardKey);
            
            // convert to an InfluxDB Point
            Point point = convertToPoint(data);
            
            // write synchronously; the blocking API avoids creating and
            // closing a buffered WriteApi for every single point
            client.getWriteApiBlocking()
                .writePoint(defaultBucket, influxDBConfig.getInfluxOrg(), point);
            
            // update statistics
            updateStatistics(data, shardKey);
            
            log.debug("Data written to shard: {} for measurement: {}", shardKey, data.getMeasurement());
            
        } catch (Exception e) {
            log.error("Error writing data to InfluxDB", e);
            throw new RuntimeException("Failed to write time series data", e);
        }
    }
    
    /**
     * Select a shard for the given data point
     */
    private String selectShard(TimeSeriesData data) {
        String measurement = data.getMeasurement();
        
        // current cardinality for this measurement
        long currentCardinality = measurementCardinality
            .computeIfAbsent(measurement, k -> new AtomicLong(0))
            .get();
        
        // pick a strategy based on cardinality
        if (currentCardinality < 10000) {
            // low cardinality: primary instance
            return "primary";
        } else if (currentCardinality < cardinalityThreshold) {
            // medium cardinality: hash the measurement name
            // (floorMod, unlike Math.abs, never yields a negative index)
            return "shard_" + Math.floorMod(measurement.hashCode(), shardedClients.size());
        } else {
            // high cardinality: hash the full time-series key
            String timeSeriesKey = data.getTimeSeriesKey();
            return "shard_" + Math.floorMod(timeSeriesKey.hashCode(), shardedClients.size());
        }
    }
    
    /**
     * Resolve the shard client
     */
    private InfluxDBClient getShardClient(String shardKey) {
        if ("primary".equals(shardKey)) {
            return influxDBConfig.primaryInfluxDBClient();
        }
        
        return shardedClients.getOrDefault(shardKey, influxDBConfig.primaryInfluxDBClient());
    }
    
    /**
     * Convert to an InfluxDB Point
     */
    private Point convertToPoint(TimeSeriesData data) {
        Point point = Point.measurement(data.getMeasurement());
        
        // tags (the 2.x client uses addTag/addField, not tag/field)
        if (data.getTags() != null) {
            data.getTags().forEach(point::addTag);
        }
        
        // fields
        if (data.getFields() != null) {
            data.getFields().forEach((key, value) -> {
                if (value instanceof Number) {
                    point.addField(key, (Number) value);
                } else if (value instanceof String) {
                    point.addField(key, (String) value);
                } else if (value instanceof Boolean) {
                    point.addField(key, (Boolean) value);
                } else {
                    point.addField(key, value.toString());
                }
            });
        }
        
        // timestamp
        if (data.getTimestamp() != null) {
            point.time(data.getTimestamp(), WritePrecision.MS);
        }
        
        return point;
    }
    
    /**
     * Update statistics
     */
    private void updateStatistics(TimeSeriesData data, String shardKey) {
        String measurement = data.getMeasurement();
        
        // per-measurement counter (a write-count proxy, not true cardinality)
        measurementCardinality
            .computeIfAbsent(measurement, k -> new AtomicLong(0))
            .incrementAndGet();
        
        // per-shard write counter
        shardWriteCount
            .computeIfAbsent(shardKey, k -> new AtomicLong(0))
            .incrementAndGet();
        
        // warn at every 10k increments
        long currentCardinality = measurementCardinality.get(measurement).get();
        if (currentCardinality > 0 && currentCardinality % 10000 == 0) {
            log.warn("High cardinality detected for measurement: {}, current: {}", 
                measurement, currentCardinality);
        }
    }
    }
    
    /**
     * Get cardinality statistics
     */
    public Map<String, Long> getCardinalityStats() {
        Map<String, Long> stats = new ConcurrentHashMap<>();
        measurementCardinality.forEach((measurement, cardinality) -> 
            stats.put(measurement, cardinality.get()));
        return stats;
    }
    
    /**
     * Get per-shard write-load statistics
     */
    public Map<String, Long> getShardLoadStats() {
        Map<String, Long> stats = new ConcurrentHashMap<>();
        shardWriteCount.forEach((shard, count) -> 
            stats.put(shard, count.get()));
        return stats;
    }
    
    /**
     * Reset statistics
     */
    public void resetStatistics() {
        measurementCardinality.clear();
        shardWriteCount.clear();
        log.info("Statistics reset completed");
    }
}
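
One subtlety in the hash-to-shard mapping deserves a note: `Math.abs(hash) % n` can return a negative index, because `Math.abs(Integer.MIN_VALUE)` is still negative. `Math.floorMod` always lands in `[0, n)`. A minimal sketch (the key strings are illustrative):

```java
// Sketch: mapping a time-series key to a shard index safely.
public class ShardIndex {

    // floorMod always yields a value in [0, shardCount), even for
    // hashCode() == Integer.MIN_VALUE where Math.abs stays negative.
    static int shardOf(String timeSeriesKey, int shardCount) {
        return Math.floorMod(timeSeriesKey.hashCode(), shardCount);
    }

    public static void main(String[] args) {
        int n = 3;
        System.out.println(shardOf("cpu{host=a}", n));
        // the edge case Math.abs mishandles:
        System.out.println(Math.floorMod(Integer.MIN_VALUE, n));
    }
}
```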

4. Cardinality Monitoring and Alerting

4.1 Cardinality Monitoring Service

package com.timeseries.service;

import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.Query;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Cardinality monitoring service.
 *
 * Responsibilities:
 * 1. Periodically monitor each measurement's cardinality
 * 2. Project cardinality growth trends
 * 3. Trigger alerts and automatic optimization
 * 4. Produce cardinality reports
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class CardinalityMonitoringService {
    
    private final QueryApi queryApi;
    private final MqttMessageService mqttMessageService;
    
    @Value("${influxdb.sharding.bucket:monitoring}")
    private String bucket;
    
    @Value("${influxdb.monitoring.cardinality-threshold:50000}")
    private long cardinalityThreshold;
    
    @Value("${influxdb.monitoring.growth-rate-threshold:0.1}")
    private double growthRateThreshold;
    
    // cardinality history
    private final Map<String, CardinalityHistory> cardinalityHistory = new ConcurrentHashMap<>();
    
    /**
     * Periodic cardinality check.
     * Runs every 5 minutes.
     */
    @Scheduled(fixedRate = 300000) // 5 minutes (requires @EnableScheduling)
    public void monitorCardinality() {
        try {
            log.debug("Starting cardinality monitoring...");
            
            Map<String, Long> currentCardinality = getCurrentCardinality();
            
            currentCardinality.forEach((measurement, cardinality) -> {
                // update the history
                updateCardinalityHistory(measurement, cardinality);
                
                // threshold alert
                checkCardinalityThreshold(measurement, cardinality);
                
                // growth-rate alert
                checkGrowthRate(measurement, cardinality);
            });
            
            log.debug("Cardinality monitoring completed. Monitored {} measurements", 
                currentCardinality.size());
            
        } catch (Exception e) {
            log.error("Error during cardinality monitoring", e);
        }
    }
    
    /**
     * Query the current cardinality.
     * Note: as a lightweight proxy this counts distinct field keys per
     * measurement; for true series cardinality, Flux's influxdata/influxdb
     * cardinality() function is the accurate tool.
     */
    private Map<String, Long> getCurrentCardinality() {
        Map<String, Long> cardinality = new HashMap<>();
        
        // Flux query: per-measurement distinct-field count
        // (plain concatenation: text blocks need Java 15+, this project targets 11)
        String fluxQuery =
            "from(bucket: \"" + bucket + "\")\n" +
            "  |> range(start: -1h)\n" +
            "  |> group(columns: [\"_measurement\"])\n" +
            "  |> distinct(column: \"_field\")\n" +
            "  |> count(column: \"_value\")";
        
        try {
            Query query = new Query().query(fluxQuery);
            List<FluxTable> tables = queryApi.query(query);
            
            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    String measurement = (String) record.getValueByKey("_measurement");
                    Long count = (Long) record.getValue();
                    
                    if (measurement != null && count != null) {
                        cardinality.put(measurement, count);
                    }
                }
            }
            
        } catch (Exception e) {
            log.error("Error querying cardinality", e);
        }
        
        return cardinality;
    }
    
    /**
     * Update the cardinality history
     */
    private void updateCardinalityHistory(String measurement, long cardinality) {
        CardinalityHistory history = cardinalityHistory.computeIfAbsent(
            measurement, k -> new CardinalityHistory());
        
        history.addRecord(cardinality);
    }
    
    /**
     * Check the cardinality threshold and alert if exceeded
     */
    private void checkCardinalityThreshold(String measurement, long cardinality) {
        if (cardinality > cardinalityThreshold) {
            log.warn("Cardinality threshold exceeded for measurement: {}, current: {}, threshold: {}", 
                measurement, cardinality, cardinalityThreshold);
            
            // Send an MQTT alert
            mqttMessageService.sendCardinalityAlert(measurement, cardinality, cardinalityThreshold);
        }
    }
    
    /**
     * Check the cardinality growth rate and alert if it is too high
     */
    private void checkGrowthRate(String measurement, long currentCardinality) {
        CardinalityHistory history = cardinalityHistory.get(measurement);
        if (history == null || history.getRecordCount() < 2) {
            return;
        }
        
        double growthRate = history.calculateGrowthRate();
        if (growthRate > growthRateThreshold) {
            // SLF4J placeholders do not support format specifiers such as
            // {:.2f}, so format the number before logging it
            log.warn("High cardinality growth rate detected for measurement: {}, rate: {}%", 
                measurement, String.format("%.2f", growthRate * 100));
            
            // An automatic optimization strategy can be triggered here
            triggerAutoOptimization(measurement, currentCardinality, growthRate);
        }
    }
    
    /**
     * Trigger auto-optimization
     */
    private void triggerAutoOptimization(String measurement, long cardinality, double growthRate) {
        log.info("Triggering auto-optimization for measurement: {}", measurement);
        
        // Possible optimization strategies:
        // 1. Automatically switch to a more efficient sharding strategy
        // 2. Suggest a tag redesign
        // 3. Enable data compression
        // 4. Adjust the retention policy
        
        // Example: emit an optimization suggestion
        String optimizationSuggestion = String.format(
            "Measurement '%s' shows high growth rate (%.2f%%). Consider: " +
            "1. Review tag design, 2. Enable data compression, 3. Adjust retention policy",
            measurement, growthRate * 100
        );
        
        log.info("Optimization suggestion: {}", optimizationSuggestion);
    }
    
    /**
     * Get the cardinality monitoring report
     */
    public Map<String, Object> getCardinalityReport() {
        Map<String, Object> report = new HashMap<>();
        
        // Current cardinality statistics
        Map<String, Long> currentCardinality = getCurrentCardinality();
        report.put("currentCardinality", currentCardinality);
        
        // Growth-rate statistics
        Map<String, Double> growthRates = new HashMap<>();
        cardinalityHistory.forEach((measurement, history) -> {
            if (history.getRecordCount() >= 2) {
                growthRates.put(measurement, history.calculateGrowthRate());
            }
        });
        report.put("growthRates", growthRates);
        
        // Alert statistics
        long alertCount = currentCardinality.values().stream()
            .mapToLong(cardinality -> cardinality > cardinalityThreshold ? 1 : 0)
            .sum();
        report.put("alertCount", alertCount);
        
        // Overall statistics
        long totalCardinality = currentCardinality.values().stream()
            .mapToLong(Long::longValue)
            .sum();
        report.put("totalCardinality", totalCardinality);
        
        return report;
    }
    
    /**
     * Cardinality history record (fixed-size ring buffer)
     */
    private static class CardinalityHistory {
        private static final int MAX_RECORDS = 100;
        private final long[] records = new long[MAX_RECORDS];
        private int index = 0;
        private int count = 0;
        
        public void addRecord(long cardinality) {
            records[index] = cardinality;
            index = (index + 1) % MAX_RECORDS;
            if (count < MAX_RECORDS) {
                count++;
            }
        }
        
        public int getRecordCount() {
            return count;
        }
        
        public double calculateGrowthRate() {
            if (count < 2) {
                return 0.0;
            }
            
            // Growth rate between the two most recent records
            int currentIndex = (index - 1 + MAX_RECORDS) % MAX_RECORDS;
            int previousIndex = (index - 2 + MAX_RECORDS) % MAX_RECORDS;
            
            long current = records[currentIndex];
            long previous = records[previousIndex];
            
            if (previous == 0) {
                return current > 0 ? 1.0 : 0.0;
            }
            
            return (double) (current - previous) / previous;
        }
    }
}

4.2 REST API Controller

package com.timeseries.controller;

import com.timeseries.model.TimeSeriesData;
import com.timeseries.service.DataShardingService;
import com.timeseries.service.CardinalityMonitoringService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;
import java.util.Map;

/**
 * Time series data API controller
 * 
 * Provides:
 * 1. Time series data write endpoints
 * 2. Cardinality monitoring query endpoints
 * 3. Shard status query endpoints
 * 4. System management endpoints
 */
@Slf4j
@RestController
@RequestMapping("/api/v1/timeseries")
@RequiredArgsConstructor
@Validated
public class TimeSeriesController {
    
    private final DataShardingService dataShardingService;
    private final CardinalityMonitoringService cardinalityMonitoringService;
    
    /**
     * Write time series data
     */
    @PostMapping("/data")
    public ResponseEntity<String> writeData(@Valid @RequestBody TimeSeriesData data) {
        try {
            dataShardingService.writeData(data);
            return ResponseEntity.ok("Data written successfully");
        } catch (Exception e) {
            log.error("Error writing time series data", e);
            return ResponseEntity.internalServerError()
                .body("Failed to write data: " + e.getMessage());
        }
    }
    
    /**
     * Batch-write time series data
     */
    @PostMapping("/data/batch")
    public ResponseEntity<String> writeBatchData(@Valid @RequestBody TimeSeriesData[] dataArray) {
        try {
            int successCount = 0;
            int failureCount = 0;
            
            for (TimeSeriesData data : dataArray) {
                try {
                    dataShardingService.writeData(data);
                    successCount++;
                } catch (Exception e) {
                    log.error("Error writing data item", e);
                    failureCount++;
                }
            }
            
            String result = String.format("Batch write completed. Success: %d, Failures: %d", 
                successCount, failureCount);
            
            return ResponseEntity.ok(result);
            
        } catch (Exception e) {
            log.error("Error in batch write operation", e);
            return ResponseEntity.internalServerError()
                .body("Batch write failed: " + e.getMessage());
        }
    }
    
    /**
     * Get cardinality statistics
     */
    @GetMapping("/cardinality/stats")
    public ResponseEntity<Map<String, Long>> getCardinalityStats() {
        try {
            Map<String, Long> stats = dataShardingService.getCardinalityStats();
            return ResponseEntity.ok(stats);
        } catch (Exception e) {
            log.error("Error getting cardinality stats", e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * Get shard load statistics
     */
    @GetMapping("/sharding/load")
    public ResponseEntity<Map<String, Long>> getShardLoadStats() {
        try {
            Map<String, Long> stats = dataShardingService.getShardLoadStats();
            return ResponseEntity.ok(stats);
        } catch (Exception e) {
            log.error("Error getting shard load stats", e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * Get the cardinality monitoring report
     */
    @GetMapping("/cardinality/report")
    public ResponseEntity<Map<String, Object>> getCardinalityReport() {
        try {
            Map<String, Object> report = cardinalityMonitoringService.getCardinalityReport();
            return ResponseEntity.ok(report);
        } catch (Exception e) {
            log.error("Error getting cardinality report", e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * Reset statistics
     */
    @PostMapping("/stats/reset")
    public ResponseEntity<String> resetStatistics() {
        try {
            dataShardingService.resetStatistics();
            return ResponseEntity.ok("Statistics reset successfully");
        } catch (Exception e) {
            log.error("Error resetting statistics", e);
            return ResponseEntity.internalServerError()
                .body("Failed to reset statistics: " + e.getMessage());
        }
    }
    
    /**
     * Health check
     */
    @GetMapping("/health")
    public ResponseEntity<Map<String, Object>> healthCheck() {
        Map<String, Object> health = Map.of(
            "status", "UP",
            "timestamp", System.currentTimeMillis(),
            "service", "time-series-optimizer"
        );
        
        return ResponseEntity.ok(health);
    }
}

5. Configuration Files

5.1 application.yml

# Spring Boot configuration
spring:
  application:
    name: time-series-optimizer
  
  # Jackson configuration
  jackson:
    time-zone: UTC
    date-format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
    serialization:
      write-dates-as-timestamps: false

# InfluxDB configuration
influxdb:
  url: http://localhost:8086
  token: ${INFLUXDB_TOKEN:your-influxdb-token}
  org: ${INFLUXDB_ORG:your-org}
  
  # Sharding configuration
  sharding:
    enabled: true
    instances: 3
    cardinality-threshold: 50000
    bucket: monitoring
  
  # Monitoring configuration
  monitoring:
    cardinality-threshold: 50000
    growth-rate-threshold: 0.1

# MQTT configuration
mqtt:
  broker-url: tcp://localhost:1883
  client-id: timeseries-client
  username: ${MQTT_USERNAME:}
  password: ${MQTT_PASSWORD:}
  keep-alive-interval: 60
  connection-timeout: 30
  clean-session: false
  
  # Topic configuration
  timeseries-data-topic: timeseries/data
  cardinality-alert-topic: timeseries/alerts/cardinality
  performance-metrics-topic: timeseries/metrics/performance

# Actuator configuration
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      influx:
        enabled: true
        uri: ${influxdb.url}
        token: ${influxdb.token}
        org: ${influxdb.org}
        bucket: metrics
        step: 30s

# Logging configuration
logging:
  level:
    com.timeseries: DEBUG
    org.springframework.integration: INFO
    com.influxdb: INFO
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
    file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
  file:
    name: logs/time-series-optimizer.log

# Server configuration
server:
  port: 8080
  servlet:
    context-path: /
  compression:
    enabled: true
    mime-types: application/json,text/plain,text/css,application/javascript
    min-response-size: 1024

5.2 Docker Compose Deployment Configuration

version: '3.8'

services:
  # InfluxDB primary instance
  influxdb-primary:
    image: influxdb:2.7
    container_name: influxdb-primary
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-primary-data:/var/lib/influxdb2
      - influxdb-primary-config:/etc/influxdb2
    networks:
      - timeseries-network

  # InfluxDB shard instance 1
  influxdb-shard1:
    image: influxdb:2.7
    container_name: influxdb-shard1
    ports:
      - "8087:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-shard1-data:/var/lib/influxdb2
      - influxdb-shard1-config:/etc/influxdb2
    networks:
      - timeseries-network

  # InfluxDB shard instance 2
  influxdb-shard2:
    image: influxdb:2.7
    container_name: influxdb-shard2
    ports:
      - "8088:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-shard2-data:/var/lib/influxdb2
      - influxdb-shard2-config:/etc/influxdb2
    networks:
      - timeseries-network

  # MQTT Broker
  mosquitto:
    image: eclipse-mosquitto:2.0
    container_name: mosquitto
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    networks:
      - timeseries-network

  # Grafana
  grafana:
    image: grafana/grafana:10.0.0
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    networks:
      - timeseries-network
    depends_on:
      - influxdb-primary

  # Time series optimization service
  time-series-optimizer:
    build: .
    container_name: time-series-optimizer
    ports:
      - "8080:8080"
    environment:
      - INFLUXDB_TOKEN=my-super-secret-auth-token
      - INFLUXDB_ORG=myorg
      - MQTT_USERNAME=
      - MQTT_PASSWORD=
    networks:
      - timeseries-network
    depends_on:
      - influxdb-primary
      - influxdb-shard1
      - influxdb-shard2
      - mosquitto
    volumes:
      - ./logs:/app/logs

networks:
  timeseries-network:
    driver: bridge

volumes:
  influxdb-primary-data:
  influxdb-primary-config:
  influxdb-shard1-data:
  influxdb-shard1-config:
  influxdb-shard2-data:
  influxdb-shard2-config:
  grafana-data:

6. Running and Testing the Project

6.1 Starting the Project

# 1. Start all services
docker-compose up -d

# 2. Check service status
docker-compose ps

# 3. Tail the application logs
docker-compose logs -f time-series-optimizer

# 4. Test the write API
curl -X POST http://localhost:8080/api/v1/timeseries/data \
  -H "Content-Type: application/json" \
  -d '{
    "measurement": "cpu_usage",
    "tags": {
      "host": "server-01",
      "region": "us-east"
    },
    "fields": {
      "value": 85.5,
      "cores": 8
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }'

6.2 Performance Test Script

package com.timeseries.test;

import com.timeseries.model.TimeSeriesData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.client.RestTemplate;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Performance test tool
 * Simulates high-concurrency write scenarios to measure system throughput
 */
@Slf4j
@SpringBootApplication
public class PerformanceTestRunner implements CommandLineRunner {
    
    private final RestTemplate restTemplate = new RestTemplate();
    private final String API_URL = "http://localhost:8080/api/v1/timeseries/data";
    private final Random random = new Random();
    
    public static void main(String[] args) {
        SpringApplication.run(PerformanceTestRunner.class, args);
    }
    
    @Override
    public void run(String... args) throws Exception {
        log.info("Starting performance tests...");
        
        // Scenario 1: low cardinality, high-frequency writes
        testLowCardinalityHighFrequency();
        
        // Scenario 2: high-cardinality writes
        testHighCardinality();
        
        // Scenario 3: mixed workload
        testMixedWorkload();
        
        log.info("Performance tests completed");
    }
    
    /**
     * Low-cardinality, high-frequency write test
     */
    private void testLowCardinalityHighFrequency() throws InterruptedException {
        log.info("Scenario 1: low cardinality, high-frequency writes");
        
        ExecutorService executor = Executors.newFixedThreadPool(10);
        long startTime = System.currentTimeMillis();
        int totalPoints = 10000;
        
        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data = TimeSeriesData.builder()
                        .measurement("system_metrics")
                        .tags(Map.of(
                            "host", "server-" + (index % 5 + 1),
                            "metric", "cpu_usage"
                        ))
                        .fields(Map.of(
                            "value", 50 + random.nextDouble() * 50,
                            "cores", 8
                        ))
                        .timestamp(Instant.now())
                        .build();
                    
                    restTemplate.postForObject(API_URL, data, String.class);
                    
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
        
        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        
        // SLF4J does not support {:.2f}; format the throughput separately
        log.info("Low-cardinality test done - points: {}, elapsed: {}ms, throughput: {} points/sec", 
            totalPoints, duration, String.format("%.2f", throughput));
    }
    
    /**
     * High-cardinality write test
     */
    private void testHighCardinality() throws InterruptedException {
        log.info("Scenario 2: high-cardinality writes");
        
        ExecutorService executor = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        int totalPoints = 5000;
        
        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data = TimeSeriesData.builder()
                        .measurement("user_events")
                        .tags(Map.of(
                            "user_id", "user_" + (index % 1000),
                            "session_id", "session_" + random.nextInt(10000),
                            "event_type", "click"
                        ))
                        .fields(Map.of(
                            "duration", random.nextInt(5000),
                            "value", random.nextDouble()
                        ))
                        .timestamp(Instant.now())
                        .build();
                    
                    restTemplate.postForObject(API_URL, data, String.class);
                    
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(120, TimeUnit.SECONDS);
        
        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        
        log.info("High-cardinality test done - points: {}, elapsed: {}ms, throughput: {} points/sec", 
            totalPoints, duration, String.format("%.2f", throughput));
    }
    
    /**
     * Mixed workload test
     */
    private void testMixedWorkload() throws InterruptedException {
        log.info("Scenario 3: mixed workload");
        
        ExecutorService executor = Executors.newFixedThreadPool(8);
        long startTime = System.currentTimeMillis();
        int totalPoints = 8000;
        
        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data;
                    
                    if (index % 3 == 0) {
                        // System metrics (low cardinality)
                        data = createSystemMetric(index);
                    } else if (index % 3 == 1) {
                        // HTTP requests (medium cardinality)
                        data = createHttpMetric(index);
                    } else {
                        // User events (high cardinality)
                        data = createUserEvent(index);
                    }
                    
                    restTemplate.postForObject(API_URL, data, String.class);
                    
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(180, TimeUnit.SECONDS);
        
        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        
        log.info("Mixed workload test done - points: {}, elapsed: {}ms, throughput: {} points/sec", 
            totalPoints, duration, String.format("%.2f", throughput));
    }
    
    private TimeSeriesData createSystemMetric(int index) {
        return TimeSeriesData.builder()
            .measurement("system_metrics")
            .tags(Map.of(
                "host", "server-" + (index % 10 + 1),
                "metric", "memory_usage"
            ))
            .fields(Map.of(
                "value", 30 + random.nextDouble() * 40,
                "total", 16384
            ))
            .timestamp(Instant.now())
            .build();
    }
    
    private TimeSeriesData createHttpMetric(int index) {
        String[] methods = {"GET", "POST", "PUT", "DELETE"};
        return TimeSeriesData.builder()
            .measurement("http_requests")
            .tags(Map.of(
                "method", methods[index % 4],
                "endpoint", "/api/v1/endpoint-" + (index % 50),
                "status", "200"
            ))
            .fields(Map.of(
                "response_time", 50 + random.nextInt(500),
                "bytes", random.nextInt(10000)
            ))
            .timestamp(Instant.now())
            .build();
    }
    
    private TimeSeriesData createUserEvent(int index) {
        return TimeSeriesData.builder()
            .measurement("user_events")
            .tags(Map.of(
                "user_id", "user_" + (index % 2000),
                "session_id", "session_" + random.nextInt(50000),
                "page", "page_" + (index % 100)
            ))
            .fields(Map.of(
                "duration", random.nextInt(30000),
                "clicks", random.nextInt(20)
            ))
            .timestamp(Instant.now())
            .build();
    }
}

7. Lessons Learned

7.1 Pitfalls We Hit

1. Poor tag design caused a cardinality explosion

At first we used the user ID directly as a tag, and the cardinality instantly shot up into the millions. After moving the user ID into a field, the cardinality dropped right back down.

// ❌ Wrong: high-cardinality value stored as a tag
Point.measurement("user_activity")
    .addTag("user_id", "user_12345")     // this causes a cardinality explosion
    .addField("action", "click");

// ✅ Right: bounded enum values as tags, high-cardinality values as fields
Point.measurement("user_activity")
    .addTag("action_type", "click")
    .addField("user_id", "user_12345");

2. Wrong choice of sharding strategy

We started with simple round-robin sharding, and the data distribution turned out very uneven. Switching to consistent hashing improved things considerably.
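The consistent-hashing switch can be sketched with a simple hash ring. This is a minimal, stdlib-only illustration, not the project's actual sharding code; the class name `ShardRing` and the virtual-node count are invented for the example:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;

/** Minimal consistent-hash ring: series keys map to shards, and adding or
 *  removing a shard only remaps the keys adjacent to it on the ring. */
public class ShardRing {
    private static final int VIRTUAL_NODES = 100; // smooths out the distribution
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addShard(String shard) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(shard + "#" + i), shard);
        }
    }

    /** Pick the shard for a series key, e.g. "cpu_usage,host=server-01". */
    public String shardFor(String seriesKey) {
        if (ring.isEmpty()) throw new IllegalStateException("no shards registered");
        // first virtual node at or after the key's position; wrap around if none
        SortedMap<Long, String> tail = ring.tailMap(hash(seriesKey));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xFF);
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e);
        }
    }
}
```

Compared with round-robin, the same series key always lands on the same shard, and rebalancing when a shard is added or removed touches only a small fraction of keys.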

3. Badly tuned batch write size

Small batches (100 points) wasted too much time on network overhead; huge batches (10,000 points) put pressure on memory. We eventually found 1,000-2,000 points to be the sweet spot.
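The size-and-time bounded buffering described here can be sketched in a few lines. This is an illustrative, stdlib-only sketch (the class `PointBatcher` is invented for the example; the InfluxDB Java client's own write API offers equivalent batching options):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Size- and time-bounded write buffer: flushes when the batch reaches
 *  maxSize points, or when maxAgeMillis has passed since the first
 *  buffered point, whichever comes first. */
public class PointBatcher<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> flushTarget;
    private final List<T> buffer = new ArrayList<>();
    private long firstPointAt = -1;

    public PointBatcher(int maxSize, long maxAgeMillis, Consumer<List<T>> flushTarget) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.flushTarget = flushTarget;
    }

    public synchronized void add(T point) {
        if (buffer.isEmpty()) firstPointAt = System.currentTimeMillis();
        buffer.add(point);
        if (buffer.size() >= maxSize
                || System.currentTimeMillis() - firstPointAt >= maxAgeMillis) {
            flush();
        }
    }

    public synchronized void flush() {
        if (buffer.isEmpty()) return;
        flushTarget.accept(new ArrayList<>(buffer)); // hand off a copy downstream
        buffer.clear();
    }
}
```

With `maxSize` around 1,000-2,000 and a flush interval of about a second, neither the network-overhead nor the memory-pressure failure mode above is hit.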

7.2 Performance Tuning Notes

1. Write optimization

  • Use batch writes: 5-10x faster than writing single points
  • Write asynchronously so business threads are never blocked
  • Tune the batch size and flush interval together

2. Query optimization

  • Always bound queries with a time range
  • Filtering by tags is faster than filtering by fields
  • Downsample where possible to reduce the amount of data transferred
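The three query rules combine naturally into one Flux query. The sketch below is a hypothetical helper (the class `FluxQueryBuilder` is invented for illustration) that builds such a query string in Java:

```java
/** Builds a Flux query that follows the three rules above: a bounded time
 *  range, tag-based filtering, and aggregateWindow downsampling. */
public class FluxQueryBuilder {
    public static String downsampledQuery(String bucket, String measurement,
                                          String tagKey, String tagValue,
                                          String range, String window) {
        return String.format(
            "from(bucket: \"%s\")%n" +
            "  |> range(start: -%s)%n" +                  // always bound the time range
            "  |> filter(fn: (r) => r._measurement == \"%s\")%n" +
            "  |> filter(fn: (r) => r.%s == \"%s\")%n" +  // tag filter, cheaper than a field filter
            "  |> aggregateWindow(every: %s, fn: mean)",  // downsample before transfer
            bucket, range, measurement, tagKey, tagValue, window);
    }
}
```

For example, `downsampledQuery("monitoring", "cpu_usage", "host", "server-01", "1h", "1m")` yields a query that scans only the last hour, prunes by the `host` tag, and returns one averaged point per minute.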

3. Resource sizing

  • Give InfluxDB enough memory: 8 GB is a sensible minimum
  • SSDs noticeably improve write performance
  • Network bandwidth matters too, especially when sharding

7.3 Monitoring and Alerting Recommendations

1. Key metrics to watch

  • Cardinality growth rate: alert above 20% per hour
  • Write latency: investigate above 1 second
  • Query latency: optimize above 5 seconds
  • Memory usage: scale out above 80%

2. Alerting strategy

  • Tiered alerts: warning, severe, critical
  • Avoid alert storms: send the same alert at most once every 5 minutes
  • Auto-recovery notices: notify when the problem clears
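The 5-minute suppression rule can be implemented with a small deduplicator keyed by alert identity. A minimal sketch (the class `AlertDeduplicator` is invented for illustration; it is per-key last-write-wins, not strictly race-free):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Suppresses duplicate alerts: the same alert key is forwarded at most
 *  once per suppression window (e.g. 5 minutes). */
public class AlertDeduplicator {
    private final long windowMillis;
    private final Map<String, Long> lastSent = new ConcurrentHashMap<>();

    public AlertDeduplicator(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true if the alert should be sent now, false if suppressed. */
    public boolean shouldSend(String alertKey, long nowMillis) {
        Long prev = lastSent.get(alertKey);
        if (prev != null && nowMillis - prev < windowMillis) {
            return false; // identical alert inside the window: suppress
        }
        lastSent.put(alertKey, nowMillis);
        return true;
    }
}
```

A sender would consult `shouldSend("cardinality:" + measurement, System.currentTimeMillis())` before publishing to the MQTT alert topic; keys that clear the window fire again, and distinct keys are never suppressed by each other.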

7.4 Best Practices

1. Data model design

  • Use tags for grouping and filtering, fields for storing values
  • Keep tag values bounded and predictable
  • Give measurement names a hierarchical structure

2. Sharding strategy

  • Keep low-cardinality data on the primary; shard only high-cardinality data
  • Don't over-shard: 3-5 shards are usually enough
  • Review shard load balance regularly

3. Operations

  • Back up data and configuration regularly
  • Monitor disk space usage
  • Define a data retention policy

8. Summary

This solution has now been running stably in our production environment for over six months, with solid results:

Performance gains

  • Write throughput up from 2,000 to 8,000 points/sec
  • Average query response time down from 3 s to 800 ms
  • Supported time series count up from 50 thousand to 2 million

Simpler operations

  • Cardinality monitoring is automated; nobody has to watch it
  • The sharding strategy adapts itself, reducing manual tuning
  • Alerts are timely and accurate, so incidents get resolved faster

Cost control

  • Smart sharding kept hardware cost from growing linearly
  • Query optimization cut CPU and memory consumption
  • Automated operations reduced manual effort

That said, this approach is no silver bullet. If your cardinality is truly extreme (tens of millions of series), you may also need to consider:

  1. More aggressive sharding: e.g. sharding by both time and tags
  2. Pre-aggregation: compute common aggregate results ahead of time
  3. Hot/cold separation: migrate old data to cheaper storage

All in all, the high-cardinality problem in time series databases is genuinely tricky, but with the right approach it can be handled well. The key is to understand where cardinality comes from and to design the solution around that.

I hope this article helps anyone facing similar problems. Questions and discussion are welcome.
