MindSpore 推荐系统实战

举报
whitea133 发表于 2026/04/29 01:02:47 2026/04/29
【摘要】 MindSpore 推荐系统实战推荐系统是现代互联网应用的核心技术之一,从电商平台的商品推荐到视频平台的个性化内容推送,推荐算法无处不在。本文将深入讲解如何使用 MindSpore 构建一个完整的推荐系统,涵盖协同过滤、矩阵分解、深度学习推荐模型等核心技术。 一、推荐系统概述 1.1 推荐系统的价值推荐系统的核心目标是解决信息过载问题,帮助用户从海量内容中发现感兴趣的物品。一个优秀的推荐系...

MindSpore 推荐系统实战

推荐系统是现代互联网应用的核心技术之一,从电商平台的商品推荐到视频平台的个性化内容推送,推荐算法无处不在。本文将深入讲解如何使用 MindSpore 构建一个完整的推荐系统,涵盖协同过滤、矩阵分解、深度学习推荐模型等核心技术。

一、推荐系统概述

1.1 推荐系统的价值

推荐系统的核心目标是解决信息过载问题,帮助用户从海量内容中发现感兴趣的物品。一个优秀的推荐系统能够:

  • 提升用户体验:精准推荐减少用户搜索成本
  • 增加平台收益:提高点击率、转化率和用户留存
  • 挖掘长尾价值:让冷门内容也有机会被推荐

1.2 推荐算法分类

主流推荐算法可分为三类:

类型 代表算法 特点
协同过滤 UserCF、ItemCF 利用用户行为数据,推荐相似用户喜欢的物品
内容推荐 基于物品属性 利用物品特征,推荐相似内容
混合推荐 多种方法融合 结合多种方法优势,提升推荐效果

本文将重点实现协同过滤和基于深度学习的推荐模型。

二、协同过滤算法实现

2.1 算法原理

协同过滤(Collaborative Filtering)是推荐系统中最经典的算法。其核心思想是:如果两个用户对某些物品的评价相似,那么他们对其他物品的评价也倾向于相似。

User-CF(基于用户的协同过滤)步骤:

  1. 计算用户之间的相似度
  2. 找出与目标用户最相似的 K 个用户
  3. 根据相似用户的喜好,推荐目标用户未交互过的物品

2.2 相似度计算

常用的相似度计算方法包括余弦相似度和皮尔逊相关系数。余弦相似度计算公式:

sim(u, v) = (u · v) / (||u|| * ||v||)

2.3 MindSpore 实现代码

import mindspore as ms
from mindspore import nn, Tensor, ops
import numpy as np

class CollaborativeFiltering(nn.Cell):
    """
    基于用户的协同过滤推荐模型
    """
    def __init__(self, num_users, num_items, embedding_dim=64):
        super(CollaborativeFiltering, self).__init__()
        self.num_users = num_users
        self.num_items = num_items

        # 用户和物品嵌入矩阵
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        # 初始化参数
        self.user_embedding.embedding_table.set_data(ms.common.initializer.XavierUniform())
        self.item_embedding.embedding_table.set_data(ms.common.initializer.XavierUniform())

    def construct(self, user_ids, item_ids):
        """前向传播,计算用户对物品的预测评分"""
        user_emb = self.user_embedding(user_ids)  # (batch, embedding_dim)
        item_emb = self.item_embedding(item_ids)  # (batch, embedding_dim)

        # 内积作为预测评分
        score = ops.reduce_sum(user_emb * item_emb, axis=-1)
        return score

    def get_user_similarity(self, user_id, top_k=10):
        """计算指定用户与其他用户的相似度"""
        all_user_emb = self.user_embedding.embedding_table
        target_user_emb = all_user_emb[user_id]

        # 余弦相似度
        norm_target = ops.sqrt(ops.reduce_sum(target_user_emb ** 2) + 1e-8)
        norm_all = ops.sqrt(ops.reduce_sum(all_user_emb ** 2, axis=1) + 1e-8)

        similarity = ops.matmul(all_user_emb, target_user_emb.T) / (norm_all * norm_target)

        # 获取最相似的 top_k 用户(排除自身)
        top_k_users = ops.argsort(similarity, descending=True)[1:top_k+1]
        return top_k_users, similarity[top_k_users]

    def recommend(self, user_id, interaction_matrix, top_k=10):
        """为指定用户生成推荐列表"""
        # 获取用户已交互的物品
        interacted_items = set(np.where(interaction_matrix[user_id] > 0)[0])

        # 预测所有未交互物品的评分
        user_tensor = Tensor([user_id], dtype=ms.int32)
        all_scores = []

        for item_id in range(self.num_items):
            if item_id not in interacted_items:
                item_tensor = Tensor([item_id], dtype=ms.int32)
                score = self.construct(user_tensor, item_tensor)
                all_scores.append((item_id, score.asnumpy()[0]))

        # 按评分排序,返回 top_k 推荐
        all_scores.sort(key=lambda x: x[1], reverse=True)
        return all_scores[:top_k]

