MindSpore 推荐系统实战
MindSpore 推荐系统实战
推荐系统是现代互联网应用的核心技术之一,从电商平台的商品推荐到视频平台的个性化内容推送,推荐算法无处不在。本文将深入讲解如何使用 MindSpore 构建一个完整的推荐系统,涵盖协同过滤、矩阵分解、深度学习推荐模型等核心技术。
一、推荐系统概述
1.1 推荐系统的价值
推荐系统的核心目标是解决信息过载问题,帮助用户从海量内容中发现感兴趣的物品。一个优秀的推荐系统能够:
- 提升用户体验:精准推荐减少用户搜索成本
- 增加平台收益:提高点击率、转化率和用户留存
- 挖掘长尾价值:让冷门内容也有机会被推荐
1.2 推荐算法分类
主流推荐算法可分为三类:
| 类型 | 代表算法 | 特点 |
|---|---|---|
| 协同过滤 | UserCF、ItemCF | 利用用户行为数据,推荐相似用户喜欢的物品 |
| 内容推荐 | 基于物品属性 | 利用物品特征,推荐相似内容 |
| 混合推荐 | 多种方法融合 | 结合多种方法优势,提升推荐效果 |
本文将重点实现协同过滤和基于深度学习的推荐模型。
二、协同过滤算法实现
2.1 算法原理
协同过滤(Collaborative Filtering)是推荐系统中最经典的算法。其核心思想是:如果两个用户对某些物品的评价相似,那么他们对其他物品的评价也倾向于相似。
User-CF(基于用户的协同过滤)步骤:
- 计算用户之间的相似度
- 找出与目标用户最相似的 K 个用户
- 根据相似用户的喜好,推荐目标用户未交互过的物品
2.2 相似度计算
常用的相似度计算方法包括余弦相似度和皮尔逊相关系数。余弦相似度计算公式:
sim(u, v) = (u · v) / (||u|| * ||v||)
2.3 MindSpore 实现代码
import mindspore as ms
from mindspore import nn, Tensor, ops
import numpy as np
class CollaborativeFiltering(nn.Cell):
"""
基于用户的协同过滤推荐模型
"""
def __init__(self, num_users, num_items, embedding_dim=64):
super(CollaborativeFiltering, self).__init__()
self.num_users = num_users
self.num_items = num_items
# 用户和物品嵌入矩阵
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)
# 初始化参数
self.user_embedding.embedding_table.set_data(ms.common.initializer.XavierUniform())
self.item_embedding.embedding_table.set_data(ms.common.initializer.XavierUniform())
def construct(self, user_ids, item_ids):
"""前向传播,计算用户对物品的预测评分"""
user_emb = self.user_embedding(user_ids) # (batch, embedding_dim)
item_emb = self.item_embedding(item_ids) # (batch, embedding_dim)
# 内积作为预测评分
score = ops.reduce_sum(user_emb * item_emb, axis=-1)
return score
def get_user_similarity(self, user_id, top_k=10):
"""计算指定用户与其他用户的相似度"""
all_user_emb = self.user_embedding.embedding_table
target_user_emb = all_user_emb[user_id]
# 余弦相似度
norm_target = ops.sqrt(ops.reduce_sum(target_user_emb ** 2) + 1e-8)
norm_all = ops.sqrt(ops.reduce_sum(all_user_emb ** 2, axis=1) + 1e-8)
similarity = ops.matmul(all_user_emb, target_user_emb.T) / (norm_all * norm_target)
# 获取最相似的 top_k 用户(排除自身)
top_k_users = ops.argsort(similarity, descending=True)[1:top_k+1]
return top_k_users, similarity[top_k_users]
def recommend(self, user_id, interaction_matrix, top_k=10):
"""为指定用户生成推荐列表"""
# 获取用户已交互的物品
interacted_items = set(np.where(interaction_matrix[user_id] > 0)[0])
# 预测所有未交互物品的评分
user_tensor = Tensor([user_id], dtype=ms.int32)
all_scores = []
for item_id in range(self.num_items):
if item_id not in interacted_items:
item_tensor = Tensor([item_id], dtype=ms.int32)
score = self.construct(user_tensor, item_tensor)
all_scores.append((item_id, score.asnumpy()[0]))
# 按评分排序,返回 top_k 推荐
all_scores.sort(key=lambda x: x[1], reverse=True)
return all_scores[:top_k]
# 模型训练函数
def train_cf_model(model, train_data, epochs=50, lr=0.001):
"""训练协同过滤模型"""
optimizer = nn.Adam(model.trainable_params(), learning_rate=lr)
loss_fn = nn.MSELoss()
def forward_fn(user_ids, item_ids, ratings):
pred = model(user_ids, item_ids)
loss = loss_fn(pred, ratings)
return loss
grad_fn = ms.value_and_grad(forward_fn, None, model.trainable_params())
for epoch in range(epochs):
total_loss = 0
for batch in train_data:
user_ids, item_ids, ratings = batch
loss, grads = grad_fn(user_ids, item_ids, ratings)
optimizer(grads)
total_loss += loss.asnumpy()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_data):.4f}")
return model
三、矩阵分解模型(MF)
3.1 算法原理
矩阵分解(Matrix Factorization)将用户-物品评分矩阵分解为用户隐向量矩阵和物品隐向量矩阵的乘积。相比协同过滤,矩阵分解能够:
- 处理稀疏数据:通过低维嵌入泛化
- 发现潜在特征:隐向量蕴含隐语义信息
- 可扩展性强:适合大规模数据场景
3.2 MindSpore 实现
class MatrixFactorization(nn.Cell):
"""
矩阵分解推荐模型
"""
def __init__(self, num_users, num_items, factors=64):
super(MatrixFactorization, self).__init__()
self.num_users = num_users
self.num_items = num_items
self.factors = factors
# 用户偏置和全局偏置
self.user_bias = ms.Parameter(Tensor(np.zeros(num_users), dtype=ms.float32))
self.item_bias = ms.Parameter(Tensor(np.zeros(num_items), dtype=ms.float32))
self.global_bias = ms.Parameter(Tensor([0.0], dtype=ms.float32))
# 用户和物品隐向量
self.user_embedding = nn.Embedding(num_users, factors)
self.item_embedding = nn.Embedding(num_items, factors)
# 正则化系数
self.reg = 0.02
def construct(self, user_ids, item_ids):
"""预测评分"""
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)
# 获取偏置
u_bias = ops.gather(self.user_bias, user_ids, 0)
i_bias = ops.gather(self.item_bias, item_ids, 0)
# 预测评分 = 全局偏置 + 用户偏置 + 物品偏置 + 隐向量内积
pred = self.global_bias + u_bias + i_bias + ops.reduce_sum(user_emb * item_emb, axis=-1)
return pred
def get_l2_reg(self, user_ids, item_ids):
"""L2正则化"""
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)
reg_loss = self.reg * (ops.reduce_sum(user_emb ** 2) + ops.reduce_sum(item_emb ** 2))
return reg_loss
# 完整训练脚本
def train_mf_model():
"""矩阵分解模型训练完整示例"""
# 模拟数据
np.random.seed(42)
num_users, num_items = 1000, 500
num_samples = 10000
user_ids = Tensor(np.random.randint(0, num_users, num_samples), dtype=ms.int32)
item_ids = Tensor(np.random.randint(0, num_items, num_samples), dtype=ms.int32)
ratings = Tensor(np.random.uniform(1, 5, num_samples).astype(np.float32))
# 创建模型
model = MatrixFactorization(num_users, num_items, factors=32)
# 优化器
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.005)
# 训练
for epoch in range(100):
def forward_fn():
pred = model(user_ids, item_ids)
mse = ops.reduce_mean((pred - ratings) ** 2)
reg = model.get_l2_reg(user_ids, item_ids)
return mse + reg
loss, grads = ms.value_and_grad(forward_fn, None, model.trainable_params())()
optimizer(grads)
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1}, Loss: {loss.asnumpy():.4f}")
return model
# 运行训练
if __name__ == "__main__":
ms.set_context(mode=ms.GRAPH_MODE, device_target="CPU")
trained_model = train_mf_model()
print("模型训练完成!")
四、深度学习推荐模型(NCF)
4.1 Neural Collaborative Filtering
NCF(Neural Collaborative Filtering)是微软提出的深度学习推荐框架,用神经网络替代传统协同过滤的内积操作,能够学习任意的用户-物品交互函数。
4.2 模型架构
NCF 包含两个核心组件:
- GMF(Generalized Matrix Factorization)层:学习线性交互
- MLP(Multi-Layer Perceptron)层:学习非线性交互
- NeuMF 层:融合 GMF 和 MLP 的输出
4.3 完整实现
class NCF(nn.Cell):
"""
Neural Collaborative Filtering 深度学习推荐模型
"""
def __init__(self, num_users, num_items, embedding_dim=64, mlp_layers=[128, 64, 32]):
super(NCF, self).__init__()
# GMF 路径嵌入
self.gmf_user_embedding = nn.Embedding(num_users, embedding_dim)
self.gmf_item_embedding = nn.Embedding(num_items, embedding_dim)
# MLP 路径嵌入
self.mlp_user_embedding = nn.Embedding(num_users, embedding_dim)
self.mlp_item_embedding = nn.Embedding(num_items, embedding_dim)
# MLP 层
mlp_input_dim = embedding_dim * 2
mlp_layers_list = []
for out_dim in mlp_layers:
mlp_layers_list.append(nn.Dense(mlp_input_dim, out_dim))
mlp_layers_list.append(nn.ReLU())
mlp_layers_list.append(nn.Dropout(p=0.2))
mlp_input_dim = out_dim
self.mlp = nn.SequentialCell(mlp_layers_list)
# 预测层
self.predict_layer = nn.Dense(mlp_layers[-1] + embedding_dim, 1)
self.sigmoid = nn.Sigmoid()
def construct(self, user_ids, item_ids):
"""前向传播"""
# GMF 路径
gmf_user_emb = self.gmf_user_embedding(user_ids)
gmf_item_emb = self.gmf_item_embedding(item_ids)
gmf_output = gmf_user_emb * gmf_item_emb # 元素级乘法
# MLP 路径
mlp_user_emb = self.mlp_user_embedding(user_ids)
mlp_item_emb = self.mlp_item_embedding(item_ids)
mlp_input = ops.concat([mlp_user_emb, mlp_item_emb], axis=-1)
mlp_output = self.mlp(mlp_input)
# 融合输出
concat_output = ops.concat([gmf_output, mlp_output], axis=-1)
prediction = self.predict_layer(concat_output)
return self.sigmoid(prediction).squeeze(-1)
class NCFLoss(nn.Cell):
"""NCF 二元交叉熵损失"""
def construct(self, pred, label):
eps = 1e-8
return -ops.reduce_mean(label * ops.log(pred + eps) + (1 - label) * ops.log(1 - pred + eps))
# 数据处理类
class RecDataset:
"""推荐系统数据集"""
def __init__(self, interactions, num_negatives=4):
"""
interactions: 用户-物品交互字典 {user_id: [item_ids]}
"""
self.users = []
self.items = []
self.labels = []
all_items = set()
for items in interactions.values():
all_items.update(items)
for user_id, pos_items in interactions.items():
for item_id in pos_items:
self.users.append(user_id)
self.items.append(item_id)
self.labels.append(1.0)
# 负采样
for _ in range(num_negatives):
neg_item = np.random.choice(list(all_items))
while neg_item in pos_items:
neg_item = np.random.choice(list(all_items))
self.users.append(user_id)
self.items.append(neg_item)
self.labels.append(0.0)
def get_batch(self, batch_size=256):
"""获取批次数据"""
indices = np.random.permutation(len(self.users))
for start in range(0, len(self.users), batch_size):
batch_indices = indices[start:start+batch_size]
yield (
Tensor(self.users[batch_indices], dtype=ms.int32),
Tensor(self.items[batch_indices], dtype=ms.int32),
Tensor(self.labels[batch_indices], dtype=ms.float32)
)
# 模型评估函数
def evaluate_model(model, test_data, k=10):
"""评估推荐模型(Hit Rate 和 NDCG)"""
hit_count = 0
ndcg_sum = 0.0
total = 0
for user_id, (pos_item, neg_items) in test_data.items():
# 模拟测试:1个正样本 + 99个负样本
all_items = [pos_item] + neg_items
user_tensor = Tensor([user_id] * len(all_items), dtype=ms.int32)
item_tensor = Tensor(all_items, dtype=ms.int32)
scores = model(user_tensor, item_tensor).asnumpy()
ranked_items = np.argsort(scores)[::-1][:k]
if 0 in ranked_items: # 正样本排在 top_k 中
hit_count += 1
rank = np.where(ranked_items == 0)[0][0]
ndcg_sum += 1.0 / np.log2(rank + 2)
total += 1
hr = hit_count / total
ndcg = ndcg_sum / total
return hr, ndcg
# 完整训练流程
def train_ncf_complete():
"""NCF 模型完整训练流程"""
print("=" * 50)
print("MindSpore NCF 推荐模型训练")
print("=" * 50)
# 模拟数据生成
np.random.seed(42)
num_users, num_items = 1000, 500
interactions = {}
for user_id in range(num_users):
num_interactions = np.random.randint(10, 50)
interactions[user_id] = list(np.random.choice(num_items, num_interactions, replace=False))
# 创建数据集
dataset = RecDataset(interactions, num_negatives=4)
# 初始化模型
model = NCF(num_users, num_items, embedding_dim=32, mlp_layers=[64, 32, 16])
loss_fn = NCFLoss()
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)
# 训练循环
epochs = 20
for epoch in range(epochs):
total_loss = 0.0
batch_count = 0
for user_ids, item_ids, labels in dataset.get_batch(batch_size=512):
def forward_fn():
pred = model(user_ids, item_ids)
return loss_fn(pred, labels)
loss, grads = ms.value_and_grad(forward_fn, None, model.trainable_params())()
optimizer(grads)
total_loss += loss.asnumpy()
batch_count += 1
avg_loss = total_loss / batch_count
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
print("\n模型训练完成!")
print("=" * 50)
# 模拟评估
print("模型评估结果:")
hr, ndcg = evaluate_model(model, {0: (1, list(range(2, 102)))})
print(f"Hit Rate@10: {hr:.4f}")
print(f"NDCG@10: {ndcg:.4f}")
return model
if __name__ == "__main__":
ms.set_context(mode=ms.GRAPH_MODE, device_target="CPU")
ncf_model = train_ncf_complete()
五、推荐系统评估指标
5.1 离线评估指标
| 指标 | 公式 | 说明 |
|---|---|---|
| Hit Rate@K | 命中数 / 总用户数 | 推荐列表中包含正样本的比例 |
| NDCG@K | DCG / IDCG | 考虑排序位置的加权指标 |
| MAP@K | 精确率平均值 | 多个位置的精确率均值 |
| RMSE | √(Σ(r-ř)²/n) | 预测评分与真实评分的差异 |
5.2 在线评估指标
- 点击率(CTR):用户点击推荐物品的比例
- 转化率(CVR):点击后完成目标行为的比例
- 用户停留时长:用户在推荐内容上的时间
- GMV:推荐带来的成交金额
六、生产环境部署建议
6.1 架构设计
推荐系统在生产环境通常采用分层架构:
- 召回层:从百万级物品中快速筛选千级候选集
- 排序层:对候选集进行精细打分排序
- 重排层:考虑多样性、时效性等业务规则
6.2 MindSpore 部署优势
- 高性能推理:支持 Ascend/GPU/CPU 多硬件加速
- 模型导出:支持 MindIR、ONNX 格式导出
- 分布式部署:支持参数服务器和 AllReduce 模式
# 模型导出示例
def export_model(model, num_users, num_items):
"""导出模型为 MindIR 格式"""
user_input = Tensor(np.array([0], dtype=np.int32))
item_input = Tensor(np.array([0], dtype=np.int32))
ms.export(model, user_input, item_input,
file_name="ncf_model", file_format="MINDIR")
print("模型已导出为 ncf_model.mindir")
# 模型加载与推理
def load_and_infer():
"""加载模型进行推理"""
graph = ms.load("ncf_model.mindir")
net = nn.GraphCell(graph)
# 推理
user_id = Tensor([1], dtype=ms.int32)
item_id = Tensor([100], dtype=ms.int32)
score = net(user_id, item_id)
print(f"预测评分: {score.asnumpy()[0]:.4f}")
七、总结
本文系统介绍了使用 MindSpore 构建推荐系统的完整流程:
- 协同过滤:经典的用户行为推荐算法,适合快速原型验证
- 矩阵分解:处理稀疏数据的利器,挖掘隐语义特征
- NCF 深度模型:融合线性和非线性交互,提升推荐精度
- 评估体系:建立完整的离线和在线评估指标
- 生产部署:分层架构设计,支持高并发实时推荐
推荐系统是一个不断迭代的领域,需要结合业务场景持续优化。MindSpore 提供的灵活架构和高性能算力,为推荐系统的研发和部署提供了坚实基础。
更多 MindSpore 示例代码可参考华为云 ModelArts 平台的相关教程资源。
- 点赞
- 收藏
- 关注作者
评论(0)