物联网平台MySQL:智能调优与自动化运维实践
在物联网项目的实践中,数据库性能问题往往是最大的挑战。随着设备数量的增长,数据量呈指数级上升,传统的手工调优方式已经无法满足需求。
经过大半年的技术攻关,我们团队开发了一套自动化调优系统,在多个生产环境中取得了良好的效果。本文将分享这套系统的核心技术和实践经验,为面临类似挑战的技术团队提供参考。
1 索引优化:让系统自己找到最优解
传统的索引优化依赖DBA手工分析慢查询,逐个建立索引。但物联网项目面临的挑战完全不同:设备数量常常达到几十万甚至上百万台,数据增长速度极快,人工调优已经无法跟上节奏,而且业务场景还在持续变化。
基于这种现状,我们开始探索让系统自主学习优化的可能性。经过几个月的技术攻关,开发出一套自动化调优方案。这套系统已在多个项目中稳定运行超过一年,取得了良好的效果。
物联网查询具有明显的规律性特征。以农业项目为例,春季播种期间,土壤数据查询频率极高;秋季收获时,产量统计成为查询重点。工业项目同样如此,白班和夜班的查询模式存在明显差异。
传统的索引建立方式是一次性配置,后续很少调整。这种静态方式无法适应业务的动态变化。我们设计了一套基于查询日志分析的系统,能够自动识别慢查询、发现无效索引,并提供优化建议。
总结下来,物联网查询有这么几个特点:
- 时间相关的查询特别多,基本都是查最近的数据
- 按设备查询是家常便饭,设备ID、设备类型这些字段用得最频繁
- 地理位置查询也不少,特别是做监控大屏的时候
- 各种统计分析,按小时、按天汇总数据
基于这些特点,我们设计了一个自动分析系统。
1.1 自动索引分析
以农业项目为例,查询模式的季节性变化非常明显。春季播种期间,土壤湿度数据查询频率极高;夏季转向温度监控和灌溉管理;秋季收获时,产量统计查询成为主要需求。
初期采用手工建索引的方式,很快发现无法适应这种动态变化。春季建立的索引到夏季可能使用率大幅下降。基于查询模式的规律性特征,我们决定让系统自主学习和优化。
因此开发了专门的查询模式分析器:
@Component
public class IoTQueryPatternAnalyzer {
private final QueryLogRepository queryLogRepository;
private final IndexUsageRepository indexUsageRepository;
private final SeasonalPatternService seasonalPatternService;
/**
* 分析查询模式,生成索引优化建议
* 针对农业项目的季节性特点进行优化
*/
public List<IndexOptimizationSuggestion> analyzeIoTQueryPatterns(TimeRange analysisTimeRange) {
List<IndexOptimizationSuggestion> suggestions = new ArrayList<>();
// 获取指定时间范围的查询日志
List<QueryLog> queryLogs = queryLogRepository.findByTimeRange(analysisTimeRange);
log.info("获取到 {} 条查询日志,开始分析", queryLogs.size());
// 按业务类型分组:设备监控、环境数据、告警、报表等
Map<IoTQueryType, List<QueryLog>> queryByType = queryLogs.stream()
.collect(Collectors.groupingBy(this::classifyIoTQueryType));
// 获取当前季节模式,调整索引策略
SeasonalPattern currentSeason = seasonalPatternService.getCurrentSeasonPattern();
log.info("当前季节:{},调整索引策略", currentSeason.getSeasonName());
// 识别低效索引
suggestions.addAll(identifyIneffectiveIndexes(queryLogs, currentSeason));
// 识别缺失的关键索引
suggestions.addAll(identifyMissingIndexes(queryLogs, currentSeason));
// 生成季节性索引建议
suggestions.addAll(generateSeasonalIndexSuggestions(currentSeason));
// 按性能收益排序
suggestions.sort((s1, s2) ->
Double.compare(s2.getEstimatedPerformanceGain(), s1.getEstimatedPerformanceGain()));
log.info("生成 {} 条优化建议,预计性能提升 {}%",
suggestions.size(),
suggestions.stream().mapToDouble(IndexOptimizationSuggestion::getEstimatedPerformanceGain).sum());
return suggestions;
}
1.2 低效索引识别算法
随着系统运行时间的增长,部分索引会逐渐失去价值。例如在某个农业项目中,客户从种植玉米转向大豆,原有的玉米相关传感器索引就成了冗余资源。
识别无效索引需要从多个维度进行评估:
- 使用频率分析:长期低使用率的索引可能已无必要
- 维护成本计算:某些索引占用存储空间且影响写入性能
- 重复索引检测:避免功能相似的索引重复建立
需要特别注意的是季节性索引的保护。例如春播期专用索引虽然在秋季使用率低,但在下一个春季仍有重要价值。
基于这些考虑,我们设计了低效索引识别算法:
/**
* 识别低效索引
* 保护季节性索引,避免误删
*
* 判断标准:
* 1. 使用率低于10%
* 2. 维护成本高
* 3. 非季节性专用索引
*/
private List<IndexOptimizationSuggestion> identifyIneffectiveIndexes(
List<QueryLog> queryLogs, SeasonalPattern season) {
List<IndexOptimizationSuggestion> suggestions = new ArrayList<>();
// 获取所有索引使用统计
List<IndexUsageStats> indexStats = indexUsageRepository.getIndexUsageStats();
log.info("检查 {} 个索引的使用情况", indexStats.size());
for (IndexUsageStats stats : indexStats) {
// 计算索引使用率
double usageRate = calculateIoTIndexUsageRate(stats, queryLogs);
// 计算维护成本
double maintenanceCost = calculateIndexMaintenanceCost(stats);
// 低使用率且高成本的索引考虑删除
if (usageRate < 0.1 && maintenanceCost > getAverageMaintenanceCost()) {
// 检查是否为季节性索引
boolean isSeasonalIndex = isAgriculturalSeasonalIndex(stats.getIndexName(), season);
if (!isSeasonalIndex) {
// 生成删除建议
suggestions.add(IndexOptimizationSuggestion.builder()
.type(OptimizationType.DROP_INDEX)
.indexName(stats.getIndexName())
.tableName(stats.getTableName())
.reason(String.format("使用率%.2f%%,月维护成本%.2fMB",
usageRate * 100, maintenanceCost))
.estimatedPerformanceGain(maintenanceCost * 0.8) // 节省80%维护成本
.priority(Priority.MEDIUM)
.iotSpecific(true) // 物联网专用优化
.build());
log.debug("识别低效索引: 表={}, 索引={}, 使用率={:.2f}%, 维护成本={:.2f}MB/月",
stats.getTableName(), stats.getIndexName(),
usageRate * 100, maintenanceCost);
}
}
}
log.info("识别 {} 个低效索引,可节省 {:.2f}MB 存储空间",
suggestions.size(),
suggestions.stream().mapToDouble(IndexOptimizationSuggestion::getEstimatedPerformanceGain).sum());
return suggestions;
}
1.3 缺失索引智能识别
系统性能瓶颈往往不是索引过多,而是缺少关键索引。通过分析慢查询模式,可以精准识别需要建立的索引。
具体实现分为几个步骤:首先筛选执行时间超过1秒的慢查询,然后分析这些查询的共同特征。比如大量查询都基于设备ID进行检索,或者频繁查询特定时间段的数据。
当相同查询模式重复出现时,就表明需要针对性地建立索引。这种基于实际使用模式的索引策略,比盲目建索引更加有效。
/**
* 找出缺失的重要索引
* 主要看慢查询的模式
*
* 思路:
* 1. 找出执行超过1秒的慢查询
* 2. 按查询模式分组
* 3. 找出高频的查询模式
* 4. 针对性地建议索引
*/
private List<IndexOptimizationSuggestion> identifyMissingIndexes(
List<QueryLog> queryLogs, SeasonalPattern season) {
List<IndexOptimizationSuggestion> suggestions = new ArrayList<>();
// 找出慢查询(超过1秒的)
List<QueryLog> slowQueries = queryLogs.stream()
.filter(log -> log.getExecutionTime() > 1000)
.filter(log -> isIoTRelatedQuery(log)) // 只看物联网相关的查询
.collect(Collectors.toList());
log.info("找到 {} 条慢查询,开始分析需要建哪些索引", slowQueries.size());
// 按查询模式分组
Map<String, List<QueryLog>> queryPatterns = slowQueries.stream()
.collect(Collectors.groupingBy(this::extractIoTQueryPattern));
for (Map.Entry<String, List<QueryLog>> entry : queryPatterns.entrySet()) {
String pattern = entry.getKey();
List<QueryLog> patternQueries = entry.getValue();
// 只处理高频的查询模式(出现5次以上才值得建索引)
if (patternQueries.size() >= 5) {
IndexSuggestion suggestion = analyzeIoTQueryPatternForIndex(pattern, patternQueries);
if (suggestion != null) {
// 算算能提升多少性能
double avgExecutionTime = patternQueries.stream()
.mapToDouble(QueryLog::getExecutionTime)
.average().orElse(0);
// 一般建了索引能快70%
double estimatedImprovement = avgExecutionTime * 0.7;
suggestions.add(IndexOptimizationSuggestion.builder()
.type(OptimizationType.CREATE_INDEX)
.tableName(suggestion.getTableName())
.columns(suggestion.getColumns())
.indexType(suggestion.getIndexType())
.reason(String.format("高频慢查询,出现%d次,平均耗时%.2fms,影响监控效率",
patternQueries.size(), avgExecutionTime))
.estimatedPerformanceGain(estimatedImprovement)
.priority(Priority.HIGH)
.iotSpecific(true)
.seasonalRelevance(calculateSeasonalRelevance(pattern, season))
.build());
log.info("需要建索引: 表={}, 列={}, 查询{}次, 平均{}ms, 季节相关性={}",
suggestion.getTableName(), suggestion.getColumns(),
patternQueries.size(), avgExecutionTime,
calculateSeasonalRelevance(pattern, season));
}
}
}
log.info("需要建 {} 个索引,预计每天能节省 {:.2f}秒",
suggestions.size(),
suggestions.stream().mapToDouble(IndexOptimizationSuggestion::getEstimatedPerformanceGain).sum() / 1000);
return suggestions;
}
1.4 农业物联网季节性索引策略
农业项目的数据访问模式具有明显的季节性特征。春季播种期间,土壤监测数据成为关注焦点;夏季转向灌溉控制和病虫害防治;秋季收获时,产量统计和收获数据查询激增。
冬季相对平缓,主要进行设备维护和历史数据分析。
基于这种季节性规律,索引策略需要动态调整以匹配实际业务需求:
/**
* 根据农业季节调整索引策略
* 春耕夏管秋收冬藏,每个季节重点不一样
*
* 简单说就是:
* - 春天:土壤监测、播种数据
* - 夏天:灌溉控制、病虫害监测
* - 秋天:产量统计、收获数据
* - 冬天:设备维护、历史数据分析
*/
private List<IndexOptimizationSuggestion> generateSeasonalIndexSuggestions(SeasonalPattern season) {
List<IndexOptimizationSuggestion> suggestions = new ArrayList<>();
switch (season.getSeasonName()) {
case "SPRING":
// 春天播种,土壤数据查询特别多
suggestions.add(createAgriculturalSeasonalIndex(
"sensor_data",
Arrays.asList("sensor_type", "timestamp", "field_id", "soil_moisture_level"),
"春播期土壤监测查询增长300%,需要专门的索引",
Priority.HIGH,
"SPRING_SOIL_MONITORING"
));
// 播种计划也是春天的重点
suggestions.add(createAgriculturalSeasonalIndex(
"planting_schedule",
Arrays.asList("field_id", "planting_date", "crop_type", "seed_variety"),
"春耕期播种计划和种子管理查询频繁",
Priority.HIGH,
"SPRING_PLANTING"
));
break;
case "SUMMER":
// 夏天主要是灌溉和病虫害防治
suggestions.add(createAgriculturalSeasonalIndex(
"irrigation_logs",
Arrays.asList("field_id", "irrigation_date", "water_amount", "soil_moisture_before"),
"夏季灌溉高峰,查询量增长400%",
Priority.HIGH,
"SUMMER_IRRIGATION"
));
// 夏天病虫害多,监测数据查询频繁
suggestions.add(createAgriculturalSeasonalIndex(
"pest_monitoring",
Arrays.asList("field_id", "detection_date", "pest_type", "severity_level"),
"夏季病虫害高发,监测和预警查询特别多",
Priority.HIGH,
"SUMMER_PEST_CONTROL"
));
break;
case "AUTUMN":
// 秋天收获,产量统计是重点
suggestions.add(createAgriculturalSeasonalIndex(
"harvest_data",
Arrays.asList("field_id", "harvest_date", "crop_type", "yield_per_hectare"),
"秋收期产量统计和收益分析查询暴增",
Priority.HIGH,
"AUTUMN_HARVEST"
));
// 质量检测也很重要
suggestions.add(createAgriculturalSeasonalIndex(
"quality_assessment",
Arrays.asList("batch_id", "test_date", "quality_grade", "moisture_content"),
"收获季质量检测和分级查询频繁",
Priority.MEDIUM,
"AUTUMN_QUALITY"
));
break;
case "WINTER":
// 冬天农闲,主要是设备维护
suggestions.add(createAgriculturalSeasonalIndex(
"maintenance_logs",
Arrays.asList("device_id", "maintenance_date", "maintenance_type", "technician_id"),
"冬季设备维护期,维护记录查询增多",
Priority.MEDIUM,
"WINTER_MAINTENANCE"
));
// 年底总结报告
suggestions.add(createAgriculturalSeasonalIndex(
"annual_reports",
Arrays.asList("year", "field_id", "crop_type", "total_yield"),
"冬季农闲期,年度数据汇总和报告生成",
Priority.LOW,
"WINTER_ANALYSIS"
));
break;
}
log.info("{}季节需要 {} 个专门的索引", season.getSeasonName(), suggestions.size());
return suggestions;
}
}
1.5 索引监控和自动优化
索引建好了还不够,还得时刻盯着它们的使用情况。物联网项目里,有几类索引特别重要:时间戳索引(查历史数据用)、设备ID索引(按设备查询用)、地理位置索引(做地图展示用)、告警状态索引(查故障用)。
我们写了个监控视图,专门用来分析这些索引的使用情况:
-- 物联网索引使用情况分析
-- 专门针对IoT数据特点设计的监控视图
CREATE VIEW iot_index_usage_analysis AS
SELECT
t.TABLE_SCHEMA as database_name,
t.TABLE_NAME as table_name,
s.INDEX_NAME as index_name,
s.COLUMN_NAME as column_name,
s.SEQ_IN_INDEX as column_position,
-- 按IoT场景分类索引类型
CASE
WHEN s.INDEX_NAME = 'PRIMARY' THEN 'PRIMARY KEY'
WHEN s.NON_UNIQUE = 0 THEN 'UNIQUE INDEX'
WHEN s.COLUMN_NAME LIKE '%timestamp%' OR s.COLUMN_NAME LIKE '%time%' THEN 'TIME_SERIES_INDEX' -- 时间序列索引
WHEN s.COLUMN_NAME LIKE '%device_id%' OR s.COLUMN_NAME LIKE '%sensor_id%' THEN 'DEVICE_INDEX' -- 设备索引
WHEN s.COLUMN_NAME LIKE '%location%' OR s.COLUMN_NAME LIKE '%coordinate%' THEN 'SPATIAL_INDEX' -- 地理位置索引
WHEN s.COLUMN_NAME LIKE '%status%' OR s.COLUMN_NAME LIKE '%alert%' THEN 'STATUS_INDEX' -- 状态索引
ELSE 'REGULAR_INDEX' -- 普通索引
END as iot_index_type,
s.CARDINALITY as cardinality,
-- 索引使用统计(需要开启performance_schema)
COALESCE(ius.COUNT_FETCH, 0) as fetch_count, -- 查询次数
COALESCE(ius.COUNT_INSERT, 0) as insert_count, -- 插入次数
COALESCE(ius.COUNT_UPDATE, 0) as update_count, -- 更新次数
COALESCE(ius.COUNT_DELETE, 0) as delete_count, -- 删除次数
-- 计算索引使用效率(考虑IoT写多读少的特点)
CASE
WHEN COALESCE(ius.COUNT_FETCH, 0) = 0 THEN 0 -- 没有查询就是0效率
ELSE ROUND(
COALESCE(ius.COUNT_FETCH, 0) /
(COALESCE(ius.COUNT_FETCH, 0) + COALESCE(ius.COUNT_INSERT, 0) * 0.1 +
COALESCE(ius.COUNT_UPDATE, 0) * 0.5 + COALESCE(ius.COUNT_DELETE, 0) * 0.3) * 100, 2
) -- 读写比例计算,写操作权重较低
END as iot_usage_efficiency_percent,
-- 索引占用空间估算
ROUND(
(t.DATA_LENGTH + t.INDEX_LENGTH) / 1024 / 1024 *
(s.CARDINALITY / GREATEST(t.TABLE_ROWS, 1)), 2
) as estimated_size_mb, -- 预估索引大小
-- 表的基本信息
t.TABLE_ROWS as table_rows, -- 总行数
ROUND((t.DATA_LENGTH + t.INDEX_LENGTH) / 1024 / 1024, 2) as table_size_mb, -- 表大小
-- 数据增长速度(IoT数据增长很快)
CASE
WHEN t.UPDATE_TIME IS NOT NULL AND t.CREATE_TIME IS NOT NULL THEN
ROUND(t.TABLE_ROWS / GREATEST(DATEDIFF(t.UPDATE_TIME, t.CREATE_TIME), 1), 0)
ELSE 0
END as avg_daily_growth_rows, -- 平均每天增长多少行
t.CREATE_TIME as table_created,
t.UPDATE_TIME as table_updated
FROM information_schema.STATISTICS s
JOIN information_schema.TABLES t
ON s.TABLE_SCHEMA = t.TABLE_SCHEMA
AND s.TABLE_NAME = t.TABLE_NAME
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage ius
ON s.TABLE_SCHEMA = ius.OBJECT_SCHEMA
AND s.TABLE_NAME = ius.OBJECT_NAME
AND s.INDEX_NAME = ius.INDEX_NAME
WHERE s.TABLE_SCHEMA NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
AND t.TABLE_TYPE = 'BASE TABLE'
-- 只看IoT相关的表
AND (t.TABLE_NAME LIKE '%sensor%' -- 传感器表
OR t.TABLE_NAME LIKE '%device%' -- 设备表
OR t.TABLE_NAME LIKE '%iot%' -- IoT相关表
OR t.TABLE_NAME LIKE '%telemetry%' -- 遥测数据表
OR t.TABLE_NAME LIKE '%measurement%') -- 测量数据表
ORDER BY s.TABLE_SCHEMA, s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX;
2 执行计划缓存与物联网专用优化
物联网项目的查询场景复杂多样:时间段传感器数据统计、多维度设备状态分析、地理区域查询、实时告警统计等。这些查询往往需要关联多张表,涉及时间窗口计算和复杂聚合操作。
如果每次都重新生成执行计划,会严重影响性能。因此,执行计划缓存在物联网场景中显得尤为重要。
2.1 查询缓存配置
物联网项目具有相对固定的查询模式。设备状态每隔几分钟查询一次,传感器数据每小时统计一次。
这种高频重复查询场景非常适合通过缓存优化:
-- 查询缓存配置(针对IoT重复查询优化)
-- 1. 查询结果缓存
SET GLOBAL query_cache_size = 536870912; -- 512MB,IoT查询结果比较大
SET GLOBAL query_cache_type = ON; -- 开启查询缓存
SET GLOBAL query_cache_limit = 4194304; -- 单个查询最大4MB,适合聚合查询
-- 2. 表缓存优化(IoT项目表很多)
SET GLOBAL table_open_cache = 8000; -- 表缓存调大点,设备表多
SET GLOBAL table_definition_cache = 4000; -- 表定义缓存
SET GLOBAL table_open_cache_instances = 16; -- 多实例缓存,减少锁等待
2.2 时序数据分区查询优化
物联网数据通常按时间分区存储,查询时需要指定时间范围。
MySQL的分区裁剪功能能够显著提升此类查询的性能:
-- 时序数据分区查询优化
SET SESSION optimizer_prune_level = 1; -- 开启分区裁剪,只查需要的分区
SET SESSION optimizer_search_depth = 62; -- 搜索深度调大
SET SESSION eq_range_index_dive_limit = 200; -- 范围查询优化
-- 时间窗口查询专门优化
SET SESSION range_optimizer_max_mem_size = 8388608; -- 8MB内存做范围优化
SET SESSION optimizer_switch = 'index_condition_pushdown=on,mrr=on,mrr_cost_based=on'; -- 开启高级优化
2.3 执行计划监控
仅有配置优化还不够,还需要持续监控查询执行情况。
为此专门设计了监控表,记录物联网查询的执行计划和性能指标:
-- IoT查询执行计划监控表
CREATE TABLE iot_execution_plan_monitor (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
query_hash VARCHAR(64) NOT NULL, -- 查询哈希,用来去重
query_type ENUM('SENSOR_DATA', 'DEVICE_STATUS', 'ALERT_ANALYSIS', 'SPATIAL_QUERY', 'AGGREGATION') NOT NULL, -- 查询类型
query_text TEXT, -- SQL语句(敏感信息已脱敏)
execution_plan JSON, -- 执行计划
execution_time_ms INT, -- 执行耗时
rows_examined BIGINT, -- 扫描了多少行
rows_sent BIGINT, -- 返回了多少行
-- IoT专门的监控指标
device_count INT, -- 查询涉及多少个设备
time_range_hours INT, -- 查询的时间跨度
partition_count INT, -- 用了几个分区
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 记录时间
-- 专门为IoT查询设计的索引
INDEX idx_query_type_time (query_type, created_at), -- 按查询类型和时间查找
INDEX idx_query_hash_time (query_hash, created_at), -- 按查询哈希和时间查找
INDEX idx_execution_time (execution_time_ms), -- 按执行时间排序
INDEX idx_device_count (device_count), -- 按设备数量查找
INDEX idx_created_at (created_at) -- 按时间查找
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
PARTITION p_current VALUES LESS THAN (UNIX_TIMESTAMP('2024-02-01')),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
2.6 智能告警触发器
仅仅记录数据还不够,系统需要具备自动发现问题的能力。
通过触发器实现查询执行时间异常的自动告警:
-- IoT查询性能异常告警触发器
DELIMITER //
CREATE TRIGGER iot_execution_plan_alert
AFTER INSERT ON iot_execution_plan_monitor
FOR EACH ROW
BEGIN
DECLARE avg_execution_time DECIMAL(10,2);
DECLARE threshold_multiplier DECIMAL(3,1) DEFAULT 1.5;
-- 不同查询类型设置不同的告警阈值
CASE NEW.query_type
WHEN 'SENSOR_DATA' THEN SET threshold_multiplier = 2.0; -- 传感器查询慢点没关系
WHEN 'DEVICE_STATUS' THEN SET threshold_multiplier = 1.3; -- 设备状态要快
WHEN 'ALERT_ANALYSIS' THEN SET threshold_multiplier = 1.2; -- 告警分析必须快
WHEN 'SPATIAL_QUERY' THEN SET threshold_multiplier = 3.0; -- 地图查询本来就慢
WHEN 'AGGREGATION' THEN SET threshold_multiplier = 2.5; -- 聚合查询可以慢点
END CASE;
-- 算出最近7天同类查询的平均耗时
SELECT AVG(execution_time_ms) INTO avg_execution_time
FROM iot_execution_plan_monitor
WHERE query_type = NEW.query_type
AND created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
AND id != NEW.id;
-- 如果超过阈值就告警
IF NEW.execution_time_ms > avg_execution_time * threshold_multiplier AND avg_execution_time > 0 THEN
INSERT INTO iot_performance_alerts (alert_type, query_type, message, query_hash, created_at)
VALUES (
'IOT_QUERY_PERFORMANCE_DEGRADATION',
NEW.query_type,
CONCAT('查询变慢了:', NEW.query_type, ' 现在要', NEW.execution_time_ms, 'ms,平时只要',
ROUND(avg_execution_time, 2), 'ms,涉及', COALESCE(NEW.device_count, 0), '个设备'),
NEW.query_hash,
NOW()
);
END IF;
END//
DELIMITER ;
2.2 分区策略优化
物联网项目的数据量增长迅猛,每天产生数TB的传感器数据。
大部分查询集中在最近7天的数据,历史数据访问频率较低。通过分区策略将新旧数据分离,让查询只扫描必要的分区,确保100ms内响应。
-- IoT时序数据分区管理
-- 自动管理IoT数据的分区,按数据生命周期优化
-- 1. IoT数据自动分区管理存储过程
DELIMITER //
CREATE PROCEDURE manage_iot_time_series_partitions()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE table_name VARCHAR(64);
DECLARE partition_name VARCHAR(64);
DECLARE partition_date DATE;
DECLARE table_type VARCHAR(32);
-- 游标:找出所有IoT相关的表
DECLARE iot_table_cursor CURSOR FOR
SELECT TABLE_NAME,
CASE
WHEN TABLE_NAME LIKE '%sensor_data%' THEN 'SENSOR_DATA' -- 传感器数据
WHEN TABLE_NAME LIKE '%device_status%' THEN 'DEVICE_STATUS' -- 设备状态
WHEN TABLE_NAME LIKE '%alert%' OR TABLE_NAME LIKE '%alarm%' THEN 'ALERT_DATA' -- 告警数据
WHEN TABLE_NAME LIKE '%telemetry%' THEN 'TELEMETRY_DATA' -- 遥测数据
ELSE 'OTHER_IOT_DATA' -- 其他IoT数据
END as table_type
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE()
AND (TABLE_NAME LIKE '%sensor%' -- 传感器表
OR TABLE_NAME LIKE '%device%' -- 设备表
OR TABLE_NAME LIKE '%iot%' -- IoT表
OR TABLE_NAME LIKE '%telemetry%' -- 遥测表
OR TABLE_NAME LIKE '%alert%' -- 告警表
OR TABLE_NAME LIKE '%measurement%') -- 测量表
AND TABLE_TYPE = 'BASE TABLE'
AND TABLE_NAME LIKE '%_data'; -- 只要数据表
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN iot_table_cursor;
iot_table_loop: LOOP
FETCH iot_table_cursor INTO table_name, table_type;
IF done THEN
LEAVE iot_table_loop;
END IF;
-- 根据数据类型设置不同的分区策略
CASE table_type
WHEN 'SENSOR_DATA' THEN
CALL create_future_partitions(table_name, 7); -- 提前建好7天的分区
CALL archive_old_partitions(table_name, 30); -- 传感器数据保留30天
WHEN 'DEVICE_STATUS' THEN
CALL create_future_partitions(table_name, 7); -- 提前建好7天的分区
CALL archive_old_partitions(table_name, 90); -- 设备状态保留90天
WHEN 'ALERT_DATA' THEN
CALL create_future_partitions(table_name, 7); -- 提前建好7天的分区
CALL archive_old_partitions(table_name, 365); -- 告警数据保留1年
ELSE
CALL create_future_partitions(table_name, 7); -- 提前建好7天的分区
CALL archive_old_partitions(table_name, 60); -- 其他数据保留60天
END CASE;
END LOOP;
CLOSE iot_table_cursor;
END//
DELIMITER ;
2.3 分区创建和维护
上述存储过程依赖两个核心子程序:创建未来分区和归档历史数据。
具体实现如下:
-- 2. 创建未来分区
DELIMITER //
CREATE PROCEDURE create_future_partitions(IN target_table VARCHAR(64), IN days_ahead INT)
BEGIN
DECLARE counter INT DEFAULT 0;
DECLARE partition_date DATE; -- 分区日期
DECLARE partition_name VARCHAR(64); -- 分区名称
DECLARE partition_exists INT DEFAULT 0; -- 分区是否存在
WHILE counter < days_ahead DO
SET partition_date = DATE_ADD(CURDATE(), INTERVAL counter DAY); -- 计算分区日期
SET partition_name = CONCAT('p_', DATE_FORMAT(partition_date, '%Y%m%d')); -- 生成分区名
-- 检查这个分区是否已经存在了
SELECT COUNT(*) INTO partition_exists
FROM information_schema.PARTITIONS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = target_table
AND PARTITION_NAME = partition_name;
-- 不存在就创建
IF partition_exists = 0 THEN
SET @create_partition_sql = CONCAT(
'ALTER TABLE ', target_table,
' ADD PARTITION (PARTITION ', partition_name,
' VALUES LESS THAN (TO_DAYS("',
DATE_ADD(partition_date, INTERVAL 1 DAY), '")))'); -- 动态生成SQL
PREPARE create_stmt FROM @create_partition_sql; -- 准备SQL
EXECUTE create_stmt; -- 执行创建分区
DEALLOCATE PREPARE create_stmt; -- 释放资源
-- 记录分区创建日志
INSERT INTO iot_partition_management_log
(table_name, partition_name, action, table_type, created_at)
VALUES
(target_table, partition_name, 'CREATE',
CASE
WHEN target_table LIKE '%sensor%' THEN 'SENSOR_DATA' -- 传感器数据
WHEN target_table LIKE '%device%' THEN 'DEVICE_STATUS' -- 设备状态
WHEN target_table LIKE '%alert%' THEN 'ALERT_DATA' -- 告警数据
ELSE 'OTHER_IOT_DATA' -- 其他数据
END,
NOW());
END IF;
SET counter = counter + 1;
END WHILE;
END//
DELIMITER ;
2.4 历史数据归档处理
创建分区只是第一步,还需要定期清理历史数据。
物联网数据增长速度极快,不及时清理会导致存储空间不足:
-- 3. 历史数据归档处理
DELIMITER //
CREATE PROCEDURE archive_old_partitions(IN target_table VARCHAR(64), IN retention_days INT)
BEGIN
DECLARE old_partition_name VARCHAR(64); -- 要归档的分区名
DECLARE old_date DATE; -- 归档日期
DECLARE old_partition_exists INT DEFAULT 0; -- 分区是否存在
DECLARE archive_table VARCHAR(64); -- 归档表名
DECLARE archive_exists INT DEFAULT 0; -- 归档表是否存在
SET old_date = DATE_SUB(CURDATE(), INTERVAL retention_days DAY); -- 计算要归档的日期
SET old_partition_name = CONCAT('p_', DATE_FORMAT(old_date, '%Y%m%d')); -- 生成分区名
SET archive_table = CONCAT(target_table, '_archive'); -- 归档表名
-- 检查要归档的分区是否存在
SELECT COUNT(*) INTO old_partition_exists
FROM information_schema.PARTITIONS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = target_table
AND PARTITION_NAME = old_partition_name;
IF old_partition_exists > 0 THEN
-- 检查归档表是否存在
SELECT COUNT(*) INTO archive_exists
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = archive_table;
-- 如果有归档表,先把数据备份过去
IF archive_exists > 0 THEN
SET @backup_sql = CONCAT(
'INSERT INTO ', archive_table,
' SELECT * FROM ', target_table,
' PARTITION (', old_partition_name, ')'
); -- 动态生成备份SQL
PREPARE backup_stmt FROM @backup_sql; -- 准备SQL
EXECUTE backup_stmt; -- 执行备份
DEALLOCATE PREPARE backup_stmt; -- 释放资源
-- 记录归档日志
INSERT INTO iot_partition_management_log
(table_name, partition_name, action, table_type, created_at)
VALUES
(target_table, old_partition_name, 'ARCHIVE',
CASE
WHEN target_table LIKE '%sensor%' THEN 'SENSOR_DATA' -- 传感器数据
WHEN target_table LIKE '%device%' THEN 'DEVICE_STATUS' -- 设备状态
WHEN target_table LIKE '%alert%' THEN 'ALERT_DATA' -- 告警数据
ELSE 'OTHER_IOT_DATA' -- 其他类型的数据
END,
NOW());
END IF;
-- 删掉老的分区
SET @drop_sql = CONCAT(
'ALTER TABLE ', target_table,
' DROP PARTITION ', old_partition_name
);
PREPARE drop_stmt FROM @drop_sql;
EXECUTE drop_stmt;
DEALLOCATE PREPARE drop_stmt;
-- 记录一下删除了哪个分区
INSERT INTO iot_partition_management_log
(table_name, partition_name, action, table_type, created_at)
VALUES
(target_table, old_partition_name, 'DROP',
CASE
WHEN target_table LIKE '%sensor%' THEN 'SENSOR_DATA' -- 传感器表
WHEN target_table LIKE '%device%' THEN 'DEVICE_STATUS' -- 设备状态表
WHEN target_table LIKE '%alert%' THEN 'ALERT_DATA' -- 告警表
ELSE 'OTHER_IOT_DATA' -- 其他表
END,
NOW());
END IF;
END//
DELIMITER ;
2.5 自动化调度和监控
分区管理不能依赖人工操作,需要通过定时任务自动执行。
选择每天凌晨2点执行,此时用户活跃度最低,对业务影响最小:
-- 创建定时任务,每天凌晨2点自动管理分区
CREATE EVENT iot_auto_partition_management
ON SCHEDULE EVERY 1 DAY
STARTS TIMESTAMP(CURDATE() + INTERVAL 1 DAY, '02:00:00')
DO
CALL manage_iot_time_series_partitions(); -- 调用分区管理存储过程
-- 分区管理日志表,记录所有分区操作
CREATE TABLE iot_partition_management_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(64) NOT NULL, -- 表名
partition_name VARCHAR(64) NOT NULL, -- 分区名
action ENUM('CREATE', 'DROP', 'ARCHIVE') NOT NULL, -- 操作类型
table_type ENUM('SENSOR_DATA', 'DEVICE_STATUS', 'ALERT_DATA', 'TELEMETRY_DATA', 'OTHER_IOT_DATA') NOT NULL, -- 表类型
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 操作时间
-- 几个常用的索引,方便查询日志
INDEX idx_table_type_action_time (table_type, action, created_at), -- 按表类型和操作查询
INDEX idx_table_action_time (table_name, action, created_at), -- 按表名和操作查询
INDEX idx_created_at (created_at) -- 按时间查询
);
-- 分区管理统计表,每天的操作汇总
CREATE TABLE iot_partition_management_stats (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
management_date DATE NOT NULL UNIQUE, -- 管理日期
sensor_tables_processed INT DEFAULT 0, -- 处理的传感器表数量
device_tables_processed INT DEFAULT 0, -- 处理的设备表数量
alert_tables_processed INT DEFAULT 0, -- 处理的告警表数量
total_partitions_created INT DEFAULT 0, -- 创建的分区总数
total_partitions_dropped INT DEFAULT 0, -- 删除的分区总数
total_partitions_archived INT DEFAULT 0, -- 归档的分区总数
total_data_archived_gb DECIMAL(10,2) DEFAULT 0, -- 归档的数据量(GB)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 记录创建时间
INDEX idx_management_date (management_date) -- 按日期查询
);
3 运维实践和监控体系
3.1 物联网运维监控的特殊性
物联网平台的运维监控与传统应用存在显著差异,面临更大挑战。
设备数量多、分布广、类型杂,数据量大、实时性要求高,故障影响面广。这些特点要求重新设计运维监控体系。
物联网运维有几个明显特点:
设备规模大:一个平台管理几万甚至几十万台设备很正常,设备类型包括传感器、网关、控制器等,分布在不同地方,网络环境也复杂多变。
数据量大:每秒可能产生几百万条监控数据,包括设备状态、性能指标、业务数据等,既要实时处理,又要做历史分析。
故障影响大:一个设备坏了可能影响整个业务流程,网络断了可能导致一大批设备离线,数据丢了可能影响业务决策。
我们的运维理念要从被动响应转向主动预防:
- 预测性维护:通过历史数据和机器学习算法提前发现设备故障风险
- 自动化响应:建立智能告警和自动处理机制,减少人工干预
- 全链路监控:从设备到应用,从数据采集到业务处理,端到端覆盖
- 业务关联分析:把技术指标和业务指标关联起来,从技术运维转向业务运维
3.2 性能基线建立
农业物联网系统的性能具有明显的季节特征。
春耕时设备启动频繁,夏天灌溉系统负载高,秋收时数据采集量暴增,冬天相对平稳。固定的性能基线无法适应这种变化,需要建立动态的性能基线管理系统。
@Component
@Slf4j
public class AgriculturalAlertEngine {
private final AgriculturalDeviceService deviceService;
private final WeatherDataService weatherService;
private final CropGrowthService cropService;
private final SoilConditionService soilService;
private final AlertNotificationService notificationService;
/**
* 农业物联网告警引擎
* 要考虑作物生长阶段、天气、土壤、设备状态等多个因素
*/
public void evaluateAgriculturalAlerts(AgricultureSensorData sensorData, AlertContext context) {
String deviceId = sensorData.getDeviceId();
log.info("开始检查农业设备告警,设备ID: {}, 传感器类型: {}", deviceId, sensorData.getSensorType());
try {
// 获取设备基本信息(作物类型、地块位置、设备规格等)
AgriculturalDeviceProfile deviceProfile = deviceService.getDeviceProfile(deviceId);
if (deviceProfile == null) {
log.warn("设备档案不存在,设备ID: {}", deviceId);
return;
}
// 确定作物当前生长阶段
CropGrowthStage currentStage = cropService.getCurrentGrowthStage(
deviceProfile.getCropType(), deviceProfile.getPlantingDate());
// 获取当前天气和土壤情况
WeatherCondition currentWeather = weatherService.getCurrentWeather(deviceProfile.getLocation());
SoilCondition soilCondition = soilService.getCurrentSoilCondition(deviceProfile.getFieldId());
// 根据实际情况动态生成告警规则
AgriculturalAlertRule dynamicRule = buildAgriculturalAlertRule(
deviceProfile, currentStage, currentWeather, soilCondition);
// 多维度告警检查
List<AgriculturalAlertEvent> alerts = new ArrayList<>();
alerts.add(checkCropGrowthAlert(sensorData, dynamicRule, currentStage)); // 作物生长告警
alerts.add(checkWeatherRiskAlert(sensorData, currentWeather, deviceProfile.getCropType())); // 天气风险告警
alerts.add(checkSoilConditionAlert(sensorData, soilCondition, currentStage)); // 土壤状态告警
alerts.add(checkIrrigationAlert(sensorData, deviceProfile, currentWeather)); // 灌溉告警
alerts.add(checkPestDiseaseRiskAlert(sensorData, currentWeather, currentStage)); // 病虫害风险告警
// 过滤掉无效告警,去掉重复的
List<AgriculturalAlertEvent> validAlerts = alerts.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
// 发送告警通知
for (AgriculturalAlertEvent alert : validAlerts) {
sendAgriculturalAlert(alert, deviceProfile, currentStage);
}
log.info("农业设备告警检查完成,设备: {}, 触发告警数: {}", deviceId, validAlerts.size());
} catch (Exception e) {
log.error("农业告警检查失败,设备: {}", deviceId, e);
}
}
/**
* 构建动态告警规则
* 根据作物类型、生长阶段、天气、土壤状态动态调整告警阈值
*/
private AgriculturalAlertRule buildAgriculturalAlertRule(AgriculturalDeviceProfile deviceProfile,
CropGrowthStage growthStage,
WeatherCondition weather,
SoilCondition soil) {
String cropType = deviceProfile.getCropType();
String sensorType = deviceProfile.getSensorType();
// 获取作物在当前生长阶段的标准参数范围
CropParameterRange baseRange = cropService.getCropParameterRange(cropType, growthStage.getStage());
// 根据天气调整阈值(高温天气下土壤湿度要求更高)
double weatherAdjustment = calculateWeatherAdjustment(sensorType, weather, cropType);
// 根据土壤类型调整(沙土排水快,需要更频繁灌溉)
double soilAdjustment = calculateSoilTypeAdjustment(sensorType, soil.getSoilType(), cropType);
// 根据生长阶段调整(开花期对湿度更敏感)
double growthStageAdjustment = calculateGrowthStageAdjustment(sensorType, growthStage, cropType);
// 计算最终动态阈值
double adjustedMinValue = baseRange.getMinValue() * (1 + weatherAdjustment + soilAdjustment + growthStageAdjustment);
double adjustedMaxValue = baseRange.getMaxValue() * (1 + weatherAdjustment + soilAdjustment + growthStageAdjustment);
log.debug("生成告警规则 - 设备: {}, 作物: {}, 生长阶段: {}, 调整后范围: [{}, {}]",
deviceProfile.getDeviceId(), cropType, growthStage.getStage(), adjustedMinValue, adjustedMaxValue);
return AgriculturalAlertRule.builder()
.deviceId(deviceProfile.getDeviceId())
.cropType(cropType)
.growthStage(growthStage.getStage())
.sensorType(sensorType)
.minValue(adjustedMinValue)
.maxValue(adjustedMaxValue)
.weatherAdjustment(weatherAdjustment)
.soilAdjustment(soilAdjustment)
.growthStageAdjustment(growthStageAdjustment)
.build();
}
/**
* 检查作物生长告警
* 评估当前环境条件是否适合作物生长
*/
private AgriculturalAlertEvent checkCropGrowthAlert(AgricultureSensorData sensorData,
AgriculturalAlertRule rule,
CropGrowthStage growthStage) {
double currentValue = sensorData.getValue();
String sensorType = sensorData.getSensorType();
if (currentValue < rule.getMinValue() || currentValue > rule.getMaxValue()) {
// 根据偏离程度和生长阶段的重要性确定告警级别
AlertSeverity severity = calculateAgriculturalAlertSeverity(currentValue, rule, growthStage);
// 生成告警消息
String alertMessage = generateAgriculturalAlertMessage(sensorType, currentValue, rule, growthStage);
return AgriculturalAlertEvent.builder()
.deviceId(sensorData.getDeviceId())
.alertType(AgriculturalAlertType.CROP_GROWTH_RISK) // 作物生长风险
.severity(severity)
.cropType(rule.getCropType())
.growthStage(growthStage.getStage())
.sensorType(sensorType)
.currentValue(currentValue)
.optimalRange(rule.getMinValue(), rule.getMaxValue()) // 最佳范围
.message(alertMessage)
.timestamp(sensorData.getTimestamp())
.actionRecommendation(generateActionRecommendation(sensorType, currentValue, rule, growthStage)) // 建议采取的行动
.metadata(Map.of(
"weather_adjustment", rule.getWeatherAdjustment(), // 天气调整系数
"soil_adjustment", rule.getSoilAdjustment(), // 土壤调整系数
"growth_stage_adjustment", rule.getGrowthStageAdjustment(), // 生长阶段调整系数
"growth_stage_criticality", growthStage.getCriticality() // 生长阶段重要性
))
.build();
}
return null; // 没有告警
}
/**
* 检查天气风险告警
* 评估天气条件是否会对作物造成风险
*/
private AgriculturalAlertEvent checkWeatherRiskAlert(AgricultureSensorData sensorData,
WeatherCondition weather,
String cropType) {
// 检查是否有极端天气
if (isExtremeWeatherCondition(weather, cropType)) {
AlertSeverity severity = calculateWeatherRiskSeverity(weather, cropType);
return AgriculturalAlertEvent.builder()
.deviceId(sensorData.getDeviceId())
.alertType(AgriculturalAlertType.WEATHER_RISK) // 天气风险告警
.severity(severity)
.cropType(cropType)
.sensorType(sensorData.getSensorType())
.message(generateWeatherRiskMessage(weather, cropType))
.timestamp(sensorData.getTimestamp())
.actionRecommendation(generateWeatherActionRecommendation(weather, cropType)) // 应对建议
.metadata(Map.of(
"temperature", weather.getTemperature(), // 温度
"humidity", weather.getHumidity(), // 湿度
"wind_speed", weather.getWindSpeed(), // 风速
"precipitation", weather.getPrecipitation(), // 降水量
"weather_type", weather.getWeatherType() // 天气类型
))
.build();
}
return null; // 天气正常,无需告警
}
/**
* 发送告警通知
* 根据作物类型、生长阶段、告警级别选择合适的通知方式
*/
private void sendAgriculturalAlert(AgriculturalAlertEvent alert,
AgriculturalDeviceProfile deviceProfile,
CropGrowthStage growthStage) {
try {
// 根据作物重要性、生长阶段和告警级别确定通知策略
AgriculturalNotificationStrategy strategy = determineAgriculturalNotificationStrategy(
alert, deviceProfile, growthStage);
// 构建告警消息
AgriculturalAlertMessage message = AgriculturalAlertMessage.builder()
.deviceId(alert.getDeviceId())
.deviceName(deviceProfile.getDeviceName()) // 设备名称
.fieldLocation(deviceProfile.getFieldLocation()) // 地块位置
.cropType(alert.getCropType()) // 作物类型
.growthStage(alert.getGrowthStage()) // 生长阶段
.sensorType(alert.getSensorType()) // 传感器类型
.alertType(alert.getAlertType()) // 告警类型
.severity(alert.getSeverity()) // 告警级别
.message(alert.getMessage()) // 告警消息
.actionRecommendation(alert.getActionRecommendation()) // 处理建议
.timestamp(alert.getTimestamp()) // 时间戳
.urgencyLevel(strategy.getUrgencyLevel()) // 紧急程度
.expertRequired(strategy.isExpertRequired()) // 是否需要专家
.build();
// 发送通知(农技专家、种植户、管理员等)
notificationService.sendAgriculturalNotification(message, strategy);
log.info("告警通知已发送 - 设备: {}, 作物: {}, 生长阶段: {}, 告警类型: {}, 级别: {}, 通知方式: {}",
alert.getDeviceId(), alert.getCropType(), alert.getGrowthStage(),
alert.getAlertType(), alert.getSeverity(), strategy.getNotificationChannels());
} catch (Exception e) {
log.error("发送告警通知失败,设备: {}, 作物: {}, 告警: {}",
alert.getDeviceId(), alert.getCropType(), alert.getAlertType(), e);
}
}
}
- 点赞
- 收藏
- 关注作者
评论(0)