# 模型训练函数
def train_cf_model(model, train_data, epochs=50, lr=0.001):
    """训练协同过滤模型"""
    optimizer = nn.Adam(model.trainable_params(), learning_rate=lr)
    loss_fn = nn.MSELoss()

    def forward_fn(user_ids, item_ids, ratings):
        pred = model(user_ids, item_ids)
        loss = loss_fn(pred, ratings)
        return loss

    grad_fn = ms.value_and_grad(forward_fn, None, model.trainable_params())

    for epoch in range(epochs):
        total_loss = 0
        for batch in train_data:
            user_ids, item_ids, ratings = batch
            loss, grads = grad_fn(user_ids, item_ids, ratings)
            optimizer(grads)
            total_loss += loss.asnumpy()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_data):.4f}")

    return model

三、矩阵分解模型(MF)

3.1 算法原理

矩阵分解(Matrix Factorization)将用户-物品评分矩阵分解为用户隐向量矩阵和物品隐向量矩阵的乘积。相比协同过滤,矩阵分解能够:

  • 处理稀疏数据:通过低维嵌入泛化
  • 发现潜在特征:隐向量蕴含隐语义信息
  • 可扩展性强:适合大规模数据场景

3.2 MindSpore 实现

class MatrixFactorization(nn.Cell):
    """
    矩阵分解推荐模型
    """
    def __init__(self, num_users, num_items, factors=64):
        super(MatrixFactorization, self).__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.factors = factors

        # 用户偏置和全局偏置
        self.user_bias = ms.Parameter(Tensor(np.zeros(num_users), dtype=ms.float32))
        self.item_bias = ms.Parameter(Tensor(np.zeros(num_items), dtype=ms.float32))
        self.global_bias = ms.Parameter(Tensor([0.0], dtype=ms.float32))

        # 用户和物品隐向量
        self.user_embedding = nn.Embedding(num_users, factors)
        self.item_embedding = nn.Embedding(num_items, factors)

        # 正则化系数
        self.reg = 0.02

    def construct(self, user_ids, item_ids):
        """预测评分"""
        user_emb = self.user_embedding(user_ids)
        item_emb = self.item_embedding(item_ids)

        # 获取偏置
        u_bias = ops.gather(self.user_bias, user_ids, 0)
        i_bias = ops.gather(self.item_bias, item_ids, 0)

        # 预测评分 = 全局偏置 + 用户偏置 + 物品偏置 + 隐向量内积
        pred = self.global_bias + u_bias + i_bias + ops.reduce_sum(user_emb * item_emb, axis=-1)
        return pred

    def get_l2_reg(self, user_ids, item_ids):
        """L2正则化"""
        user_emb = self.user_embedding(user_ids)
        item_emb = self.item_embedding(item_ids)
        reg_loss = self.reg * (ops.reduce_sum(user_emb ** 2) + ops.reduce_sum(item_emb ** 2))
        return reg_loss

# 完整训练脚本
def train_mf_model():
    """矩阵分解模型训练完整示例"""
    # 模拟数据
    np.random.seed(42)
    num_users, num_items = 1000, 500
    num_samples = 10000

    user_ids = Tensor(np.random.randint(0, num_users, num_samples), dtype=ms.int32)
    item_ids = Tensor(np.random.randint(0, num_items, num_samples), dtype=ms.int32)
    ratings = Tensor(np.random.uniform(1, 5, num_samples).astype(np.float32))

    # 创建模型
    model = MatrixFactorization(num_users, num_items, factors=32)

    # 优化器
    optimizer = nn.Adam(model.trainable_params(), learning_rate=0.005)

    # 训练
    for epoch in range(100):
        def forward_fn():
            pred = model(user_ids, item_ids)
            mse = ops.reduce_mean((pred - ratings) ** 2)
            reg = model.get_l2_reg(user_ids, item_ids)
            return mse + reg

        loss, grads = ms.value_and_grad(forward_fn, None, model.trainable_params())()
        optimizer(grads)

        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}, Loss: {loss.asnumpy():.4f}")

    return model

# 运行训练
if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="CPU")
    trained_model = train_mf_model()
    print("模型训练完成!")

四、深度学习推荐模型(NCF)

4.1 Neural Collaborative Filtering

