MindSpore + TDengine:国产 AI 框架的时序数据分析实践

举报
yd_227194873 发表于 2026/06/24 17:31:56 2026/06/24
【摘要】 本文介绍如何使用 MindSpore 国产 AI 框架与 TDengine 时序数据库结合,实现工业数据的智能分析和预测。

一、国产 AI 生态

在信创背景下,国产 AI 框架 MindSpore 正成为企业首选。结合 TDengine 时序数据库,可以构建完整的国产数据分析栈。

二、数据准备

2.1 从 TDengine 加载数据

import taos

import mindspore as ms

import numpy as np

 

class TDengineDataLoader:

    def __init__(self, query):

        self.conn = taos.connect(host="localhost", database="industry")

        self.cursor = self.conn.cursor()

        self.cursor.execute(query)

        self.data = self.cursor.fetchall()

    

    def to_mindspore_dataset(self):

        features = []

        labels = []

        

        for row in self.data:

            features.append(row[1:-1])

            labels.append(row[-1])

        

        return ms.dataset.NumpySlicesDataset(

            {"features": np.array(features), "labels": np.array(labels)},

            shuffle=True

        )

2.2 数据预处理

from mindspore import dataset as ds

 

def preprocess_data(dataset):

    # 归一化

    dataset = dataset.map(operations=ds.transforms.Normalize(

        mean=[0.5, 0.5, 0.5],

        std=[0.5, 0.5, 0.5]

    ), input_columns=["features"])

    

    # 批量处理

    dataset = dataset.batch(batch_size=32)

    

    return dataset

三、模型训练

3.1 定义模型

import mindspore.nn as nn

 

class AnomalyDetectionModel(nn.Cell):

    def __init__(self):

        super().__init__()

        self.lstm = nn.LSTM(input_size=3, hidden_size=64, num_layers=2)

        self.fc = nn.Dense(64, 1)

        self.sigmoid = nn.Sigmoid()

    

    def construct(self, x):

        output, _ = self.lstm(x)

        output = self.fc(output[:, -1, :])

        return self.sigmoid(output)

3.2 训练模型

from mindspore import Model, nn

 

# 加载数据

loader = TDengineDataLoader("""

    SELECT ts, temperature, pressure, vibration, anomaly

    FROM device_data

    WHERE ts > NOW() - 30d

""")

dataset = loader.to_mindspore_dataset()

dataset = preprocess_data(dataset)

 

# 定义模型

model = AnomalyDetectionModel()

loss_fn = nn.BCELoss()

optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)

 

# 训练

ms_model = Model(model, loss_fn, optimizer, metrics={"accuracy"})

ms_model.train(epoch=10, train_dataset=dataset)

四、实时推理

4.1 数据订阅

CREATE TOPIC realtime_anomaly AS

SELECT ts, temperature, pressure, vibration

FROM device_data

WHERE ts > NOW() - 1m;

4.2 实时预测

import taos

 

class RealtimePredictor:

    def __init__(self, model):

        self.model = model

        self.conn = taos.connect(host="localhost", database="industry")

        self.consumer = self.conn.subscribe("realtime_anomaly")

    

    def predict(self):

        rows = self.consumer.poll(1000)

        for row in rows:

            features = ms.Tensor([row[1:]], ms.float32)

            prediction = self.model(features)

            

            if prediction > 0.8:

                self.send_alert(row[0], prediction)

五、总结

MindSpore 与 TDengine 的结合,为工业企业提供了完整的国产 AI 数据分析解决方案。

关键词:时序数据库、TDengine、MindSpore、国产 AI、异常检测

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。