MindSpore + TDengine:国产 AI 框架的时序数据分析实践
一、国产 AI 生态
在信创背景下,国产 AI 框架 MindSpore 正成为企业首选。结合 TDengine 时序数据库,可以构建完整的国产数据分析栈。
二、数据准备
2.1 从 TDengine 加载数据
import taos
import mindspore as ms
import numpy as np
class TDengineDataLoader:
def __init__(self, query):
self.conn = taos.connect(host="localhost", database="industry")
self.cursor = self.conn.cursor()
self.cursor.execute(query)
self.data = self.cursor.fetchall()
def to_mindspore_dataset(self):
features = []
labels = []
for row in self.data:
features.append(row[1:-1])
labels.append(row[-1])
return ms.dataset.NumpySlicesDataset(
{"features": np.array(features), "labels": np.array(labels)},
shuffle=True
)
2.2 数据预处理
from mindspore import dataset as ds
def preprocess_data(dataset):
# 归一化
dataset = dataset.map(operations=ds.transforms.Normalize(
mean=[0.5, 0.5, 0.5],
std=[0.5, 0.5, 0.5]
), input_columns=["features"])
# 批量处理
dataset = dataset.batch(batch_size=32)
return dataset
三、模型训练
3.1 定义模型
import mindspore.nn as nn
class AnomalyDetectionModel(nn.Cell):
def __init__(self):
super().__init__()
self.lstm = nn.LSTM(input_size=3, hidden_size=64, num_layers=2)
self.fc = nn.Dense(64, 1)
self.sigmoid = nn.Sigmoid()
def construct(self, x):
output, _ = self.lstm(x)
output = self.fc(output[:, -1, :])
return self.sigmoid(output)
3.2 训练模型
from mindspore import Model, nn
# 加载数据
loader = TDengineDataLoader("""
SELECT ts, temperature, pressure, vibration, anomaly
FROM device_data
WHERE ts > NOW() - 30d
""")
dataset = loader.to_mindspore_dataset()
dataset = preprocess_data(dataset)
# 定义模型
model = AnomalyDetectionModel()
loss_fn = nn.BCELoss()
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)
# 训练
ms_model = Model(model, loss_fn, optimizer, metrics={"accuracy"})
ms_model.train(epoch=10, train_dataset=dataset)
四、实时推理
4.1 数据订阅
CREATE TOPIC realtime_anomaly AS
SELECT ts, temperature, pressure, vibration
FROM device_data
WHERE ts > NOW() - 1m;
4.2 实时预测
import taos
class RealtimePredictor:
def __init__(self, model):
self.model = model
self.conn = taos.connect(host="localhost", database="industry")
self.consumer = self.conn.subscribe("realtime_anomaly")
def predict(self):
rows = self.consumer.poll(1000)
for row in rows:
features = ms.Tensor([row[1:]], ms.float32)
prediction = self.model(features)
if prediction > 0.8:
self.send_alert(row[0], prediction)
五、总结
MindSpore 与 TDengine 的结合,为工业企业提供了完整的国产 AI 数据分析解决方案。
关键词:时序数据库、TDengine、MindSpore、国产 AI、异常检测
- 点赞
- 收藏
- 关注作者
评论(0)