NCF(Neural Collaborative Filtering)是微软提出的深度学习推荐框架,用神经网络替代传统协同过滤的内积操作,能够学习任意的用户-物品交互函数。

4.2 模型架构

NCF 包含两个核心组件:

  • GMF(Generalized Matrix Factorization)层:学习线性交互
  • MLP(Multi-Layer Perceptron)层:学习非线性交互
  • NeuMF 层:融合 GMF 和 MLP 的输出

4.3 完整实现

class NCF(nn.Cell):
    """
    Neural Collaborative Filtering 深度学习推荐模型
    """
    def __init__(self, num_users, num_items, embedding_dim=64, mlp_layers=[128, 64, 32]):
        super(NCF, self).__init__()

        # GMF 路径嵌入
        self.gmf_user_embedding = nn.Embedding(num_users, embedding_dim)
        self.gmf_item_embedding = nn.Embedding(num_items, embedding_dim)

        # MLP 路径嵌入
        self.mlp_user_embedding = nn.Embedding(num_users, embedding_dim)
        self.mlp_item_embedding = nn.Embedding(num_items, embedding_dim)

        # MLP 层
        mlp_input_dim = embedding_dim * 2
        mlp_layers_list = []
        for out_dim in mlp_layers:
            mlp_layers_list.append(nn.Dense(mlp_input_dim, out_dim))
            mlp_layers_list.append(nn.ReLU())
            mlp_layers_list.append(nn.Dropout(p=0.2))
            mlp_input_dim = out_dim
        self.mlp = nn.SequentialCell(mlp_layers_list)

        # 预测层
        self.predict_layer = nn.Dense(mlp_layers[-1] + embedding_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def construct(self, user_ids, item_ids):
        """前向传播"""
        # GMF 路径
        gmf_user_emb = self.gmf_user_embedding(user_ids)
        gmf_item_emb = self.gmf_item_embedding(item_ids)
        gmf_output = gmf_user_emb * gmf_item_emb  # 元素级乘法

        # MLP 路径
        mlp_user_emb = self.mlp_user_embedding(user_ids)
        mlp_item_emb = self.mlp_item_embedding(item_ids)
        mlp_input = ops.concat([mlp_user_emb, mlp_item_emb], axis=-1)
        mlp_output = self.mlp(mlp_input)

        # 融合输出
        concat_output = ops.concat([gmf_output, mlp_output], axis=-1)
        prediction = self.predict_layer(concat_output)
        return self.sigmoid(prediction).squeeze(-1)

class NCFLoss(nn.Cell):
    """NCF 二元交叉熵损失"""
    def construct(self, pred, label):
        eps = 1e-8
        return -ops.reduce_mean(label * ops.log(pred + eps) + (1 - label) * ops.log(1 - pred + eps))

# 数据处理类
class RecDataset:
    """推荐系统数据集"""
    def __init__(self, interactions, num_negatives=4):
        """
        interactions: 用户-物品交互字典 {user_id: [item_ids]}
        """
        self.users = []
        self.items = []
        self.labels = []

        all_items = set()
        for items in interactions.values():
            all_items.update(items)

        for user_id, pos_items in interactions.items():
            for item_id in pos_items:
                self.users.append(user_id)
                self.items.append(item_id)
                self.labels.append(1.0)

                # 负采样
                for _ in range(num_negatives):
                    neg_item = np.random.choice(list(all_items))
                    while neg_item in pos_items:
                        neg_item = np.random.choice(list(all_items))
                    self.users.append(user_id)
                    self.items.append(neg_item)
                    self.labels.append(0.0)

    def get_batch(self, batch_size=256):
        """获取批次数据"""
        indices = np.random.permutation(len(self.users))
        for start in range(0, len(self.users), batch_size):
            batch_indices = indices[start:start+batch_size]
            yield (
                Tensor(self.users[batch_indices], dtype=ms.int32),
                Tensor(self.items[batch_indices], dtype=ms.int32),
                Tensor(self.labels[batch_indices], dtype=ms.float32)
            )

# 模型评估函数
def evaluate_model(model, test_data, k=10):
    """评估推荐模型(Hit Rate 和 NDCG)"""
    hit_count = 0
    ndcg_sum = 0.0
    total = 0

    for user_id, (pos_item, neg_items) in test_data.items():
        # 模拟测试:1个正样本 + 99个负样本
        all_items = [pos_item] + neg_items
        user_tensor = Tensor([user_id] * len(all_items), dtype=ms.int32)
        item_tensor = Tensor(all_items, dtype=ms.int32)

        scores = model(user_tensor, item_tensor).asnumpy()
        ranked_items = np.argsort(scores)[::-1][:k]

        if 0 in ranked_items:  # 正样本排在 top_k 中
            hit_count += 1
            rank = np.where(ranked_items == 0)[0][0]
            ndcg_sum += 1.0 / np.log2(rank + 2)

        total += 1

    hr = hit_count / total
    ndcg = ndcg_sum / total
    return hr, ndcg

# 完整训练流程
def train_ncf_complete():
    """NCF 模型完整训练流程"""
    print("=" * 50)
    print("MindSpore NCF 推荐模型训练")
    print("=" * 50)

    # 模拟数据生成
    np.random.seed(42)
    num_users, num_items = 1000, 500

    interactions = {}
    for user_id in range(num_users):
        num_interactions = np.random.randint(10, 50)
        interactions[user_id] = list(np.random.choice(num_items, num_interactions, replace=False))

    # 创建数据集
    dataset = RecDataset(interactions, num_negatives=4)

    # 初始化模型
    model = NCF(num_users, num_items, embedding_dim=32, mlp_layers=[64, 32, 16])
    loss_fn = NCFLoss()
    optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)

    # 训练循环
    epochs = 20
    for epoch in range(epochs):
        total_loss = 0.0
        batch_count = 0

        for user_ids, item_ids, labels in dataset.get_batch(batch_size=512):
            def forward_fn():
                pred = model(user_ids, item_ids)
                return loss_fn(pred, labels)

            loss, grads = ms.value_and_grad(forward_fn, None, model.trainable_params())()
            optimizer(grads)
            total_loss += loss.asnumpy()
            batch_count += 1

        avg_loss = total_loss / batch_count
        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    print("\n模型训练完成!")
    print("=" * 50)

    # 模拟评估
    print("模型评估结果:")
    hr, ndcg = evaluate_model(model, {0: (1, list(range(2, 102)))})
    print(f"Hit Rate@10: {hr:.4f}")
    print(f"NDCG@10: {ndcg:.4f}")

    return model

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="CPU")
    ncf_model = train_ncf_complete()

