Time-Series Database High-Cardinality Problems (Part 2): A Java + InfluxDB Solution
While working on a recent IoT monitoring project, we ran into a classic time-series database problem: high cardinality. Too many distinct tag combinations caused database performance to degrade sharply. This article documents the complete process our team went through to solve it with Java and InfluxDB, including the pitfalls we hit along the way and the final solution.
1. Project Architecture Design
1.1 Overall Architecture
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ Data Ingestion  │     │ Data Processing │     │ Storage Tuning  │
│                 │     │                 │     │                 │
│ • IoT sensors   │───▶│ • Spring Boot   │───▶│ • InfluxDB      │
│ • Micrometer    │     │ • MQTT broker   │     │ • Smart sharding│
│ • Custom agents │     │ • Preprocessing │     │ • Adaptive index│
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                         │
                                                         ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ Monitoring &    │     │ Query Services  │     │ Cardinality Mgmt│
│ Alerting        │     │                 │     │                 │
│ • Grafana       │◀───│ • GraphQL API   │◀───│ • Live tracking │
│ • Smart alerts  │     │ • REST API      │     │ • Predictive opt│
│ • Multi-channel │     │ • Query tuning  │     │ • Dynamic policy│
└─────────────────┘     └─────────────────┘     └─────────────────┘
Design notes:
Why MQTT instead of Kafka:
- MQTT is lighter weight: its fixed header is as small as 2 bytes, versus tens of bytes of per-record overhead in Kafka
- Lower latency: millisecond-level delivery
- Friendlier to IoT devices: built-in support for reconnecting after dropped connections
- Small memory footprint: a single connection costs only a few KB
Data sharding strategy:
- Small datasets (<10k time series): a single node is enough
- Medium datasets (10k-100k): shard by data type
- Large datasets (>100k): shard by tag hash
Automatic index selection:
- Mostly point lookups: hash index
- Mostly range scans: B+ tree index
- Mixed workloads: composite index
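To make the sharding thresholds above concrete, here is a minimal self-contained sketch of the selection logic. The 10k/100k cutoffs and strategy names follow the list above; the class and method names are illustrative, not part of the project code.

```java
public class ShardStrategySelector {

    public enum Strategy { SINGLE_NODE, BY_DATA_TYPE, BY_TAG_HASH }

    /**
     * Pick a sharding strategy from the estimated number of active
     * time series (the thresholds mirror the list above).
     */
    public static Strategy select(long seriesCount) {
        if (seriesCount < 10_000) {
            return Strategy.SINGLE_NODE;   // small: one node is enough
        } else if (seriesCount < 100_000) {
            return Strategy.BY_DATA_TYPE;  // medium: shard by data type
        }
        return Strategy.BY_TAG_HASH;       // large: shard by tag hash
    }
}
```

The full production version (DataShardingService, later in this article) applies the same idea per measurement rather than globally.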
1.2 Technology Stack
- Backend framework: Spring Boot 2.7+
- Time-series database: InfluxDB 2.x
- Message queue: Eclipse Mosquitto (MQTT broker)
- MQTT client: Eclipse Paho MQTT
- Monitoring: Micrometer + Prometheus
- Visualization: Grafana
- Build tool: Maven
- JDK: OpenJDK 11+
Why these choices:
MQTT vs Kafka — we chose MQTT:
Criterion             MQTT             Kafka
─────────────────────────────────────────────────
Protocol overhead     2 bytes (min)    20-100 bytes
Concurrent conns      Millions         Tens of thousands
Message latency       <10 ms           10-100 ms
Resource usage        Very low         Moderate
IoT device support    Native           Needs adaptation
Persistence           Optional         Built in
What InfluxDB 2.x buys us:
- A unified API, so there are fewer interfaces to remember
- The Flux query language is far more capable than the older InfluxQL
- A built-in management UI, with no extra software to install
- Noticeably better storage compression than 1.x (roughly 30% in our tests)
2. Smart Data Sharding Strategy
2.1 Project Layout
time-series-optimizer/
├── pom.xml
├── src/main/java/com/timeseries/
│ ├── config/
│ │ ├── InfluxDBConfig.java
│ │ └── ShardingConfig.java
│ ├── service/
│ │ ├── DataShardingService.java
│ │ └── MetricsWriteService.java
│ ├── model/
│ │ ├── TimeSeriesData.java
│ │ └── ShardingStrategy.java
│ └── controller/
│ └── MetricsController.java
└── src/main/resources/
├── application.yml
└── influxdb-sharding.properties
2.2 Maven Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.timeseries</groupId>
<artifactId>time-series-optimizer</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.14</version>
<relativePath/>
</parent>
<properties>
<java.version>11</java.version>
<influxdb.version>6.10.0</influxdb.version>
<micrometer.version>1.9.12</micrometer.version>
</properties>
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- InfluxDB Client -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
<!-- Micrometer for Metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<version>${micrometer.version}</version>
</dependency>
<!-- MQTT for Message Queue -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- Validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.3 InfluxDB Configuration Class
2.3.1 What this class does
InfluxDBConfig manages the database connections. Its responsibilities:
- Multiple databases: connect to several InfluxDB instances at once and spread data across them
- Connection pooling: reuse clients instead of reconnecting for every request
- Dynamic sharding: create shards from configuration, so adding one is a config change, not a code change
- Resource management: close clients automatically so connections never leak
- Failure handling: fall back to the primary instance when a shard is down
2.3.2 Design ideas
- Primary and shards are separate: the primary provides stability, the shards provide scale
- Configuration-driven: resharding is a config change, not a code change
- Lazy connections: clients are created when first needed, so startup stays fast
- Graceful shutdown: connections are cleaned up when the application stops
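The "lazy connections" idea can be sketched independently of the InfluxDB client: a pool that creates an entry only on first access and falls back to a primary key when a shard is unknown. The generic Supplier-style pool below is an illustration, not the project's actual class.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Lazily creates one client per shard key and falls back to "primary". */
public class LazyClientPool<C> {

    private final Map<String, C> pool = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    public LazyClientPool(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Create the client on first access; reuse it afterwards. */
    public C get(String shardKey) {
        return pool.computeIfAbsent(shardKey, factory);
    }

    /** Unknown shard keys fall back to the primary client. */
    public C getOrPrimary(String shardKey, Set<String> knownShards) {
        return knownShards.contains(shardKey) ? get(shardKey) : get("primary");
    }
}
```

`computeIfAbsent` gives both laziness and thread safety in one call, which is why the real configuration class keeps its clients in a map as well.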
package com.timeseries.config;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.QueryApi;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
/**
* Multi-instance InfluxDB configuration.
* Shards data across several database instances.
*/
@Slf4j
@Getter // generates getInfluxOrg() etc., needed by DataShardingService
@Configuration
public class InfluxDBConfig {
@Value("${influxdb.url}")
private String influxUrl;
@Value("${influxdb.token}")
private String influxToken;
@Value("${influxdb.org}")
private String influxOrg;
@Value("${influxdb.sharding.enabled:true}")
private boolean shardingEnabled;
@Value("${influxdb.sharding.instances:3}")
private int shardingInstances;
private final Map<String, InfluxDBClient> clientPool = new HashMap<>();
/**
* Primary InfluxDB client
*/
@Bean
@Primary
public InfluxDBClient primaryInfluxDBClient() {
InfluxDBClient client = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray(), influxOrg);
clientPool.put("primary", client);
log.info("Primary InfluxDB client initialized: {}", influxUrl);
return client;
}
/**
* Pool of sharded InfluxDB clients
*/
@Bean
public Map<String, InfluxDBClient> shardedInfluxDBClients() {
if (!shardingEnabled) {
return Map.of("primary", primaryInfluxDBClient());
}
Map<String, InfluxDBClient> clients = new HashMap<>();
for (int i = 0; i < shardingInstances; i++) {
String shardUrl = influxUrl.replace(":8086", ":" + (8086 + i));
InfluxDBClient client = InfluxDBClientFactory.create(
shardUrl, influxToken.toCharArray(), influxOrg
);
String shardKey = "shard_" + i;
clients.put(shardKey, client);
clientPool.put(shardKey, client);
log.info("Shard InfluxDB client {} initialized: {}", i, shardUrl);
}
return clients;
}
/**
* Write API bean
*/
@Bean
public WriteApi primaryWriteApi(InfluxDBClient primaryClient) {
return primaryClient.getWriteApi();
}
/**
* Query API bean
*/
@Bean
public QueryApi primaryQueryApi(InfluxDBClient primaryClient) {
return primaryClient.getQueryApi();
}
/**
* Look up the client for a shard, falling back to the primary
*/
public InfluxDBClient getShardClient(String shardKey) {
return clientPool.getOrDefault(shardKey, clientPool.get("primary"));
}
/**
* Close all clients on shutdown
*/
@PreDestroy
public void cleanup() {
log.info("Closing InfluxDB clients...");
clientPool.values().forEach(client -> {
try {
client.close();
} catch (Exception e) {
log.error("Error closing InfluxDB client", e);
}
});
}
}
2.4 MQTT Message Queue Configuration
2.4.1 The MQTT configuration class
MqttConfig sets up the MQTT messaging layer. Its responsibilities:
- Connection management: configures MQTT connection parameters, with support for username/password and SSL
- Message channels: wires up channels so messages can be processed asynchronously
- Quality of service: assigns different QoS levels according to how important each message is
- Automatic reconnect: recovers from network drops so messages are not lost
- Topic layout: separates topics so messages are dispatched by type
2.4.2 Why MQTT for time-series data
- Fast: publish-subscribe delivery typically takes under 10 ms
- Lightweight: the fixed header is as small as 2 bytes, which suits high-frequency data transfer
- IoT-friendly: native support for reconnection, last-will messages, and similar features
- Resource-efficient: a connection occupies only a few KB, so millions of connections are feasible
- Tunable reliability: three QoS levels to trade performance against delivery guarantees
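As a concrete illustration of "tunable reliability", the helper below maps a data priority to an MQTT QoS level. The priority names follow the DataPriority enum defined later in this article; the mapping itself is our own convention, not an MQTT rule.

```java
public class QosPolicy {

    /**
     * Map a data priority to an MQTT QoS level:
     * 0 = at most once, 1 = at least once, 2 = exactly once.
     */
    public static int qosFor(String priority) {
        switch (priority) {
            case "LOW":      return 0; // losses are tolerable
            case "NORMAL":
            case "HIGH":     return 1; // duplicates acceptable, losses not
            case "CRITICAL": return 2; // must arrive exactly once
            default: throw new IllegalArgumentException("unknown priority: " + priority);
        }
    }
}
```

QoS 2 roughly doubles the handshake traffic compared with QoS 1, which is why only critical messages (such as cardinality alerts) should use it.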
package com.timeseries.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT configuration.
*
* Advantages of MQTT over Kafka here:
* 1. Lightweight protocol: minimal overhead, suited to IoT devices and high-frequency data
* 2. Low latency: publish-subscribe delivery with little added delay
* 3. Simple deployment: no cluster setup needed; a single node covers most workloads
* 4. QoS guarantees: three service levels for reliable delivery
* 5. Persistent sessions: clients can reconnect and continue receiving messages
*/
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
private String brokerUrl = "tcp://localhost:1883";
private String clientId = "timeseries-client";
private String username;
private String password;
private int keepAliveInterval = 60;
private int connectionTimeout = 30;
private boolean cleanSession = false;
// Topic configuration
private String timeseriesDataTopic = "timeseries/data";
private String cardinalityAlertTopic = "timeseries/alerts/cardinality";
private String performanceMetricsTopic = "timeseries/metrics/performance";
/**
* MQTT client factory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setKeepAliveInterval(keepAliveInterval);
options.setConnectionTimeout(connectionTimeout);
options.setCleanSession(cleanSession);
if (username != null && !username.isEmpty()) {
options.setUserName(username);
}
if (password != null && !password.isEmpty()) {
options.setPassword(password.toCharArray());
}
// Automatic reconnect
options.setAutomaticReconnect(true);
options.setMaxReconnectDelay(30000);
factory.setConnectionOptions(options);
return factory;
}
/**
* Inbound message channel
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* Outbound message channel
*/
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
/**
* MQTT message producer (inbound adapter)
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId + "-inbound",
mqttClientFactory(),
timeseriesDataTopic,
cardinalityAlertTopic
);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1); // QoS 1: at-least-once delivery
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* MQTT message handler (outbound adapter)
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(
clientId + "-outbound",
mqttClientFactory()
);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(performanceMetricsTopic);
messageHandler.setDefaultQos(1);
return messageHandler;
}
}
2.4.3 The MQTT message service
MqttMessageService is the core business component for MQTT message handling. It routes and processes time-series data, covering these responsibilities:
- Message routing: dispatches messages by MQTT topic
- Payload parsing: converts JSON time-series payloads into Java objects
- Strategy selection: picks the best sharding strategy from the data's characteristics
- Asynchronous processing: non-blocking message handling via Spring Integration
- Alert publishing: pushes cardinality alerts and performance metrics in real time
How the sharding strategy is chosen:
The service selects a strategy from the following characteristics of the data:
- Cardinality estimate: the data's expected contribution to series cardinality
- Tag complexity: the number of tags and the distribution of their values
- Write frequency: how often and in what pattern the data arrives
- Query patterns: historical query patterns the sharding should serve well
Message processing pipeline:
receive MQTT message → identify topic → parse payload → select strategy → write to shard → report status
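The pipeline's first two steps (receive, identify topic) amount to a dispatch table. A minimal stand-alone sketch, with the topic names matching the configuration above and the handler wiring purely illustrative:

```java
import java.util.Map;
import java.util.function.Consumer;

/** Dispatches an incoming payload to a handler chosen by MQTT topic. */
public class TopicRouter {

    private final Map<String, Consumer<String>> handlers;

    public TopicRouter(Map<String, Consumer<String>> handlers) {
        this.handlers = handlers;
    }

    /** Returns true if a handler was found for the topic. */
    public boolean route(String topic, String payload) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false; // unknown topic: ignore rather than fail
        }
        handler.accept(payload);
        return true;
    }
}
```

The real service does the same thing with if/else branches inside handleTimeSeriesData; a map-based router scales better once the topic list grows.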
package com.timeseries.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.timeseries.model.TimeSeriesData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
/**
* MQTT message service.
*
* Handles MQTT delivery of time-series data:
* 1. Receives time-series data from IoT devices
* 2. Publishes cardinality alerts and performance metrics
* 3. Provides reliable delivery and error handling
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MqttMessageService {
private final MessageChannel mqttOutputChannel;
private final ObjectMapper objectMapper;
private final DataShardingService dataShardingService;
/**
* Handle inbound time-series data messages
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleTimeSeriesData(Message<String> message) {
try {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class);
String payload = message.getPayload();
log.debug("Received MQTT message from topic: {}", topic);
if ("timeseries/data".equals(topic)) {
processTimeSeriesData(payload);
} else if ("timeseries/alerts/cardinality".equals(topic)) {
processCardinalityAlert(payload);
}
} catch (Exception e) {
log.error("Error processing MQTT message", e);
}
}
/**
* Process a time-series data payload
*/
private void processTimeSeriesData(String payload) {
try {
TimeSeriesData data = objectMapper.readValue(payload, TimeSeriesData.class);
// Route through the smart sharding service
dataShardingService.writeData(data);
log.debug("Processed time series data: {}", data.getMeasurement());
} catch (JsonProcessingException e) {
log.error("Error parsing time series data", e);
}
}
/**
* Process a cardinality alert
*/
private void processCardinalityAlert(String payload) {
log.warn("Cardinality alert received: {}", payload);
// Alert-handling logic can be plugged in here
}
/**
* Publish performance metrics
*/
public void sendPerformanceMetrics(Object metrics) {
try {
String payload = objectMapper.writeValueAsString(metrics);
Message<String> message = MessageBuilder
.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, "timeseries/metrics/performance")
.setHeader(MqttHeaders.QOS, 1)
.build();
mqttOutputChannel.send(message);
} catch (JsonProcessingException e) {
log.error("Error sending performance metrics", e);
}
}
/**
* Publish a cardinality alert
*/
public void sendCardinalityAlert(String measurement, long cardinality, long threshold) {
try {
CardinalityAlert alert = new CardinalityAlert(measurement, cardinality, threshold);
String payload = objectMapper.writeValueAsString(alert);
Message<String> message = MessageBuilder
.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, "timeseries/alerts/cardinality")
.setHeader(MqttHeaders.QOS, 2) // QoS 2: make sure the alert arrives
.build();
mqttOutputChannel.send(message);
} catch (JsonProcessingException e) {
log.error("Error sending cardinality alert", e);
}
}
/**
* Cardinality alert payload
*/
public static class CardinalityAlert {
private String measurement;
private long currentCardinality;
private long threshold;
private long timestamp;
public CardinalityAlert(String measurement, long currentCardinality, long threshold) {
this.measurement = measurement;
this.currentCardinality = currentCardinality;
this.threshold = threshold;
this.timestamp = System.currentTimeMillis();
}
// Getters and setters
public String getMeasurement() { return measurement; }
public void setMeasurement(String measurement) { this.measurement = measurement; }
public long getCurrentCardinality() { return currentCardinality; }
public void setCurrentCardinality(long currentCardinality) { this.currentCardinality = currentCardinality; }
public long getThreshold() { return threshold; }
public void setThreshold(long threshold) { this.threshold = threshold; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
}
3. Data Model and Sharding Strategy
3.1 Time-Series Data Model
package com.timeseries.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.Map;
/**
* Time-series data model.
*
* Design principles:
* 1. Keep tag counts under control to avoid high-cardinality tag combinations
* 2. Use appropriate data types to reduce storage overhead
* 3. Choose a time precision that matches the business need
* 4. Validate fields to keep data quality and consistency
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TimeSeriesData {
/**
* Metric name (measurement).
* Prefer meaningful names; avoid overly fine-grained categories.
*/
@NotBlank(message = "Measurement cannot be blank")
private String measurement;
/**
* Tags.
* Note: the number of distinct tag combinations directly drives cardinality.
* Recommendations:
* - Keep the set of tag values bounded and predictable
* - Avoid high-cardinality tags such as user IDs or session IDs
* - Consider bucketing or hashing tag values
*/
private Map<String, String> tags;
/**
* 字段(fields)
* 存储实际的数值数据
*/
@NotNull(message = "Fields cannot be null")
private Map<String, Object> fields;
/**
* Timestamp.
* Instant is used so time-zone handling stays correct.
*/
@JsonFormat(shape = JsonFormat.Shape.STRING)
private Instant timestamp;
/**
* Data source identifier, used for sharding decisions and data tracing
*/
private String source;
/**
* Data priority, used to choose the write strategy and storage location
*/
@Builder.Default
private DataPriority priority = DataPriority.NORMAL;
/**
* Estimated cardinality contribution, used by the sharding decision
*/
private Long estimatedCardinality;
/**
* Build the series key for this point.
* Used for cardinality accounting and shard selection.
*/
public String getTimeSeriesKey() {
StringBuilder keyBuilder = new StringBuilder(measurement);
if (tags != null && !tags.isEmpty()) {
keyBuilder.append("{");
tags.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> keyBuilder
.append(entry.getKey())
.append("=")
.append(entry.getValue())
.append(","));
// drop the trailing comma
if (keyBuilder.charAt(keyBuilder.length() - 1) == ',') {
keyBuilder.setLength(keyBuilder.length() - 1);
}
keyBuilder.append("}");
}
return keyBuilder.toString();
}
/**
* Estimate this point's tag-cardinality contribution,
* i.e. its impact on overall series cardinality.
*/
public int calculateTagCardinality() {
if (tags == null || tags.isEmpty()) {
return 1;
}
// Simplified estimate: combine per-tag contributions multiplicatively
return tags.values().stream()
.mapToInt(value -> value == null ? 1 : value.length())
.reduce(1, (a, b) -> a * Math.min(b, 100)); // cap each tag's contribution
}
/**
* Data priority levels
*/
public enum DataPriority {
LOW, // low: delay and loss are tolerable
NORMAL, // normal: standard handling
HIGH, // high: handle first, write promptly
CRITICAL // critical: top priority, must not be lost
}
}
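The series key built by getTimeSeriesKey can be reproduced stand-alone. The sketch below mirrors its sorted-tag format, so the output is deterministic regardless of map iteration order (the helper class name is illustrative):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Builds a deterministic series key: measurement{k1=v1,k2=v2,...}. */
public class SeriesKeys {

    public static String seriesKey(String measurement, Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) {
            return measurement;
        }
        // TreeMap sorts tag keys, so the key is stable for any input map.
        String joined = new TreeMap<>(tags).entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
        return measurement + "{" + joined + "}";
    }
}
```

Sorting the tags matters: `cpu{host=a,region=eu}` and `cpu{region=eu,host=a}` are the same series, and they must hash to the same shard.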
3.2 Sharding Strategy Implementation
package com.timeseries.service;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.timeseries.config.InfluxDBConfig;
import com.timeseries.model.TimeSeriesData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* Data sharding service.
*
* Core responsibilities:
* 1. Smart shard selection
* 2. Cardinality tracking and alerting
* 3. Dynamic load balancing
* 4. Performance optimization
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataShardingService {
private final InfluxDBConfig influxDBConfig;
private final Map<String, InfluxDBClient> shardedClients;
@Value("${influxdb.sharding.cardinality-threshold:50000}")
private long cardinalityThreshold;
@Value("${influxdb.sharding.bucket:monitoring}")
private String defaultBucket;
// Cardinality counters
private final Map<String, AtomicLong> measurementCardinality = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> shardWriteCount = new ConcurrentHashMap<>();
/**
* Write a time-series point,
* choosing the database instance according to the sharding strategy.
*/
public void writeData(TimeSeriesData data) {
try {
// Pick a shard
String shardKey = selectShard(data);
InfluxDBClient client = getShardClient(shardKey);
// Convert to an InfluxDB Point
Point point = convertToPoint(data);
// Write the point. Opening a WriteApi per call keeps the example simple;
// for production throughput, reuse a long-lived WriteApi per client.
try (WriteApi writeApi = client.getWriteApi()) {
writeApi.writePoint(defaultBucket, influxDBConfig.getInfluxOrg(), point);
}
// Update counters
updateStatistics(data, shardKey);
log.debug("Data written to shard: {} for measurement: {}", shardKey, data.getMeasurement());
} catch (Exception e) {
log.error("Error writing data to InfluxDB", e);
throw new RuntimeException("Failed to write time series data", e);
}
}
/**
* Select the shard for a data point
*/
private String selectShard(TimeSeriesData data) {
String measurement = data.getMeasurement();
// Current cardinality counter for this measurement
long currentCardinality = measurementCardinality
.computeIfAbsent(measurement, k -> new AtomicLong(0))
.get();
// Choose a strategy based on cardinality
if (currentCardinality < 10000) {
// Low cardinality: the primary instance is enough
return "primary";
} else if (currentCardinality < cardinalityThreshold) {
// Medium cardinality: hash the measurement name
// (floorMod avoids the negative result Math.abs gives for Integer.MIN_VALUE)
return "shard_" + Math.floorMod(measurement.hashCode(), shardedClients.size());
} else {
// High cardinality: hash the full series key
String timeSeriesKey = data.getTimeSeriesKey();
return "shard_" + Math.floorMod(timeSeriesKey.hashCode(), shardedClients.size());
}
}
/**
* Resolve the shard client
*/
private InfluxDBClient getShardClient(String shardKey) {
if ("primary".equals(shardKey)) {
return influxDBConfig.primaryInfluxDBClient();
}
return shardedClients.getOrDefault(shardKey, influxDBConfig.primaryInfluxDBClient());
}
/**
* Convert to an InfluxDB Point
*/
private Point convertToPoint(TimeSeriesData data) {
Point point = Point.measurement(data.getMeasurement());
// Add tags
if (data.getTags() != null) {
data.getTags().forEach(point::tag);
}
// Add fields
if (data.getFields() != null) {
data.getFields().forEach((key, value) -> {
if (value instanceof Number) {
point.field(key, (Number) value);
} else if (value instanceof String) {
point.field(key, (String) value);
} else if (value instanceof Boolean) {
point.field(key, (Boolean) value);
} else {
point.field(key, value.toString());
}
});
}
// Set the timestamp
if (data.getTimestamp() != null) {
point.time(data.getTimestamp(), WritePrecision.MS);
}
return point;
}
/**
* Update counters
*/
private void updateStatistics(TimeSeriesData data, String shardKey) {
String measurement = data.getMeasurement();
// Count writes per measurement (a rough proxy for cardinality growth)
measurementCardinality
.computeIfAbsent(measurement, k -> new AtomicLong(0))
.incrementAndGet();
// Count writes per shard
shardWriteCount
.computeIfAbsent(shardKey, k -> new AtomicLong(0))
.incrementAndGet();
// Log every 10,000 writes as a coarse high-cardinality warning
long currentCardinality = measurementCardinality.get(measurement).get();
if (currentCardinality > 0 && currentCardinality % 10000 == 0) {
log.warn("High cardinality detected for measurement: {}, current: {}",
measurement, currentCardinality);
}
}
/**
* Cardinality statistics snapshot
*/
public Map<String, Long> getCardinalityStats() {
Map<String, Long> stats = new ConcurrentHashMap<>();
measurementCardinality.forEach((measurement, cardinality) ->
stats.put(measurement, cardinality.get()));
return stats;
}
/**
* Per-shard write-load statistics
*/
public Map<String, Long> getShardLoadStats() {
Map<String, Long> stats = new ConcurrentHashMap<>();
shardWriteCount.forEach((shard, count) ->
stats.put(shard, count.get()));
return stats;
}
/**
* Reset all counters
*/
public void resetStatistics() {
measurementCardinality.clear();
shardWriteCount.clear();
log.info("Statistics reset completed");
}
}
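The modulo-hash shard selection above has one subtle trap worth demonstrating: `Math.abs(hashCode()) % n` goes negative when hashCode() returns Integer.MIN_VALUE (whose absolute value does not fit in an int), while `Math.floorMod` always yields a valid index. A stand-alone sketch:

```java
public class ShardIndex {

    /** Safe shard index: always in [0, shardCount). */
    public static int indexFor(String key, int shardCount) {
        return Math.floorMod(key.hashCode(), shardCount);
    }
}
```

This is why the shard-selection code uses floorMod rather than the more common abs-then-mod idiom.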
4. Cardinality Monitoring and Alerting
4.1 Cardinality Monitoring Service
package com.timeseries.service;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.Query;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Cardinality monitoring service.
*
* Responsibilities:
* 1. Periodically measure the cardinality of each measurement
* 2. Detect cardinality growth trends
* 3. Trigger alerts and automatic optimization
* 4. Produce cardinality reports
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class CardinalityMonitoringService {
private final QueryApi queryApi;
private final MqttMessageService mqttMessageService;
@Value("${influxdb.sharding.bucket:monitoring}")
private String bucket;
@Value("${influxdb.monitoring.cardinality-threshold:50000}")
private long cardinalityThreshold;
@Value("${influxdb.monitoring.growth-rate-threshold:0.1}")
private double growthRateThreshold;
// Cardinality history
private final Map<String, CardinalityHistory> cardinalityHistory = new ConcurrentHashMap<>();
/**
* Periodic cardinality check, runs every 5 minutes
*/
@Scheduled(fixedRate = 300000) // 5 minutes
public void monitorCardinality() {
try {
log.debug("Starting cardinality monitoring...");
Map<String, Long> currentCardinality = getCurrentCardinality();
currentCardinality.forEach((measurement, cardinality) -> {
// Record the sample
updateCardinalityHistory(measurement, cardinality);
// Absolute-threshold alert
checkCardinalityThreshold(measurement, cardinality);
// Growth-rate alert
checkGrowthRate(measurement, cardinality);
});
log.debug("Cardinality monitoring completed. Monitored {} measurements",
currentCardinality.size());
} catch (Exception e) {
log.error("Error during cardinality monitoring", e);
}
}
/**
* Query the current per-measurement cardinality.
* Note: counting distinct fields is only a rough approximation of series
* cardinality; InfluxDB's influxdb.cardinality() Flux function gives exact
* numbers if you need them.
*/
private Map<String, Long> getCurrentCardinality() {
Map<String, Long> cardinality = new HashMap<>();
// Flux query (plain concatenation keeps this compatible with Java 11;
// text blocks would require JDK 15+)
String fluxQuery = "from(bucket: \"" + bucket + "\")\n"
+ "  |> range(start: -1h)\n"
+ "  |> group(columns: [\"_measurement\"])\n"
+ "  |> distinct(column: \"_field\")\n"
+ "  |> count(column: \"_value\")";
try {
Query query = new Query().query(fluxQuery);
List<FluxTable> tables = queryApi.query(query);
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
String measurement = (String) record.getValueByKey("_measurement");
Long count = (Long) record.getValue();
if (measurement != null && count != null) {
cardinality.put(measurement, count);
}
}
}
} catch (Exception e) {
log.error("Error querying cardinality", e);
}
return cardinality;
}
/**
* Record a cardinality sample
*/
private void updateCardinalityHistory(String measurement, long cardinality) {
CardinalityHistory history = cardinalityHistory.computeIfAbsent(
measurement, k -> new CardinalityHistory());
history.addRecord(cardinality);
}
/**
* Absolute-threshold alert check
*/
private void checkCardinalityThreshold(String measurement, long cardinality) {
if (cardinality > cardinalityThreshold) {
log.warn("Cardinality threshold exceeded for measurement: {}, current: {}, threshold: {}",
measurement, cardinality, cardinalityThreshold);
// Publish an MQTT alert
mqttMessageService.sendCardinalityAlert(measurement, cardinality, cardinalityThreshold);
}
}
/**
* Growth-rate alert check
*/
private void checkGrowthRate(String measurement, long currentCardinality) {
CardinalityHistory history = cardinalityHistory.get(measurement);
if (history == null || history.getRecordCount() < 2) {
return;
}
double growthRate = history.calculateGrowthRate();
if (growthRate > growthRateThreshold) {
// SLF4J placeholders do not support printf-style specifiers such as {:.2f},
// so format the percentage before logging
log.warn("High cardinality growth rate detected for measurement: {}, rate: {}%",
measurement, String.format("%.2f", growthRate * 100));
// Kick off the automatic optimization strategy
triggerAutoOptimization(measurement, currentCardinality, growthRate);
}
}
/**
* Trigger automatic optimization
*/
private void triggerAutoOptimization(String measurement, long cardinality, double growthRate) {
log.info("Triggering auto-optimization for measurement: {}", measurement);
// Possible optimization strategies:
// 1. Switch to a more efficient sharding strategy
// 2. Suggest a tag redesign
// 3. Enable data compression
// 4. Adjust the retention policy
// Example: emit an optimization suggestion
String optimizationSuggestion = String.format(
"Measurement '%s' shows high growth rate (%.2f%%). Consider: " +
"1. Review tag design, 2. Enable data compression, 3. Adjust retention policy",
measurement, growthRate * 100
);
log.info("Optimization suggestion: {}", optimizationSuggestion);
}
/**
* Build the cardinality monitoring report
*/
public Map<String, Object> getCardinalityReport() {
Map<String, Object> report = new HashMap<>();
// Current cardinality
Map<String, Long> currentCardinality = getCurrentCardinality();
report.put("currentCardinality", currentCardinality);
// Growth rates
Map<String, Double> growthRates = new HashMap<>();
cardinalityHistory.forEach((measurement, history) -> {
if (history.getRecordCount() >= 2) {
growthRates.put(measurement, history.calculateGrowthRate());
}
});
report.put("growthRates", growthRates);
// Number of measurements over the alert threshold
long alertCount = currentCardinality.values().stream()
.mapToLong(cardinality -> cardinality > cardinalityThreshold ? 1 : 0)
.sum();
report.put("alertCount", alertCount);
// Total cardinality
long totalCardinality = currentCardinality.values().stream()
.mapToLong(Long::longValue)
.sum();
report.put("totalCardinality", totalCardinality);
return report;
}
/**
* Ring buffer of cardinality samples
*/
private static class CardinalityHistory {
private static final int MAX_RECORDS = 100;
private final long[] records = new long[MAX_RECORDS];
private int index = 0;
private int count = 0;
public void addRecord(long cardinality) {
records[index] = cardinality;
index = (index + 1) % MAX_RECORDS;
if (count < MAX_RECORDS) {
count++;
}
}
public int getRecordCount() {
return count;
}
public double calculateGrowthRate() {
if (count < 2) {
return 0.0;
}
// Growth rate between the two most recent samples
int currentIndex = (index - 1 + MAX_RECORDS) % MAX_RECORDS;
int previousIndex = (index - 2 + MAX_RECORDS) % MAX_RECORDS;
long current = records[currentIndex];
long previous = records[previousIndex];
if (previous == 0) {
return current > 0 ? 1.0 : 0.0;
}
return (double) (current - previous) / previous;
}
}
}
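The CardinalityHistory ring buffer and its growth-rate calculation can be exercised stand-alone. The sketch below reimplements just that arithmetic (configurable capacity, same last-two-samples formula) so it can be tested without the service:

```java
/** Fixed-size ring buffer of cardinality samples with a growth-rate query. */
public class GrowthTracker {

    private final long[] records;
    private int index = 0;
    private int count = 0;

    public GrowthTracker(int capacity) {
        this.records = new long[capacity];
    }

    public void add(long cardinality) {
        records[index] = cardinality;
        index = (index + 1) % records.length;
        if (count < records.length) {
            count++;
        }
    }

    /** Relative growth between the two most recent samples, e.g. 0.1 = +10%. */
    public double growthRate() {
        if (count < 2) {
            return 0.0;
        }
        int cur = Math.floorMod(index - 1, records.length);
        int prev = Math.floorMod(index - 2, records.length);
        if (records[prev] == 0) {
            return records[cur] > 0 ? 1.0 : 0.0;
        }
        return (double) (records[cur] - records[prev]) / records[prev];
    }
}
```

With the service's 5-minute sampling interval and a 100-slot buffer, the history covers roughly the last 8 hours of cardinality behaviour.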
4.2 REST API Controller
package com.timeseries.controller;
import com.timeseries.model.TimeSeriesData;
import com.timeseries.service.DataShardingService;
import com.timeseries.service.CardinalityMonitoringService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.Map;
/**
* Time-series data API controller.
*
* Endpoints:
* 1. Time-series data writes
* 2. Cardinality monitoring queries
* 3. Shard status queries
* 4. System administration
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/timeseries")
@RequiredArgsConstructor
@Validated
public class TimeSeriesController {
private final DataShardingService dataShardingService;
private final CardinalityMonitoringService cardinalityMonitoringService;
/**
* Write a single time-series point
*/
@PostMapping("/data")
public ResponseEntity<String> writeData(@Valid @RequestBody TimeSeriesData data) {
try {
dataShardingService.writeData(data);
return ResponseEntity.ok("Data written successfully");
} catch (Exception e) {
log.error("Error writing time series data", e);
return ResponseEntity.internalServerError()
.body("Failed to write data: " + e.getMessage());
}
}
/**
* Write a batch of time-series points
*/
@PostMapping("/data/batch")
public ResponseEntity<String> writeBatchData(@Valid @RequestBody TimeSeriesData[] dataArray) {
try {
int successCount = 0;
int failureCount = 0;
for (TimeSeriesData data : dataArray) {
try {
dataShardingService.writeData(data);
successCount++;
} catch (Exception e) {
log.error("Error writing data item", e);
failureCount++;
}
}
String result = String.format("Batch write completed. Success: %d, Failures: %d",
successCount, failureCount);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("Error in batch write operation", e);
return ResponseEntity.internalServerError()
.body("Batch write failed: " + e.getMessage());
}
}
/**
* Cardinality statistics
*/
@GetMapping("/cardinality/stats")
public ResponseEntity<Map<String, Long>> getCardinalityStats() {
try {
Map<String, Long> stats = dataShardingService.getCardinalityStats();
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Error getting cardinality stats", e);
return ResponseEntity.internalServerError().build();
}
}
/**
* Per-shard write-load statistics
*/
@GetMapping("/sharding/load")
public ResponseEntity<Map<String, Long>> getShardLoadStats() {
try {
Map<String, Long> stats = dataShardingService.getShardLoadStats();
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Error getting shard load stats", e);
return ResponseEntity.internalServerError().build();
}
}
/**
* Cardinality monitoring report
*/
@GetMapping("/cardinality/report")
public ResponseEntity<Map<String, Object>> getCardinalityReport() {
try {
Map<String, Object> report = cardinalityMonitoringService.getCardinalityReport();
return ResponseEntity.ok(report);
} catch (Exception e) {
log.error("Error getting cardinality report", e);
return ResponseEntity.internalServerError().build();
}
}
/**
* Reset counters
*/
@PostMapping("/stats/reset")
public ResponseEntity<String> resetStatistics() {
try {
dataShardingService.resetStatistics();
return ResponseEntity.ok("Statistics reset successfully");
} catch (Exception e) {
log.error("Error resetting statistics", e);
return ResponseEntity.internalServerError()
.body("Failed to reset statistics: " + e.getMessage());
}
}
/**
* Health check
*/
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> healthCheck() {
Map<String, Object> health = Map.of(
"status", "UP",
"timestamp", System.currentTimeMillis(),
"service", "time-series-optimizer"
);
return ResponseEntity.ok(health);
}
}
5. Configuration Files
5.1 application.yml
# Spring Boot configuration
spring:
application:
name: time-series-optimizer
# Jackson configuration
jackson:
time-zone: UTC
date-format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
serialization:
write-dates-as-timestamps: false
# InfluxDB configuration
influxdb:
url: http://localhost:8086
token: ${INFLUXDB_TOKEN:your-influxdb-token}
org: ${INFLUXDB_ORG:your-org}
# 分片配置
sharding:
enabled: true
instances: 3
cardinality-threshold: 50000
bucket: monitoring
# 监控配置
monitoring:
cardinality-threshold: 50000
growth-rate-threshold: 0.1
# MQTT 配置
mqtt:
broker-url: tcp://localhost:1883
client-id: timeseries-client
username: ${MQTT_USERNAME:}
password: ${MQTT_PASSWORD:}
keep-alive-interval: 60
connection-timeout: 30
clean-session: false
# 主题配置
timeseries-data-topic: timeseries/data
cardinality-alert-topic: timeseries/alerts/cardinality
performance-metrics-topic: timeseries/metrics/performance
# Actuator 配置
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
endpoint:
health:
show-details: always
metrics:
export:
influx:
enabled: true
uri: ${influxdb.url}
token: ${influxdb.token}
org: ${influxdb.org}
bucket: metrics
step: 30s
# 日志配置
logging:
level:
com.timeseries: DEBUG
org.springframework.integration: INFO
com.influxdb: INFO
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/time-series-optimizer.log
# 服务器配置
server:
port: 8080
servlet:
context-path: /
compression:
enabled: true
mime-types: application/json,text/plain,text/css,application/javascript
min-response-size: 1024
5.2 Docker Compose Deployment Configuration
version: '3.8'

services:
  # InfluxDB primary instance
  influxdb-primary:
    image: influxdb:2.7
    container_name: influxdb-primary
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-primary-data:/var/lib/influxdb2
      - influxdb-primary-config:/etc/influxdb2
    networks:
      - timeseries-network

  # InfluxDB shard instance 1
  influxdb-shard1:
    image: influxdb:2.7
    container_name: influxdb-shard1
    ports:
      - "8087:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-shard1-data:/var/lib/influxdb2
      - influxdb-shard1-config:/etc/influxdb2
    networks:
      - timeseries-network

  # InfluxDB shard instance 2
  influxdb-shard2:
    image: influxdb:2.7
    container_name: influxdb-shard2
    ports:
      - "8088:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=monitoring
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-shard2-data:/var/lib/influxdb2
      - influxdb-shard2-config:/etc/influxdb2
    networks:
      - timeseries-network

  # MQTT broker
  mosquitto:
    image: eclipse-mosquitto:2.0
    container_name: mosquitto
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    networks:
      - timeseries-network

  # Grafana
  grafana:
    image: grafana/grafana:10.0.0
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    networks:
      - timeseries-network
    depends_on:
      - influxdb-primary

  # Time-series optimization service
  time-series-optimizer:
    build: .
    container_name: time-series-optimizer
    ports:
      - "8080:8080"
    environment:
      - INFLUXDB_TOKEN=my-super-secret-auth-token
      - INFLUXDB_ORG=myorg
      - MQTT_USERNAME=
      - MQTT_PASSWORD=
    networks:
      - timeseries-network
    depends_on:
      - influxdb-primary
      - influxdb-shard1
      - influxdb-shard2
      - mosquitto
    volumes:
      - ./logs:/app/logs

networks:
  timeseries-network:
    driver: bridge

volumes:
  influxdb-primary-data:
  influxdb-primary-config:
  influxdb-shard1-data:
  influxdb-shard1-config:
  influxdb-shard2-data:
  influxdb-shard2-config:
  grafana-data:
6. Running and Testing the Project
6.1 Starting the Services
# 1. Start all services
docker-compose up -d

# 2. Check service status
docker-compose ps

# 3. Tail the application logs
docker-compose logs -f time-series-optimizer

# 4. Exercise the API
curl -X POST http://localhost:8080/api/v1/timeseries/data \
  -H "Content-Type: application/json" \
  -d '{
    "measurement": "cpu_usage",
    "tags": {
      "host": "server-01",
      "region": "us-east"
    },
    "fields": {
      "value": 85.5,
      "cores": 8
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }'
6.2 Performance Test Script
package com.timeseries.test;

import com.timeseries.model.TimeSeriesData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.client.RestTemplate;

import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Performance test tool.
 * Simulates high-concurrency write scenarios to measure system throughput.
 */
@Slf4j
@SpringBootApplication
public class PerformanceTestRunner implements CommandLineRunner {

    private static final String API_URL = "http://localhost:8080/api/v1/timeseries/data";

    private final RestTemplate restTemplate = new RestTemplate();
    private final Random random = new Random();

    public static void main(String[] args) {
        SpringApplication.run(PerformanceTestRunner.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("Starting performance tests...");

        // Scenario 1: low-cardinality, high-frequency writes
        testLowCardinalityHighFrequency();

        // Scenario 2: high-cardinality writes
        testHighCardinality();

        // Scenario 3: mixed workload
        testMixedWorkload();

        log.info("Performance tests finished");
    }

    /**
     * Low-cardinality, high-frequency write test
     */
    private void testLowCardinalityHighFrequency() throws InterruptedException {
        log.info("Scenario 1: low-cardinality, high-frequency writes");

        ExecutorService executor = Executors.newFixedThreadPool(10);
        long startTime = System.currentTimeMillis();
        int totalPoints = 10000;

        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data = TimeSeriesData.builder()
                            .measurement("system_metrics")
                            .tags(Map.of(
                                    "host", "server-" + (index % 5 + 1),
                                    "metric", "cpu_usage"
                            ))
                            .fields(Map.of(
                                    "value", 50 + random.nextDouble() * 50,
                                    "cores", 8
                            ))
                            .timestamp(Instant.now())
                            .build();
                    restTemplate.postForObject(API_URL, data, String.class);
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);

        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        // SLF4J placeholders do not support printf-style specifiers like {:.2f},
        // so the throughput value is pre-formatted with String.format
        log.info("Low-cardinality test done - points: {}, elapsed: {} ms, throughput: {} points/sec",
                totalPoints, duration, String.format("%.2f", throughput));
    }

    /**
     * High-cardinality write test
     */
    private void testHighCardinality() throws InterruptedException {
        log.info("Scenario 2: high-cardinality writes");

        ExecutorService executor = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        int totalPoints = 5000;

        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data = TimeSeriesData.builder()
                            .measurement("user_events")
                            .tags(Map.of(
                                    "user_id", "user_" + (index % 1000),
                                    "session_id", "session_" + random.nextInt(10000),
                                    "event_type", "click"
                            ))
                            .fields(Map.of(
                                    "duration", random.nextInt(5000),
                                    "value", random.nextDouble()
                            ))
                            .timestamp(Instant.now())
                            .build();
                    restTemplate.postForObject(API_URL, data, String.class);
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(120, TimeUnit.SECONDS);

        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        log.info("High-cardinality test done - points: {}, elapsed: {} ms, throughput: {} points/sec",
                totalPoints, duration, String.format("%.2f", throughput));
    }

    /**
     * Mixed workload test
     */
    private void testMixedWorkload() throws InterruptedException {
        log.info("Scenario 3: mixed workload");

        ExecutorService executor = Executors.newFixedThreadPool(8);
        long startTime = System.currentTimeMillis();
        int totalPoints = 8000;

        for (int i = 0; i < totalPoints; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    TimeSeriesData data;
                    if (index % 3 == 0) {
                        // System metrics (low cardinality)
                        data = createSystemMetric(index);
                    } else if (index % 3 == 1) {
                        // HTTP requests (medium cardinality)
                        data = createHttpMetric(index);
                    } else {
                        // User events (high cardinality)
                        data = createUserEvent(index);
                    }
                    restTemplate.postForObject(API_URL, data, String.class);
                } catch (Exception e) {
                    log.error("Write failed: {}", e.getMessage());
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(180, TimeUnit.SECONDS);

        long duration = System.currentTimeMillis() - startTime;
        double throughput = (double) totalPoints / (duration / 1000.0);
        log.info("Mixed workload test done - points: {}, elapsed: {} ms, throughput: {} points/sec",
                totalPoints, duration, String.format("%.2f", throughput));
    }

    private TimeSeriesData createSystemMetric(int index) {
        return TimeSeriesData.builder()
                .measurement("system_metrics")
                .tags(Map.of(
                        "host", "server-" + (index % 10 + 1),
                        "metric", "memory_usage"
                ))
                .fields(Map.of(
                        "value", 30 + random.nextDouble() * 40,
                        "total", 16384
                ))
                .timestamp(Instant.now())
                .build();
    }

    private TimeSeriesData createHttpMetric(int index) {
        String[] methods = {"GET", "POST", "PUT", "DELETE"};
        return TimeSeriesData.builder()
                .measurement("http_requests")
                .tags(Map.of(
                        "method", methods[index % 4],
                        "endpoint", "/api/v1/endpoint-" + (index % 50),
                        "status", "200"
                ))
                .fields(Map.of(
                        "response_time", 50 + random.nextInt(500),
                        "bytes", random.nextInt(10000)
                ))
                .timestamp(Instant.now())
                .build();
    }

    private TimeSeriesData createUserEvent(int index) {
        return TimeSeriesData.builder()
                .measurement("user_events")
                .tags(Map.of(
                        "user_id", "user_" + (index % 2000),
                        "session_id", "session_" + random.nextInt(50000),
                        "page", "page_" + (index % 100)
                ))
                .fields(Map.of(
                        "duration", random.nextInt(30000),
                        "clicks", random.nextInt(20)
                ))
                .timestamp(Instant.now())
                .build();
    }
}
7. Lessons Learned
7.1 Pitfalls We Hit
1. Poor tag design causing cardinality explosion
At first we used the user ID directly as a tag, and cardinality instantly shot up into the millions. Moving the user ID into a field brought it right back down.
// ❌ Wrong: high-cardinality value as a tag
Point.measurement("user_activity")
        .tag("user_id", "user_12345")      // this explodes the series cardinality
        .field("action", "click");

// ✅ Right: bounded enum values as tags, high-cardinality values as fields
Point.measurement("user_activity")
        .tag("action_type", "click")       // tags drawn from a small, fixed set
        .field("user_id", "user_12345");   // high-cardinality value goes into a field
2. Choosing the wrong sharding strategy
We started with simple round-robin sharding, which left the data badly skewed across shards. Switching to consistent hashing improved the distribution considerably.
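The consistent-hashing routing described above can be sketched roughly as follows. This is a minimal, self-contained illustration; `ShardRouter`, the virtual-node count, and the MD5-based ring hash are my own choices for the sketch, not the project's actual classes:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash router: each shard is mapped to many "virtual nodes"
// on a hash ring, which evens out the key distribution across shards and keeps
// most keys in place when a shard is added or removed.
public class ShardRouter {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    public ShardRouter(List<String> shards, int virtualNodes) {
        for (String shard : shards) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(shard + "#" + i), shard);
            }
        }
    }

    // Pick the first virtual node clockwise from the key's position on the ring
    public String route(String seriesKey) {
        SortedMap<Long, String> tail = ring.tailMap(hash(seriesKey));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            // First 8 bytes of MD5, interpreted as a long, as the ring position
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With 100+ virtual nodes per shard, the key distribution across shards is close to uniform, which is exactly what round-robin failed to guarantee once some series wrote far more points than others.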
3. Badly tuned batch write size
Batches that were too small (100 points) wasted network round trips; batches that were too large (10,000 points) put pressure on memory. We eventually found 1,000-2,000 points to be the sweet spot.
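The batching behavior described above (flush on size or on elapsed time, whichever comes first) can be sketched like this. In practice the InfluxDB Java client's own write options play this role; `BatchBuffer` here is an illustrative, hand-rolled stand-in to show the two triggers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal write buffer: flush either when the batch reaches `batchSize`
// points or when `flushIntervalMs` has elapsed since the last flush.
public class BatchBuffer<T> {

    private final int batchSize;
    private final long flushIntervalMs;
    private final Consumer<List<T>> sink;   // e.g. a bulk write to InfluxDB
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs = System.currentTimeMillis();

    public BatchBuffer(int batchSize, long flushIntervalMs, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.sink = sink;
    }

    public synchronized void add(T point) {
        buffer.add(point);
        if (buffer.size() >= batchSize
                || System.currentTimeMillis() - lastFlushMs >= flushIntervalMs) {
            flush();
        }
    }

    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));   // hand off a copy, then reset
            buffer.clear();
        }
        lastFlushMs = System.currentTimeMillis();
    }
}
```

With `batchSize = 1000` and a one-second flush interval, steady traffic ships full batches while trickle traffic still gets flushed promptly.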
7.2 Performance Tuning Notes
1. Write path
- Use batch writes: 5-10x faster than single-point writes
- Write asynchronously so business threads are never blocked
- Tune batch size and flush interval together
2. Query path
- Always put a time-range bound on queries
- Filtering on tags is faster than filtering on fields
- Downsample where appropriate to cut data transfer
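The three query rules above can be combined in a single Flux query. The `range`, `filter`, and `aggregateWindow` functions are standard Flux; the bucket, measurement, and helper class names below are illustrative, not the project's actual code:

```java
// Builds a Flux query that applies all three rules:
// a bounded time range, tag-based filtering, and downsampling.
public class FluxQueryBuilder {

    public static String downsampledCpuQuery(String bucket, String host) {
        return String.format(
                "from(bucket: \"%s\")\n"
                + "  |> range(start: -1h)\n"                         // always bound the time range
                + "  |> filter(fn: (r) => r._measurement == \"cpu_usage\")\n"
                + "  |> filter(fn: (r) => r.host == \"%s\")\n"       // filter on a tag, not a field
                + "  |> aggregateWindow(every: 1m, fn: mean)",        // downsample before transfer
                bucket, host);
    }
}
```

An unbounded query over a high-cardinality measurement forces the engine to scan every shard group, so the `range` clause is the single most important line here.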
3. Resource sizing
- Give InfluxDB enough memory; 8 GB is a sensible starting point
- SSDs noticeably improve write performance
- Network bandwidth matters too, especially in sharded deployments
7.3 Monitoring and Alerting Recommendations
1. Key metrics to watch
- Cardinality growth rate: alert above 20% per hour
- Write latency: investigate above 1 second
- Query latency: optimize above 5 seconds
- Memory utilization: scale out above 80%
2. Alerting policy
- Tiered severities: warning, critical, emergency
- Avoid alert storms: send an identical alert at most once every 5 minutes
- Auto-recovery notices: notify again once the problem clears
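The "same alert at most once per 5 minutes" rule above amounts to a per-key throttle. A minimal sketch (the class and key names are illustrative, not the project's actual alerting code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal alert throttle: a given alert key fires at most once per window,
// which prevents alert storms when a condition stays bad for a while.
public class AlertThrottle {

    private final long windowMs;
    private final ConcurrentMap<String, Long> lastFired = new ConcurrentHashMap<>();

    public AlertThrottle(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true only if the alert should actually be sent now. */
    public boolean shouldFire(String alertKey, long nowMs) {
        Long prev = lastFired.get(alertKey);
        if (prev != null && nowMs - prev < windowMs) {
            return false;                        // still inside the quiet window
        }
        // Compare-and-set so only one concurrent caller wins this window
        return prev == null
                ? lastFired.putIfAbsent(alertKey, nowMs) == null
                : lastFired.replace(alertKey, prev, nowMs);
    }
}
```

The severity tier can simply be folded into the key (e.g. "cardinality-high:critical") so that an escalation from warning to critical still gets through immediately.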
7.4 Best Practices
1. Data model design
- Tags are for grouping and filtering; fields are for values
- Tag values must be bounded and predictable
- Give measurement names a hierarchical structure
2. Sharding strategy
- Keep low-cardinality data on the primary instance; shard only high-cardinality data
- Don't over-shard; 3-5 shards are usually enough
- Review shard load balance regularly
3. Operations
- Back up data and configuration regularly
- Monitor disk space usage
- Define a data retention policy
8. Summary
This solution has been running stably in our production environment for over six months, with solid results:
Performance gains:
- Write throughput up from 2,000 to 8,000 points/sec
- Average query latency down from 3 seconds to 800 milliseconds
- Supported series count up from 50,000 to 2 million
Simpler operations:
- Cardinality monitoring is automated; nobody has to watch it by hand
- The sharding strategy adapts on its own, reducing manual tuning
- Alerts are timely and accurate, so incidents get resolved faster
Cost control:
- Smart sharding kept hardware costs from growing linearly with data volume
- Query optimization cut CPU and memory consumption
- Automated operations reduced the staffing burden
That said, this approach is not a silver bullet. If your cardinality is truly extreme (tens of millions of series), you may also need:
- A more aggressive sharding strategy, e.g. sharding on time plus tags
- Pre-aggregation: computing common rollups ahead of time
- Hot/cold separation: migrating old data to cheaper storage
In short, high cardinality in time-series databases is a genuinely thorny problem, but it is solvable with the right approach. The key is understanding where the cardinality comes from and designing the solution around that.
I hope this article helps anyone facing similar problems. Questions and discussion are welcome.