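Building a Real-Time Data Analytics Platform for Public Utilities
Abstract
Real-time data analysis is a key capability for the intelligent operation of public utility companies. This article describes how to build a real-time data analytics platform for water, gas, and heating utilities on the TDengine time-series database. It covers the implementation of the core functions: stream processing, real-time alarms, and visualization.
Main Text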
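1. Real-Time Data Analysis Requirements in Public Utilities
Water, gas, and heating utilities operate in an increasingly complex environment and have a strong need for real-time data analysis:
Pipe network monitoring. Pressure, flow, temperature, and other network parameters must be monitored in real time, so that anomalies are detected early and supply remains secure. In a water network, pressure that is too low can leave customers in high-rise buildings without water, while pressure that is too high can cause pipe bursts.
Customer behavior analysis. Water, gas, and heat consumption patterns must be analyzed in real time to identify abnormal behavior and to uncover problems such as water or gas theft and equipment faults. One gas company used real-time analysis to discover a customer who had illegally tapped into a gas pipe, averting a serious safety incident.
Supply-demand balancing. Supply must be adjusted to real-time demand. Heating companies, for example, need to adjust heat output to the outdoor temperature in real time, keeping customers comfortable without wasting energy.
Emergency response support. When an incident occurs, the affected area must be assessed quickly to support emergency decisions. After a pipe burst, the affected customers must be identified quickly so that a shutoff plan and a repair schedule can be drawn up.
Traditional batch-oriented analytics cannot meet these latency requirements. The TDengine time-series database offers built-in stream processing, data subscription, and continuous queries, which give utilities a solid foundation for building a real-time analytics platform.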
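2. Platform Architecture
The TDengine-based real-time analytics platform for utilities follows a Lambda architecture, which combines real-time processing with offline (batch) analysis.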
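In a Lambda architecture, the serving layer answers a query by combining a precomputed batch view with the speed layer's results for the most recent period. The following is a minimal sketch of that merge step. It assumes both views are dictionaries that map a window start to an aggregated value for one meter. This shape and the `merge_views` helper are hypothetical and are not part of the platform described here.

```python
from datetime import datetime, timedelta


def merge_views(batch_view, speed_view, batch_cutoff):
    """Serving-layer merge for one meter (hypothetical helper).

    batch_view / speed_view: dict mapping window start -> aggregated value.
    Windows before batch_cutoff come from the batch (offline) layer, which is
    assumed to be complete up to that point; later windows come from the
    speed (real-time) layer.
    """
    merged = {t: v for t, v in batch_view.items() if t < batch_cutoff}
    merged.update({t: v for t, v in speed_view.items() if t >= batch_cutoff})
    # Return the windows in chronological order
    return dict(sorted(merged.items()))


if __name__ == "__main__":
    start = datetime(2024, 1, 1)
    hours = [start + timedelta(hours=i) for i in range(4)]
    batch = {hours[0]: 10.0, hours[1]: 12.0, hours[2]: 3.0}  # hour 2 not yet complete in batch
    speed = {hours[1]: 11.0, hours[2]: 14.0, hours[3]: 9.0}
    print(merge_views(batch, speed, batch_cutoff=hours[2]))
```

The cutoff keeps the two layers from double-counting a window. Each window is served by exactly one layer, and the speed layer only has to cover the period the batch job has not yet processed.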
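2.1 Data Ingestion Layer
The data ingestion layer receives real-time data from the various data sources and stores it in TDengine supertables: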
```sql
-- Supertable for raw smart-meter readings (one subtable per meter)
CREATE STABLE IF NOT EXISTS meter_raw_data (
  ts TIMESTAMP,
  reading_value FLOAT,
  reading_type BINARY(16),
  quality_code TINYINT
) TAGS (
  meter_id BINARY(32),
  meter_type BINARY(16),
  district_id BINARY(32),
  user_type BINARY(16)
);
-- Supertable for aggregated data.
-- Column types follow the aggregate functions' return types:
-- AVG, SUM and STDDEV return DOUBLE, COUNT returns BIGINT.
CREATE STABLE IF NOT EXISTS meter_aggregated (
  ts TIMESTAMP,
  avg_value DOUBLE,
  max_value FLOAT,
  min_value FLOAT,
  sum_value DOUBLE,
  count_value BIGINT,
  std_dev DOUBLE
) TAGS (
  meter_id BINARY(32),
  meter_type BINARY(16),
  aggregation_period BINARY(16)
);
```
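Each meter's readings land in a subtable of `meter_raw_data`. TDengine's `INSERT INTO <subtable> USING <supertable> TAGS (...) VALUES (...)` form creates that subtable on first write. The following is a minimal sketch of an ingestion helper that builds such a statement from one reading. The `build_insert` helper, the input dictionary's keys, and the `m_<meter_id>` subtable naming are hypothetical choices for illustration.

```python
import re

# meter_id becomes part of the subtable name, so restrict it to safe characters
_NAME_RE = re.compile(r"[A-Za-z0-9_]+")


def build_insert(reading):
    """Hypothetical helper: turn one reading (dict) into an auto-create INSERT.

    Tag order matches meter_raw_data: meter_id, meter_type, district_id, user_type.
    Column order matches: ts, reading_value, reading_type, quality_code.
    """
    meter_id = reading["meter_id"]
    if not _NAME_RE.fullmatch(meter_id):
        raise ValueError(f"unsupported meter_id: {meter_id!r}")
    return (
        f"INSERT INTO m_{meter_id} USING meter_raw_data "
        f"TAGS ('{meter_id}', '{reading['meter_type']}', "
        f"'{reading['district_id']}', '{reading['user_type']}') "
        f"VALUES ({reading['ts_ms']}, {reading['value']}, "
        f"'{reading['reading_type']}', {reading['quality_code']})"
    )


if __name__ == "__main__":
    print(build_insert({
        "meter_id": "WM001", "meter_type": "WATER", "district_id": "D01",
        "user_type": "RESIDENT", "ts_ms": 1700000000000, "value": 12.5,
        "reading_type": "FLOW", "quality_code": 0,
    }))
```

Building SQL strings is shown here only for clarity. If your connector offers parameter binding, that is generally the safer and faster way to write untrusted values.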
2.2 Stream Processing
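```sql
-- Stream for real-time hourly aggregation of flow readings
-- (in TDengine 3.x, streams take the place of continuous queries).
-- _wstart is the start of each INTERVAL window; _irowts is only valid with INTERP.
-- PARTITION BY produces one output subtable per meter; the partition columns are written
-- as tags, so check that they line up with meter_aggregated's tags
-- (aggregation_period is not produced by this query).
CREATE STREAM IF NOT EXISTS meter_hourly_aggregation
INTO meter_aggregated
AS
SELECT
  _wstart AS ts,
  AVG(reading_value) AS avg_value,
  MAX(reading_value) AS max_value,
  MIN(reading_value) AS min_value,
  SUM(reading_value) AS sum_value,
  COUNT(*) AS count_value,
  STDDEV(reading_value) AS std_dev
FROM meter_raw_data
WHERE reading_type = 'FLOW'
PARTITION BY meter_id, meter_type
INTERVAL(1h)
FILL(PREV);
```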
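When validating the stream's output, it helps to have an independent reference for what each window should contain. The following is a minimal pure-Python sketch of the same per-meter, one-hour tumbling-window aggregation. Two points are assumptions: that windows align to whole epoch hours, and that TDengine's `STDDEV` is the population standard deviation. Verify both against your TDengine version. Unlike the stream, this sketch does not apply `FILL(PREV)`, so windows without data are simply absent.

```python
import statistics
from collections import defaultdict

HOUR_MS = 3_600_000  # one hour in milliseconds


def hourly_aggregates(readings):
    """readings: iterable of (meter_id, ts_ms, value) for FLOW readings.

    Returns {(meter_id, window_start_ms): aggregates}, mirroring
    PARTITION BY meter_id + INTERVAL(1h) (window alignment assumed).
    """
    buckets = defaultdict(list)
    for meter_id, ts_ms, value in readings:
        window_start = ts_ms - ts_ms % HOUR_MS  # start of the hour containing ts_ms
        buckets[(meter_id, window_start)].append(value)

    result = {}
    for key, values in sorted(buckets.items()):
        result[key] = {
            "avg_value": sum(values) / len(values),
            "max_value": max(values),
            "min_value": min(values),
            "sum_value": sum(values),
            "count_value": len(values),
            "std_dev": statistics.pstdev(values),  # population standard deviation
        }
    return result


if __name__ == "__main__":
    rows = [("WM001", 0, 1.0), ("WM001", 1000, 3.0), ("WM001", HOUR_MS, 5.0)]
    for key, agg in hourly_aggregates(rows).items():
        print(key, agg)
```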
3. Real-Time Alarm System
3.1 Alarm Rule Definition
```sql
-- Supertable for alarm events (status = 0 means unresolved)
CREATE STABLE IF NOT EXISTS alarm_events (
  ts TIMESTAMP,
  alarm_type BINARY(64),
  severity TINYINT,
  meter_id BINARY(32),
  current_value FLOAT,
  threshold_value FLOAT,
  alarm_desc BINARY(256),
  status TINYINT
) TAGS (
  district_id BINARY(32),
  alarm_category BINARY(32),
  user_type BINARY(16)
);
-- Alarm rule table (a regular table).
-- The first column of a TDengine table must be a TIMESTAMP, so ts records when the
-- rule was created or last changed, and rule_id is an ordinary column.
CREATE TABLE IF NOT EXISTS alarm_rules (
  ts TIMESTAMP,
  rule_id INT,
  meter_type BINARY(16),
  alarm_type BINARY(64),
  high_limit FLOAT,
  low_limit FLOAT,
  rate_limit FLOAT,
  duration_threshold INT,
  severity TINYINT
);
```
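The rule table carries a `rate_limit` alongside the high and low limits, but the detection script in 3.2 checks only the two limits. The following is a minimal sketch of evaluating all three for one pair of consecutive readings. The `AlarmRule` class and `evaluate` function are hypothetical. So is interpreting `rate_limit` as the maximum allowed change per second.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AlarmRule:
    """Mirrors the alarm_rules columns used here (rate_limit units are an assumption)."""
    meter_type: str
    high_limit: Optional[float]
    low_limit: Optional[float]
    rate_limit: Optional[float]  # assumed: max allowed |change| per second
    severity: int


def evaluate(rule, prev, curr):
    """prev/curr: (ts_seconds, value) of consecutive readings; prev may be None.

    Returns the alarm types triggered by curr.
    """
    ts, value = curr
    triggered = []
    if rule.high_limit is not None and value > rule.high_limit:
        triggered.append("HIGH_LIMIT")
    if rule.low_limit is not None and value < rule.low_limit:
        triggered.append("LOW_LIMIT")
    # Rate-of-change check needs a previous reading with an earlier timestamp
    if rule.rate_limit is not None and prev is not None and ts > prev[0]:
        rate = abs(value - prev[1]) / (ts - prev[0])
        if rate > rule.rate_limit:
            triggered.append("RATE_LIMIT")
    return triggered


if __name__ == "__main__":
    rule = AlarmRule("WATER_PRESSURE", high_limit=0.6, low_limit=0.2, rate_limit=0.01, severity=2)
    print(evaluate(rule, (0, 0.3), (10, 0.15)))
```

A sudden pressure drop can signal a burst before the absolute low limit is reached. That is the usual motivation for a rate-of-change rule next to fixed thresholds.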
3.2 Real-Time Alarm Detection
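```python
import time

import taos


def check_alarm_conditions():
    """Check each meter's latest reading against the threshold rules and record alarms."""
    conn = taos.connect(host="localhost", database="utility")
    cursor = conn.cursor()
    try:
        # Load threshold rules. alarm_rules has no meter_id column, so rules match by meter_type.
        cursor.execute("SELECT meter_type, high_limit, low_limit, severity FROM alarm_rules")
        rules = {}
        for meter_type, high_limit, low_limit, severity in cursor.fetchall():
            rules.setdefault(meter_type, []).append((high_limit, low_limit, severity))

        # Latest reading of each meter in the last 5 minutes. TDengine has no
        # ROW_NUMBER() OVER (...), so LAST() grouped by the meter tags is used instead.
        cursor.execute("""
            SELECT meter_id, meter_type, district_id, user_type,
                   LAST(reading_value), LAST(ts)
            FROM meter_raw_data
            WHERE ts > NOW - 5m
            GROUP BY meter_id, meter_type, district_id, user_type
        """)
        alarms = []
        for meter_id, meter_type, district_id, user_type, value, ts in cursor.fetchall():
            if value is None:
                continue
            for high_limit, low_limit, severity in rules.get(meter_type, []):
                base = {
                    'ts_ms': int(ts.timestamp() * 1000),  # assumes a millisecond-precision database
                    'severity': severity,
                    'meter_id': meter_id,
                    'district_id': district_id,
                    'user_type': user_type,
                    'current_value': value,
                }
                # High-limit alarm
                if high_limit is not None and value > high_limit:
                    alarms.append({**base, 'alarm_type': 'HIGH_LIMIT', 'threshold_value': high_limit,
                                   'alarm_desc': f'{meter_id} value {value} exceeds high limit {high_limit}'})
                # Low-limit alarm
                if low_limit is not None and value < low_limit:
                    alarms.append({**base, 'alarm_type': 'LOW_LIMIT', 'threshold_value': low_limit,
                                   'alarm_desc': f'{meter_id} value {value} is below low limit {low_limit}'})

        # Write alarm events. Rows are written to subtables: INSERT ... USING ... TAGS creates
        # the per-meter subtable on first use (meter_id must be valid in a table name).
        for alarm in alarms:
            cursor.execute(
                f"INSERT INTO alarm_{alarm['meter_id']} USING alarm_events "
                f"TAGS ('{alarm['district_id']}', 'THRESHOLD', '{alarm['user_type']}') "
                "(ts, alarm_type, severity, meter_id, current_value, threshold_value, alarm_desc, status) "
                f"VALUES ({alarm['ts_ms']}, '{alarm['alarm_type']}', {alarm['severity']}, "
                f"'{alarm['meter_id']}', {alarm['current_value']}, {alarm['threshold_value']}, "
                f"'{alarm['alarm_desc']}', 0)"
            )
        # TDengine has no transactions, so no commit() is needed.
        print(f"Detected {len(alarms)} alarms")
    finally:
        cursor.close()
        conn.close()


if __name__ == '__main__':
    # Run the check once a minute
    while True:
        check_alarm_conditions()
        time.sleep(60)
```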
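The detection loop raises an alarm as soon as a single reading crosses a limit. The `duration_threshold` column of `alarm_rules` suggests an alternative: require a condition to persist before alarming, which filters out short spikes. The following is a minimal sketch of such a debouncer. The class is hypothetical, and reading `duration_threshold` as seconds is an assumption. It fires once per episode and re-arms when the condition clears.

```python
class DurationDebouncer:
    """Fire an alarm only after a condition has held for duration_threshold seconds.

    Hypothetical helper; duration_threshold is assumed to be in seconds.
    """

    def __init__(self, duration_threshold):
        self.duration_threshold = duration_threshold
        self._since = {}     # meter_id -> ts when the condition first became true
        self._fired = set()  # meters whose current episode already raised an alarm

    def update(self, meter_id, ts, condition):
        """Feed one check result (ts in seconds); return True when the alarm should fire."""
        if not condition:
            # Condition cleared: reset the episode for this meter
            self._since.pop(meter_id, None)
            self._fired.discard(meter_id)
            return False
        start = self._since.setdefault(meter_id, ts)
        if ts - start >= self.duration_threshold and meter_id not in self._fired:
            self._fired.add(meter_id)
            return True
        return False


if __name__ == "__main__":
    d = DurationDebouncer(duration_threshold=120)
    for t in (0, 60, 120, 180):
        print(t, d.update("WM001", t, condition=True))
```

With the one-minute polling interval used above, a 120-second threshold means the limit must be exceeded on three consecutive checks before an alarm is written.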
4. Real-Time Data Visualization
4.1 Real-Time Trend Queries
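```sql
-- Hourly flow trend for one meter over the last 24 hours.
-- A window query must aggregate its columns; _wstart is the window start time.
SELECT
  _wstart AS ts,
  AVG(avg_value) AS avg_value,
  MAX(max_value) AS max_value,
  MIN(min_value) AS min_value
FROM meter_aggregated
WHERE meter_id = 'WM001'
  AND ts > NOW - 24h
INTERVAL(1h)
FILL(LINEAR);
```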
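`FILL(LINEAR)` fills empty windows by interpolating between the nearest windows that have data. The following is a minimal pure-Python sketch of that idea, useful for reasoning about what a chart will show across gaps. It assumes that windows before the first value or after the last one stay empty (NULL). This is meant to mirror TDengine's behaviour but has not been checked against it. The `linear_fill` helper is hypothetical.

```python
def linear_fill(points, start, end, step):
    """points: {window_start: value} with gaps.

    Returns [(t, value)] for every window from start to end (inclusive).
    Missing windows are linearly interpolated between the nearest known
    neighbours; windows without a neighbour on both sides stay None.
    """
    known = sorted(points.items())
    out = []
    t = start
    while t <= end:
        if t in points:
            out.append((t, points[t]))
        else:
            before = [(k, v) for k, v in known if k < t]
            after = [(k, v) for k, v in known if k > t]
            if before and after:
                (t0, v0), (t1, v1) = before[-1], after[0]
                out.append((t, v0 + (v1 - v0) * (t - t0) / (t1 - t0)))
            else:
                out.append((t, None))
        t += step
    return out


if __name__ == "__main__":
    print(linear_fill({0: 10.0, 3: 16.0}, start=0, end=4, step=1))
```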
```sql
-- Compare several meters: one row per (window, meter).
-- GROUP BY cannot be combined with INTERVAL; PARTITION BY splits the windows per meter.
-- Pivot the rows into one column per meter on the client side.
SELECT
  _wstart AS ts,
  meter_id,
  AVG(avg_value) AS avg_flow
FROM meter_aggregated
WHERE meter_id IN ('WM001', 'WM002', 'WM003')
  AND ts > NOW - 7d
PARTITION BY meter_id
INTERVAL(1h);
```
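When a multi-meter comparison is written with `PARTITION BY meter_id` rather than a server-side pivot, each result row has the long form `(ts, meter_id, value)`. Chart libraries usually want one series per meter. The following is a minimal sketch of a client-side pivot. The `pivot` helper and its output shape (one dict per timestamp, `None` where a meter has no value) are illustrative choices.

```python
def pivot(rows, meter_ids):
    """rows: iterable of (ts, meter_id, value) from a PARTITION BY query.

    Returns one dict per timestamp, sorted by time, with a key per meter.
    """
    table = {}
    for ts, meter_id, value in rows:
        # Start each timestamp with every meter set to None, then fill in values
        table.setdefault(ts, dict.fromkeys(meter_ids))[meter_id] = value
    return [{"ts": ts, **table[ts]} for ts in sorted(table)]


if __name__ == "__main__":
    rows = [(1, "WM002", 5.0), (0, "WM001", 3.0), (1, "WM001", 4.0)]
    for record in pivot(rows, ["WM001", "WM002", "WM003"]):
        print(record)
```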
4.2 Real-Time Dashboard API
```python
from flask import Flask, jsonify
import taos

app = Flask(__name__)


def get_connection():
    """Open a TDengine connection (host and database are deployment-specific)."""
    return taos.connect(host="localhost", database="utility")


@app.route('/api/realtime/summary')
def get_realtime_summary():
    """Return real-time summary metrics for the last hour."""
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Meters that reported in the last hour: fetch the distinct IDs and count them
        cursor.execute("SELECT DISTINCT meter_id FROM meter_raw_data WHERE ts > NOW - 1h")
        total_meters = len(cursor.fetchall())
        # Reading count and percentage of readings with quality_code = 0 (good quality)
        cursor.execute("""
            SELECT COUNT(*),
                   AVG(CASE WHEN quality_code = 0 THEN 1 ELSE 0 END) * 100
            FROM meter_raw_data
            WHERE ts > NOW - 1h
        """)
        row = cursor.fetchone()
    finally:
        cursor.close()
        conn.close()
    metrics = {
        'total_meters': total_meters,
        'total_readings_last_hour': row[0] if row else 0,
        'data_quality_rate': round(row[1], 2) if row and row[1] is not None else 0,
    }
    return jsonify(metrics)


@app.route('/api/realtime/alarm-summary')
def get_alarm_summary():
    """Return the number of unresolved alarms (status = 0) per severity in the last 24 hours."""
    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT severity, COUNT(*) AS cnt
            FROM alarm_events
            WHERE status = 0
              AND ts > NOW - 24h
            GROUP BY severity
            ORDER BY severity DESC
        """)
        alarms = [{'severity': row[0], 'count': row[1]} for row in cursor.fetchall()]
    finally:
        cursor.close()
        conn.close()
    return jsonify(alarms)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
```
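A `GROUP BY severity` query returns rows only for severities that currently have alarms. A dashboard widget with a fixed set of severity tiles then has to cope with missing levels. The following is a minimal sketch of zero-filling the alarm-summary result before returning it. The helper is hypothetical, and the severity levels `(3, 2, 1)` are assumed; the article does not define the scale.

```python
def fill_severity_counts(rows, severities=(3, 2, 1)):
    """rows: (severity, count) pairs from the GROUP BY severity query.

    Returns every severity in the given order, with 0 for levels that had no
    alarms. The default severity levels are an assumption.
    """
    counts = {severity: count for severity, count in rows}
    return [{"severity": s, "count": counts.get(s, 0)} for s in severities]


if __name__ == "__main__":
    print(fill_severity_counts([(3, 2), (1, 5)]))
```

Keeping the response shape fixed lets the front end render the same tiles on every refresh, whether or not a given level has alarms.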
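5. Results
After building its real-time analytics platform, one water utility group reported the following results:
Faster response. Latency from data generation to analysis results was kept to seconds, and the response time for process adjustments fell from hours to minutes.
Higher alarm accuracy. With alarm detection based on real-time stream processing, the false-alarm rate fell by 70% and alarm response time dropped from minutes to seconds.
Higher operational efficiency. Real-time supply-demand balancing reduced energy consumption by 10% and customer complaints by 50%.
Better decision support. Real-time dashboards give management a basis for decisions, and decision-making efficiency increased fivefold.
The TDengine time-series database gives public utilities a high-performance, low-latency, and easily scalable platform for real-time data analysis, helping them move toward intelligent operations.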