五、推荐系统评估指标

5.1 离线评估指标

指标 公式 说明
Hit Rate@K 命中数 / 总用户数 推荐列表中包含正样本的比例
NDCG@K DCG / IDCG 考虑排序位置的加权指标
MAP@K 精确率平均值 多个位置的精确率均值
RMSE √(Σ(r-ř)²/n) 预测评分与真实评分的差异

5.2 在线评估指标

  • 点击率(CTR):用户点击推荐物品的比例
  • 转化率(CVR):点击后完成目标行为的比例
  • 用户停留时长:用户在推荐内容上的时间
  • GMV:推荐带来的成交金额

六、生产环境部署建议

6.1 架构设计

推荐系统在生产环境通常采用分层架构:

  1. 召回层:从百万级物品中快速筛选千级候选集
  2. 排序层:对候选集进行精细打分排序
  3. 重排层:考虑多样性、时效性等业务规则

6.2 MindSpore 部署优势

  • 高性能推理:支持 Ascend/GPU/CPU 多硬件加速
  • 模型导出:支持 MindIR、ONNX 格式导出
  • 分布式部署:支持参数服务器和 AllReduce 模式
# 模型导出示例
def export_model(model, num_users, num_items):
    """导出模型为 MindIR 格式"""
    user_input = Tensor(np.array([0], dtype=np.int32))
    item_input = Tensor(np.array([0], dtype=np.int32))

    ms.export(model, user_input, item_input, 
              file_name="ncf_model", file_format="MINDIR")
    print("模型已导出为 ncf_model.mindir")

# 模型加载与推理
def load_and_infer():
    """加载模型进行推理"""
    graph = ms.load("ncf_model.mindir")
    net = nn.GraphCell(graph)

    # 推理
    user_id = Tensor([1], dtype=ms.int32)
    item_id = Tensor([100], dtype=ms.int32)
    score = net(user_id, item_id)
    print(f"预测评分: {score.asnumpy()[0]:.4f}")

七、总结

本文系统介绍了使用 MindSpore 构建推荐系统的完整流程:

  1. 协同过滤:经典的用户行为推荐算法,适合快速原型验证
  2. 矩阵分解:处理稀疏数据的利器,挖掘隐语义特征
  3. NCF 深度模型:融合线性和非线性交互,提升推荐精度
  4. 评估体系:建立完整的离线和在线评估指标
  5. 生产部署:分层架构设计,支持高并发实时推荐

推荐系统是一个不断迭代的领域,需要结合业务场景持续优化。MindSpore 提供的灵活架构和高性能算力,为推荐系统的研发和部署提供了坚实基础。


更多 MindSpore 示例代码可参考华为云 ModelArts 平台的相关教程资源。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。