事业实时数据分析平台构建

举报
yd_291727934 发表于 2026/04/24 14:25:31 2026/04/24
【摘要】 摘要实时数据分析是公共事业企业智能化运营的关键能力。本文详细介绍如何基于TDengine时序database构建水、气、热等公共事业实时数据分析平台,包括流计算、实时告警、可视化等核心功能的实现。正文一、公共事业实时数据分析的需求水、气、热等公共事业企业面临着日益复杂的运营环境,对实时数据分析有着强烈的需求:管网运行监控。 需要实时监控管网压力、流量、温度等参数,及时发现异常,保障供应安全。...

摘要

实时数据分析是公共事业企业智能化运营的关键能力。本文详细介绍如何基于TDengine时序database构建水、气、热等公共事业实时数据分析平台,包括流计算、实时告警、可视化等核心功能的实现。




正文

一、公共事业实时数据分析的需求

水、气、热等公共事业企业面临着日益复杂的运营环境,对实时数据分析有着强烈的需求:

管网运行监控。 需要实时监控管网压力、流量、温度等参数,及时发现异常,保障供应安全。供水管网压力过低可能导致高层用户无水可用,压力过高则可能引发爆管事故。

用户行为分析。 需要实时分析用户用水用气用热模式,识别异常行为,发现偷水偷气、设备故障等问题。某燃气公司通过实时分析,发现了用户私接管道偷气的行为,避免了重大安全事故。

供需平衡优化。 需要根据实时需求调整供应,实现供需平衡。供热企业需要根据室外温度实时调整供热输出,既保障用户舒适度,又避免能源浪费。

应急响应支撑。 需要在突发事件发生时,快速分析影响范围,支撑应急决策。爆管事故发生后,需要快速确定受影响用户,制定停水方案和抢修计划。

传统批处理数据分析模式无法满足这些实时性要求。时序数据库TDengine通过内置的流计算、数据订阅、连续查询等功能,为公共事业企业构建实时数据分析平台提供了强大支撑。

二、平台架构设计

基于TDengine的公共事业实时数据分析平台采用Lambda架构,兼顾实时处理和离线分析:

2.1 数据摄入层

数据摄入层负责从各种数据源获取实时数据:

-- 创建智能表计原始数据超级表

CREATE STABLE IF NOT EXISTS meter_raw_data (

    ts TIMESTAMP,

    reading_value FLOAT,

    reading_type BINARY(16),

    quality_code TINYINT

) TAGS (

    meter_id BINARY(32),

    meter_type BINARY(16),

    district_id BINARY(32),

    user_type BINARY(16)

);

 

-- 创建聚合数据超级表

CREATE STABLE IF NOT EXISTS meter_aggregated (

    ts TIMESTAMP,

    avg_value FLOAT,

    max_value FLOAT,

    min_value FLOAT,

    sum_value DOUBLE,

    count_value INT,

    std_dev FLOAT

) TAGS (

    meter_id BINARY(32),

    meter_type BINARY(16),

    aggregation_period BINARY(16)

);

2.2 流计算处理

-- 创建连续查询进行实时聚合

CREATE STREAM meter_hourly_aggregation

INTO meter_aggregated

AS

SELECT

    _irowts as ts,

    AVG(reading_value) as avg_value,

    MAX(reading_value) as max_value,

    MIN(reading_value) as min_value,

    SUM(reading_value) as sum_value,

    COUNT(*) as count_value,

    STDDEV(reading_value) as std_dev

FROM meter_raw_data

WHERE reading_type = 'FLOW'

INTERVAL(1h)

FILL(PREV);

三、实时告警系统

3.1 告警规则定义

-- 创建告警事件超级表

CREATE STABLE IF NOT EXISTS alarm_events (

    ts TIMESTAMP,

    alarm_type BINARY(64),

    severity TINYINT,

    meter_id BINARY(32),

    current_value FLOAT,

    threshold_value FLOAT,

    alarm_desc BINARY(256),

    status TINYINT

) TAGS (

    district_id BINARY(32),

    alarm_category BINARY(32),

    user_type BINARY(16)

);

 

-- 创建告警规则表

CREATE TABLE IF NOT EXISTS alarm_rules (

    rule_id INT PRIMARY KEY,

    meter_type BINARY(16),

    alarm_type BINARY(64),

    high_limit FLOAT,

    low_limit FLOAT,

    rate_limit FLOAT,

    duration_threshold INT,

    severity TINYINT

);

3.2 实时告警检测

import taos

import time

from datetime import datetime

 

def check_alarm_conditions():

    """实时检测告警条件"""

    conn = taos.connect(host="localhost", database="utility")

    cursor = conn.cursor()

    

    # 查询需要检测告警的表计

    cursor.execute("""

        SELECT r.meter_id, r.high_limit, r.low_limit, r.severity,

               m.reading_value, m.ts

        FROM alarm_rules r

        JOIN (

            SELECT meter_id, reading_value, ts,

                   ROW_NUMBER() OVER (PARTITION BY meter_id ORDER BY ts DESC) as rn

            FROM meter_raw_data

            WHERE ts > NOW - 5m

        ) m ON r.meter_id = m.meter_id

        WHERE m.rn = 1

    """)

    

    alarms = []

    for row in cursor.fetchall():

        meter_id, high_limit, low_limit, severity, value, ts = row

        

        # 检测高限告警

        if high_limit is not None and value > high_limit:

            alarms.append({

                'ts': ts,

                'alarm_type': 'HIGH_LIMIT',

                'severity': severity,

                'meter_id': meter_id,

                'current_value': value,

                'threshold_value': high_limit,

                'alarm_desc': f'{meter_id} 数值 {value} 超过高限 {high_limit}'

            })

        

        # 检测低限告警

        if low_limit is not None and value < low_limit:

            alarms.append({

                'ts': ts,

                'alarm_type': 'LOW_LIMIT',

                'severity': severity,

                'meter_id': meter_id,

                'current_value': value,

                'threshold_value': low_limit,

                'alarm_desc': f'{meter_id} 数值 {value} 低于低限 {low_limit}'

            })

    

    # 写入告警事件

    for alarm in alarms:

        cursor.execute(f"""

            INSERT INTO alarm_events

            (ts, alarm_type, severity, meter_id, current_value, threshold_value, alarm_desc, status)

            VALUES ('{alarm['ts']}', '{alarm['alarm_type']}', {alarm['severity']},

                    '{alarm['meter_id']}', {alarm['current_value']}, {alarm['threshold_value']},

                    '{alarm['alarm_desc']}', 0)

        """)

    

    conn.commit()

    print(f"检测到 {len(alarms)} 条告警")

    

    cursor.close()

    conn.close()

 

# 定时执行告警检测

while True:

    check_alarm_conditions()

    time.sleep(60)  # 每分钟检测一次

四、实时数据可视化

4.1 实时趋势查询

-- 查询实时流量趋势

SELECT

    _irowts as ts,

    avg_value,

    max_value,

    min_value

FROM meter_aggregated

WHERE meter_id = 'WM001'

    AND ts > NOW - 24h

INTERVAL(1h)

FILL(LINEAR);

 

-- 查询多表计对比

SELECT

    _irowts as ts,

    AVG(CASE WHEN meter_id = 'WM001' THEN avg_value END) as flow_001,

    AVG(CASE WHEN meter_id = 'WM002' THEN avg_value END) as flow_002,

    AVG(CASE WHEN meter_id = 'WM003' THEN avg_value END) as flow_003

FROM meter_aggregated

WHERE meter_id IN ('WM001', 'WM002', 'WM003')

    AND ts > NOW - 7d

INTERVAL(1h)

GROUP BY _irowts;

4.2 实时仪表盘API

from flask import Flask, jsonify

import taos

 

app = Flask(__name__)

 

@app.route('/api/realtime/summary')

def get_realtime_summary():

    """获取实时汇总数据"""

    conn = taos.connect(host="localhost", database="utility")

    cursor = conn.cursor()

    

    # 查询关键指标

    cursor.execute("""

        SELECT

            COUNT(DISTINCT meter_id) as total_meters,

            COUNT(*) as total_readings_last_hour,

            AVG(CASE WHEN quality_code = 0 THEN 1 ELSE 0 END) * 100 as data_quality_rate

        FROM meter_raw_data

        WHERE ts > NOW - 1h

    """)

    

    row = cursor.fetchone()

    metrics = {

        'total_meters': row[0],

        'total_readings_last_hour': row[1],

        'data_quality_rate': round(row[2], 2) if row[2] else 0

    }

    

    cursor.close()

    conn.close()

    

    return jsonify(metrics)

 

@app.route('/api/realtime/alarm-summary')

def get_alarm_summary():

    """获取实时告警汇总"""

    conn = taos.connect(host="localhost", database="utility")

    cursor = conn.cursor()

    

    cursor.execute("""

        SELECT

            severity,

            COUNT(*) as count

        FROM alarm_events

        WHERE status = 0

            AND ts > NOW - 24h

        GROUP BY severity

        ORDER BY severity DESC

    """)

    

    alarms = []

    for row in cursor.fetchall():

        alarms.append({

            'severity': row[0],

            'count': row[1]

        })

    

    cursor.close()

    conn.close()

    

    return jsonify(alarms)

 

if __name__ == '__main__':

    app.run(host='0.0.0.0', port=5000)

五、实施效果

某水务集团构建实时数据分析平台后:

响应速度提升。 数据从产生到分析结果呈现延迟控制在秒级,工艺调整响应时间从小时级降至分钟级。

告警准确率提高。 基于实时流计算的告警检测,误报率降低70%,告警响应时间从分钟级降至秒级。

运营效率提升。 通过实时供需平衡优化,能耗降低10%,用户投诉减少50%。

决策支撑增强。 实时数据看板为管理层提供决策支撑,决策效率提升5倍。

TDengine时序database为公共事业实时数据分析提供了高性能、低延迟、易扩展的技术平台,助力企业实现智能化运营。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。