MindSpore Reinforcement Learning in Practice

whitea133 · posted 2026/05/06 15:27:58


1. Introduction

Reinforcement learning (RL) is one of the three main branches of machine learning, alongside supervised and unsupervised learning. Unlike supervised learning, RL emphasizes that an agent learns an optimal policy through continuous interaction with an environment. In each round of interaction, the agent chooses an action based on the current state, the environment returns a new state and a reward, and the agent adjusts its policy accordingly. This trial-and-error, feedback-driven learning paradigm has made RL remarkably capable in games, robot control, recommender systems, and other domains.

MindSpore is Huawei's self-developed full-scenario AI computing framework. It ships with a complete reinforcement learning toolchain, MindSpore RL, which supports advanced features such as multi-agent RL, policy optimization, and distributed training. Through complete code examples, this article walks the reader through implementing RL algorithms on MindSpore from scratch, building hands-on skills with two core techniques: DQN (Deep Q-Network) and Policy Gradient.

2. Reinforcement Learning Fundamentals

Before diving into code, let us first review the core concepts and mathematical framework of RL. The basic model of reinforcement learning is the Markov Decision Process (MDP), defined by the five-tuple (S, A, P, R, γ):

  • S: the state space, the set of all states the agent may occupy
  • A: the action space, the set of all actions the agent may take
  • P: the transition probability, where P(s'|s,a) is the probability of moving to s' after taking action a in state s
  • R: the reward function, where R(s,a) or R(s,a,s') is the immediate reward for the current state-action pair
  • γ: the discount factor, which controls how much future rewards matter; typical values lie in [0.95, 0.999]

The agent's goal is to maximize the cumulative discounted reward G_t = R_t + γ·R_{t+1} + γ²·R_{t+2} + …, known as the return. A policy π(a|s) defines a probability distribution over actions given a state, and the agent's task is to find the optimal policy π* that maximizes the expected return.
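As a minimal illustration (plain Python; the helper name is ours, not part of MindSpore), the return can be computed in a single backward pass over a reward sequence:

```python
def discounted_return(rewards, gamma=0.99):
    """G_t = R_t + γ·G_{t+1}, accumulated from the last step backwards."""
    g = 0.0
    returns = []
    for r in reversed(rewards):
        g = r + gamma * g
        returns.append(g)
    return list(reversed(returns))

# Three steps of reward 1.0 with γ = 0.5:
# G_2 = 1.0, G_1 = 1 + 0.5·1.0 = 1.5, G_0 = 1 + 0.5·1.5 = 1.75
print(discounted_return([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Note how rewards near the end of the episode contribute less to early returns as γ shrinks.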

3. Environment Setup and MindSpore RL Installation

3.1 Installing MindSpore RL

# Install MindSpore and MindSpore RL via pip
pip install mindspore mindspore-rl

# Verify the installation
python -c "import mindspore as ms; import mindspore_rl; print('MindSpore version:', ms.__version__); print('MindSpore RL installed successfully!')"

3.2 Verifying the RL Environment

import mindspore as ms
import mindspore.nn as nn
from mindspore import Tensor
import numpy as np

print(f"MindSpore version: {ms.__version__}")
print("RL environment is ready!")

# Sanity-check basic tensor arithmetic
x = Tensor([1.0, 2.0, 3.0], ms.float32)
y = Tensor([4.0, 5.0, 6.0], ms.float32)
print(f"Tensor addition test: {x + y}")

4. DQN in Practice

4.1 How DQN Works

DQN (Deep Q-Network) is the seminal algorithm that married deep learning with reinforcement learning, proposed by DeepMind in 2013 and refined in their 2015 Nature paper. Its core idea is to approximate the optimal action-value function Q*(s,a) with a deep neural network, so that the best action can be selected in any given state.

DQN introduces two key techniques to stabilize deep-network training:

  1. Experience replay: transitions are stored in a replay buffer and sampled randomly during training, which breaks the temporal correlation between consecutive samples

  2. Target network: a separate network computes the target Q-values and its weights are synchronized only periodically, which reduces the instability of bootstrapping against a constantly changing network
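Before the full implementation, here is how the target-network idea plays out numerically. A NumPy-only sketch (all numbers made up, not MindSpore) of the TD target y = r + γ·max_a' Q_target(s', a') for a small batch, with terminal transitions masked so they bootstrap nothing:

```python
import numpy as np

gamma = 0.99
# Hypothetical batch of 3 transitions with 2 actions each
rewards = np.array([1.0, 1.0, 0.0])
dones = np.array([0.0, 0.0, 1.0])        # third transition is terminal
q_target_next = np.array([[0.5, 2.0],    # target network's Q(s', ·)
                          [1.5, 1.0],
                          [3.0, 4.0]])

max_next_q = q_target_next.max(axis=1)   # greedy value of each s'
td_target = rewards + gamma * max_next_q * (1 - dones)
print(td_target)  # targets ≈ [2.98, 2.485, 0.0]
```

Because the targets come from a frozen copy of the network, they stay fixed between synchronizations, which is exactly what stabilizes the regression.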

4.2 Full DQN Implementation

import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
from mindspore.common.initializer import HeNormal
import numpy as np
from collections import deque
import random

class ReplayBuffer:
    """Experience replay buffer"""
    def __init__(self, capacity: int = 10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size: int):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (np.array(states), np.array(actions), 
                np.array(rewards), np.array(next_states), np.array(dones))

    def __len__(self):
        return len(self.buffer)


class QNetwork(nn.Cell):
    """Deep Q-network"""
    def __init__(self, state_dim: int, action_dim: int):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Dense(state_dim, 128, weight_init=HeNormal())
        self.fc2 = nn.Dense(128, 128, weight_init=HeNormal())
        self.fc3 = nn.Dense(128, action_dim, weight_init=HeNormal())
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)


class DQNAgent:
    """DQN agent"""
    def __init__(self, state_dim: int, action_dim: int, 
                 learning_rate: float = 1e-3, gamma: float = 0.99,
                 epsilon: float = 1.0, epsilon_decay: float = 0.995,
                 epsilon_min: float = 0.01, target_update_freq: int = 100):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.target_update_freq = target_update_freq
        self.train_step = 0

        # Online (main) and target networks
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.set_train(False)
        self._sync_weights()

        # Optimizer
        self.optimizer = nn.Adam(self.q_network.trainable_params(), 
                                  learning_rate=learning_rate)

        # Experience replay buffer
        self.replay_buffer = ReplayBuffer(capacity=10000)

        # Loss function
        self.loss_fn = nn.MSELoss()

    def _sync_weights(self):
        """Copy the online network's weights into the target network."""
        for target_param, param in zip(
            self.target_network.get_parameters(),
            self.q_network.get_parameters()
        ):
            target_param.set_data(param.data)

    def select_action(self, state: np.ndarray, training: bool = True) -> int:
        """Select an action with an epsilon-greedy policy."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)

        state_tensor = Tensor(state.reshape(1, -1), ms.float32)
        q_values = self.q_network(state_tensor)
        return int(q_values.argmax(1)[0])

    def store_transition(self, state, action, reward, next_state, done):
        """Store a transition in the replay buffer."""
        self.replay_buffer.push(state, action, reward, next_state, done)

    def train(self, batch_size: int = 64) -> float:
        """Run one gradient step on a sampled minibatch."""
        if len(self.replay_buffer) < batch_size:
            return 0.0

        states, actions, rewards, next_states, dones = \
            self.replay_buffer.sample(batch_size)

        states = Tensor(states, ms.float32)
        actions = Tensor(actions.astype(np.int32), ms.int32)
        rewards = Tensor(rewards, ms.float32)
        next_states = Tensor(next_states, ms.float32)
        dones = Tensor(dones.astype(np.float32), ms.float32)

        # Target Q-values come from the target network; no gradient flows here
        next_q_values = self.target_network(next_states)
        max_next_q = next_q_values.max(axis=1)
        target_q = rewards + self.gamma * max_next_q * (1 - dones)

        # TD loss on the online network's Q(s, a)
        def forward_fn(states, actions, target_q):
            q_values = self.q_network(states)
            current_q = q_values.gather_elements(1, actions.unsqueeze(1)).squeeze(1)
            return self.loss_fn(current_q, target_q)

        # MindSpore optimizers apply gradients, not losses
        grad_fn = ms.value_and_grad(forward_fn, None, self.optimizer.parameters)
        loss, grads = grad_fn(states, actions, target_q)
        self.optimizer(grads)

        # Periodically refresh the target network
        self.train_step += 1
        if self.train_step % self.target_update_freq == 0:
            self._sync_weights()

        # Decay epsilon toward epsilon_min
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return float(loss.asnumpy())

    def save(self, path: str):
        """Save the model checkpoint."""
        ms.save_checkpoint(self.q_network, path)

    def load(self, path: str):
        """Load a model checkpoint."""
        ms.load_param_into_net(self.q_network,
                               ms.load_checkpoint(path))


class CartPoleEnv:
    """Simplified CartPole environment"""
    def __init__(self):
        self.state_dim = 4
        self.action_dim = 2
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.length = 0.5
        self.force_mag = 10.0
        self.tau = 0.02
        self._reset()

    def reset(self):
        """Reset to a random state near upright and return it."""
        self.x = np.random.uniform(-0.1, 0.1)
        self.x_dot = np.random.uniform(-0.1, 0.1)
        self.theta = np.random.uniform(-0.1, 0.1)
        self.theta_dot = np.random.uniform(-0.1, 0.1)
        return self._get_state()

    # Keep the original private name as an alias for existing callers
    _reset = reset

    def _get_state(self):
        return np.array([self.x, self.x_dot, self.theta, self.theta_dot])

    def step(self, action: int):
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = np.cos(self.theta)
        sintheta = np.sin(self.theta)

        temp = (force + self.masspole * self.length * self.theta_dot**2 * sintheta) \
               / (self.masscart + self.masspole)
        thetaacc = (self.gravity * sintheta - costheta * temp) / \
                   (self.length * (4.0/3.0 - self.masspole * costheta**2 / 
                    (self.masscart + self.masspole)))
        xacc = temp - self.masspole * self.length * thetaacc * costheta / \
               (self.masscart + self.masspole)

        self.x += self.tau * self.x_dot
        self.x_dot += self.tau * xacc
        self.theta += self.tau * self.theta_dot
        self.theta_dot += self.tau * thetaacc

        state = self._get_state()
        done = abs(self.x) > 2.4 or abs(self.theta) > np.pi / 12
        reward = 0 if done else 1.0

        return state, reward, done

    def render(self):
        pass


def train_dqn(episodes: int = 500, batch_size: int = 64):
    """Train a DQN agent on CartPole."""
    env = CartPoleEnv()
    agent = DQNAgent(state_dim=4, action_dim=2, 
                    learning_rate=1e-3, gamma=0.99)

    episode_rewards = []
    recent_rewards = deque(maxlen=20)

    print("Starting DQN training...")
    for episode in range(episodes):
        state = env._reset()
        total_reward = 0
        done = False
        steps = 0

        # Cap episode length so a well-trained policy still terminates
        while not done and steps < 500:
            steps += 1

            # Select an action (epsilon-greedy)
            action = agent.select_action(state, training=True)

            # Step the environment
            next_state, reward, done = env.step(action)

            # Store the transition
            agent.store_transition(state, action, reward, next_state, done)

            # One training step
            loss = agent.train(batch_size)

            state = next_state
            total_reward += reward

        episode_rewards.append(total_reward)
        recent_rewards.append(total_reward)
        avg_reward = np.mean(recent_rewards)

        if (episode + 1) % 10 == 0:
            print(f"Episode {episode+1}/{episodes} | "
                  f"reward: {total_reward:.1f} | "
                  f"avg reward (last 20): {avg_reward:.1f} | "
                  f"epsilon: {agent.epsilon:.3f}")

        if avg_reward >= 195:
            print(f"\n🎉 Training complete! 20-episode average reward reached {avg_reward:.1f}")
            break

    return agent, episode_rewards


if __name__ == "__main__":
    # PyNative mode suits this step-by-step Python training loop
    ms.set_context(mode=ms.PYNATIVE_MODE)
    trained_agent, rewards = train_dqn(episodes=500)

    # Save the trained model
    trained_agent.save("dqn_cartpole.ckpt")
    print("\n✅ Model saved to dqn_cartpole.ckpt")

4.3 Visualizing DQN Training Results

import matplotlib.pyplot as plt
import numpy as np

def plot_training_curve(rewards: list, window: int = 20):
    """Plot the training curve with a moving average."""
    fig, ax = plt.subplots(1, 1, figsize=(10, 6))

    episodes = range(1, len(rewards) + 1)
    ax.plot(episodes, rewards, alpha=0.3, label='Raw reward')

    # Moving average over the last `window` episodes
    avg_rewards = []
    for i in range(len(rewards)):
        start = max(0, i - window + 1)
        avg_rewards.append(np.mean(rewards[start:i+1]))

    ax.plot(episodes, avg_rewards, linewidth=2,
            label=f'{window}-episode moving average', color='red')

    ax.set_xlabel('Episode', fontsize=12)
    ax.set_ylabel('Cumulative reward', fontsize=12)
    ax.set_title('DQN Training Curve - CartPole', fontsize=14)
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('dqn_training_curve.png', dpi=150)
    plt.show()
    print("Training curve saved to dqn_training_curve.png")

5. Policy Gradient in Practice

5.1 How Policy Gradient Works

Unlike DQN, which learns an action-value function, Policy Gradient methods learn the policy π(a|s) directly. Their advantages are the ability to handle continuous action spaces and often smoother convergence behavior. REINFORCE is the most basic Policy Gradient method: it estimates the gradient via Monte Carlo sampling and then updates the policy parameters by gradient ascent.

The policy gradient theorem states that ∇_θ J(θ) = E[∇_θ log π_θ(a|s) · Q^π(s,a)], where J(θ) is the expected cumulative return and π_θ is the policy parameterized by θ.
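This estimator can be sanity-checked without any framework. The NumPy-only toy below (all names and numbers are ours, purely illustrative) runs REINFORCE on a 2-armed bandit with a softmax policy; gradient ascent on log π(a)·R shifts probability mass onto the rewarding arm:

```python
import numpy as np

rng = np.random.default_rng(0)
theta = np.zeros(2)                   # softmax logits, one per arm
arm_rewards = np.array([0.0, 1.0])    # arm 1 is the only rewarding arm
lr = 0.1

for _ in range(2000):
    probs = np.exp(theta) / np.exp(theta).sum()
    a = rng.choice(2, p=probs)
    r = arm_rewards[a]
    # For a softmax policy, ∇θ log π(a) = one_hot(a) - probs
    grad_log_pi = -probs
    grad_log_pi[a] += 1.0
    theta += lr * grad_log_pi * r     # gradient *ascent* on log π(a)·R

probs = np.exp(theta) / np.exp(theta).sum()
print(probs)  # probability mass concentrates on arm 1
```

The same score-function update, scaled by an advantage instead of the raw reward, is what the implementation below performs with MindSpore networks.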

5.2 Full REINFORCE Implementation

import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
from mindspore.common.initializer import HeNormal
from collections import deque
import numpy as np

class PolicyNetwork(nn.Cell):
    """Policy network"""
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Dense(state_dim, hidden_dim, weight_init=HeNormal())
        self.fc2 = nn.Dense(hidden_dim, hidden_dim, weight_init=HeNormal())
        self.fc3 = nn.Dense(hidden_dim, action_dim, weight_init=HeNormal())
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax()

    def construct(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        logits = self.fc3(x)
        return self.softmax(logits)


class ValueNetwork(nn.Cell):
    """Value network (for advantage estimation)"""
    def __init__(self, state_dim: int, hidden_dim: int = 128):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Dense(state_dim, hidden_dim, weight_init=HeNormal())
        self.fc2 = nn.Dense(hidden_dim, hidden_dim, weight_init=HeNormal())
        self.fc3 = nn.Dense(hidden_dim, 1, weight_init=HeNormal())
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)


class REINFORCEAgent:
    """REINFORCE agent with an advantage baseline"""
    def __init__(self, state_dim: int, action_dim: int,
                 learning_rate: float = 3e-4, gamma: float = 0.99):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma

        # Policy and value networks
        self.policy_net = PolicyNetwork(state_dim, action_dim)
        self.value_net = ValueNetwork(state_dim)

        # Optimizers
        self.policy_optimizer = nn.Adam(
            self.policy_net.trainable_params(),
            learning_rate=learning_rate
        )
        self.value_optimizer = nn.Adam(
            self.value_net.trainable_params(),
            learning_rate=learning_rate
        )

    def select_action(self, state: np.ndarray) -> int:
        """Sample an action from the policy network's distribution."""
        state_tensor = Tensor(state.reshape(1, -1), ms.float32)
        probs = self.policy_net(state_tensor)

        # Sample stochastically rather than act greedily, to keep exploring
        action_probs = probs.asnumpy()[0]
        action = np.random.choice(self.action_dim, p=action_probs)
        return action

    def get_action_prob(self, state: np.ndarray, action: int) -> float:
        """Probability of a given action under the current policy."""
        state_tensor = Tensor(state.reshape(1, -1), ms.float32)
        probs = self.policy_net(state_tensor)
        return float(probs[0][action].asnumpy())

    def compute_returns(self, rewards: list, gamma: float = 0.99) -> list:
        """Compute discounted returns for a trajectory."""
        returns = []
        running_return = 0
        for reward in reversed(rewards):
            running_return = reward + gamma * running_return
            returns.insert(0, running_return)
        return returns

    def compute_advantages(self, states: list, rewards: list) -> list:
        """Compute advantages A(s,a) = Q(s,a) - V(s)."""
        returns = self.compute_returns(rewards, self.gamma)

        # Value estimates V(s) from the value network
        states_tensor = Tensor(np.array(states), ms.float32)
        values = self.value_net(states_tensor).asnumpy().flatten()

        advantages = []
        for ret, val in zip(returns, values):
            advantages.append(ret - val)

        # Normalize advantages for stable updates
        advantages = np.array(advantages)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        return advantages.tolist()

    def update(self, states: list, actions: list, rewards: list):
        """Update both networks from one complete trajectory."""
        states_tensor = Tensor(np.array(states), ms.float32)
        actions_tensor = Tensor(np.array(actions), ms.int32)
        advantages = self.compute_advantages(states, rewards)
        advantages_tensor = Tensor(np.array(advantages), ms.float32)
        returns_tensor = Tensor(np.array(self.compute_returns(rewards, self.gamma)),
                                ms.float32)

        # Value network: minimize MSE between V(s) and the Monte Carlo return
        def value_forward(states_t, returns_t):
            values = self.value_net(states_t).squeeze()
            return nn.MSELoss()(values, returns_t)

        value_grad_fn = ms.value_and_grad(
            value_forward, None, self.value_optimizer.parameters)
        value_loss, value_grads = value_grad_fn(states_tensor, returns_tensor)
        self.value_optimizer(value_grads)

        # Policy network: gradient ascent on E[log π(a|s) · A(s,a)]
        def policy_forward(states_t, actions_t, adv_t):
            probs = self.policy_net(states_t)
            log_probs = ops.log(probs + 1e-8)
            action_log_probs = log_probs.gather_elements(
                1, actions_t.unsqueeze(1)).squeeze(1)
            return -(action_log_probs * adv_t).mean()

        policy_grad_fn = ms.value_and_grad(
            policy_forward, None, self.policy_optimizer.parameters)
        policy_loss, policy_grads = policy_grad_fn(
            states_tensor, actions_tensor, advantages_tensor)
        self.policy_optimizer(policy_grads)

        return float(policy_loss.asnumpy()), float(value_loss.asnumpy())

    def save(self, path: str):
        ms.save_checkpoint(self.policy_net, path)

    def load(self, path: str):
        ms.load_param_into_net(self.policy_net, ms.load_checkpoint(path))


def train_reinforce(episodes: int = 500, max_steps: int = 500):
    """Train a REINFORCE agent on CartPole."""
    env = CartPoleEnv()
    agent = REINFORCEAgent(state_dim=4, action_dim=2, learning_rate=3e-4)

    episode_rewards = []
    recent_rewards = deque(maxlen=20)

    print("Starting REINFORCE training...")
    for episode in range(episodes):
        state = env._reset()
        states, actions, rewards = [], [], []

        for step in range(max_steps):
            action = agent.select_action(state)
            next_state, reward, done = env.step(action)

            states.append(state)
            actions.append(action)
            rewards.append(reward)

            state = next_state
            if done:
                break

        # Update after collecting the complete trajectory
        policy_loss, value_loss = agent.update(states, actions, rewards)

        episode_rewards.append(sum(rewards))
        recent_rewards.append(sum(rewards))
        avg_reward = np.mean(recent_rewards)

        if (episode + 1) % 10 == 0:
            print(f"Episode {episode+1}/{episodes} | "
                  f"reward: {sum(rewards):.1f} | "
                  f"avg reward (last 20): {avg_reward:.1f}")

        if avg_reward >= 195:
            print(f"\n🎉 Training complete! 20-episode average reward reached {avg_reward:.1f}")
            break

    return agent, episode_rewards


if __name__ == "__main__":
    # PyNative mode suits this step-by-step Python training loop
    ms.set_context(mode=ms.PYNATIVE_MODE)
    trained_agent, rewards = train_reinforce(episodes=500)

    trained_agent.save("reinforce_cartpole.ckpt")
    print("\n✅ Model saved to reinforce_cartpole.ckpt")

6. DQN vs Policy Gradient: A Comparison Experiment

def compare_algorithms():
    """Compare DQN and REINFORCE on the same environment."""
    print("=" * 60)
    print("Algorithm comparison: DQN vs REINFORCE")
    print("=" * 60)

    # Train DQN
    print("\n🚀 Training DQN...")
    ms.set_context(mode=ms.PYNATIVE_MODE)
    dqn_agent, dqn_rewards = train_dqn(episodes=300)

    # Train REINFORCE
    print("\n🚀 Training REINFORCE...")
    reinforce_agent, reinforce_rewards = train_reinforce(episodes=300)

    # Comparison plots
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Left: training curves
    ax1 = axes[0]
    ax1.plot(dqn_rewards, alpha=0.4, label='DQN raw', color='blue')
    dqn_avg = [np.mean(dqn_rewards[max(0,i-10):i+1]) for i in range(len(dqn_rewards))]
    ax1.plot(dqn_avg, linewidth=2, label='DQN average', color='blue')

    ax1.plot(reinforce_rewards, alpha=0.4, label='REINFORCE raw', color='green')
    reinforce_avg = [np.mean(reinforce_rewards[max(0,i-10):i+1])
                     for i in range(len(reinforce_rewards))]
    ax1.plot(reinforce_avg, linewidth=2, label='REINFORCE average', color='green')

    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Cumulative reward')
    ax1.set_title('Training curves')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Right: final performance
    ax2 = axes[1]
    algorithms = ['DQN', 'REINFORCE']
    final_rewards = [
        np.mean(dqn_rewards[-20:]),
        np.mean(reinforce_rewards[-20:])
    ]
    colors = ['#3498db', '#2ecc71']

    bars = ax2.bar(algorithms, final_rewards, color=colors, edgecolor='black')
    ax2.set_ylabel('Average reward (last 20 episodes)')
    ax2.set_title('Final performance')
    ax2.set_ylim(0, 250)

    for bar, reward in zip(bars, final_rewards):
        ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 5,
                f'{reward:.1f}', ha='center', fontsize=12, fontweight='bold')

    plt.tight_layout()
    plt.savefig('algorithm_comparison.png', dpi=150)
    plt.show()

    print("\n" + "=" * 60)
    print("Results:")
    print(f"  DQN final average reward: {np.mean(dqn_rewards[-20:]):.2f}")
    print(f"  REINFORCE final average reward: {np.mean(reinforce_rewards[-20:]):.2f}")
    print("=" * 60)


if __name__ == "__main__":
    compare_algorithms()

7. Advanced Topics in Deep Reinforcement Learning

7.1 DDPG (Continuous Action Spaces)

DDPG (Deep Deterministic Policy Gradient) targets continuous action spaces, combining DQN-style experience replay with policy gradient methods:

class DDPGAgent:
    """DDPG agent for continuous action control"""
    def __init__(self, state_dim: int, action_dim: int,
                 action_bound: float = 2.0):
        self.action_bound = action_bound

        def build_actor():
            # Actor (policy) network: Tanh squashes the output to [-1, 1]
            return nn.SequentialCell([
                nn.Dense(state_dim, 400, weight_init=HeNormal()),
                nn.ReLU(),
                nn.Dense(400, 300, weight_init=HeNormal()),
                nn.ReLU(),
                nn.Dense(300, action_dim, weight_init=HeNormal()),
                nn.Tanh()
            ])

        def build_critic():
            # Critic (value) network over concatenated (state, action)
            return nn.SequentialCell([
                nn.Dense(state_dim + action_dim, 400, weight_init=HeNormal()),
                nn.ReLU(),
                nn.Dense(400, 300, weight_init=HeNormal()),
                nn.ReLU(),
                nn.Dense(300, 1, weight_init=HeNormal())
            ])

        self.actor = build_actor()
        self.critic = build_critic()

        # Target networks must mirror the online architectures exactly
        self.target_actor = build_actor()
        self.target_critic = build_critic()
        self._sync_target(tau=1.0)  # hard copy at initialization

        self.actor_optimizer = nn.Adam(self.actor.trainable_params(), 1e-4)
        self.critic_optimizer = nn.Adam(self.critic.trainable_params(), 1e-3)

        self.replay_buffer = ReplayBuffer(capacity=100000)

    def _sync_target(self, tau: float = 0.005):
        """Soft (Polyak) update: θ_target ← (1-τ)·θ_target + τ·θ"""
        for tp, p in zip(self.target_actor.get_parameters(),
                         self.actor.get_parameters()):
            tp.set_data(tp.data * (1 - tau) + p.data * tau)
        for tp, p in zip(self.target_critic.get_parameters(),
                         self.critic.get_parameters()):
            tp.set_data(tp.data * (1 - tau) + p.data * tau)

    def select_action(self, state: np.ndarray, noise_scale: float = 0.1) -> np.ndarray:
        """Noisy action selection for exploration"""
        state_tensor = Tensor(state.reshape(1, -1), ms.float32)
        # Scale the Tanh output from [-1, 1] to the environment's action range
        action = self.actor(state_tensor).asnumpy()[0] * self.action_bound

        # Add Gaussian exploration noise and clip to the valid range
        noise = np.random.normal(0, noise_scale, size=action.shape)
        action = np.clip(action + noise, -self.action_bound, self.action_bound)
        return action
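The soft update in `_sync_target` is Polyak averaging: θ_target ← (1-τ)·θ_target + τ·θ. A NumPy-only illustration (toy weights, not MindSpore parameters) of how quickly the target tracks the online network at τ = 0.005:

```python
import numpy as np

tau = 0.005
target_w = np.zeros(3)   # toy target-network weights
online_w = np.ones(3)    # toy online-network weights (held fixed here)

for _ in range(100):
    target_w = (1 - tau) * target_w + tau * online_w

# After n updates the remaining gap is (1-τ)^n: here 1 - 0.995**100 ≈ 0.394
print(target_w)
```

The geometric decay means the target lags the online network by a smooth, tunable amount, which is why small τ values stabilize DDPG training.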

7.2 Multi-Agent Reinforcement Learning

MindSpore RL supports multi-agent environments. Below is a simplified multi-agent example:

class MultiAgentExample:
    """Multi-agent reinforcement learning example"""
    def __init__(self, num_agents: int, state_dim: int, action_dim: int):
        self.num_agents = num_agents

        # An independent network for each agent
        self.agents = [
            DQNAgent(state_dim, action_dim)
            for _ in range(num_agents)
        ]

    def share_experience(self, experiences: list):
        """Example of a shared experience pool across agents"""
        for exp in experiences:
            for i, agent in enumerate(self.agents):
                agent.store_transition(*exp[i])

    def train_all(self, batch_size: int):
        """Train all agents (sequentially here; easily parallelized)"""
        losses = []
        for agent in self.agents:
            loss = agent.train(batch_size)
            losses.append(loss)
        return np.mean(losses)

8. Practical Tips and Best Practices

8.1 Hyperparameter Tuning Advice

Reinforcement learning is sensitive to hyperparameters. The following tuning advice is based on experience:

| Hyperparameter | Typical range | Tuning advice |
| --- | --- | --- |
| Learning rate | 1e-4 ~ 1e-3 | Use the smaller end for policy networks |
| Discount factor γ | 0.95 ~ 0.999 | The longer the task horizon, the larger γ should be |
| Exploration rate ε | 1.0 → 0.01 | Adjust the decay rate to the task |
| Batch size | 32 ~ 256 | Larger batches train more stably |
| Target-network update period | every 100 ~ 1000 steps | Updating too frequently destabilizes training |
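For the exploration rate, the decay factor determines how long exploration lasts. Under multiplicative decay ε ← ε·d, the number of training steps to reach ε_min is ln(ε_min/ε₀)/ln(d). A quick calculation (helper name is ours, for illustration only):

```python
import math

def decay_steps(eps0: float, eps_min: float, decay: float) -> int:
    """Training steps until eps0 * decay**n first drops to eps_min."""
    return math.ceil(math.log(eps_min / eps0) / math.log(decay))

print(decay_steps(1.0, 0.01, 0.995))  # 919 steps
print(decay_steps(1.0, 0.01, 0.999))  # 4603 steps
```

Since epsilon decays once per training step here, a decay of 0.995 exhausts exploration within roughly the first dozen CartPole episodes; pick the factor to match the intended exploration horizon.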

8.2 Common Problems and Solutions

  1. Training does not converge: revisit the reward design, lower the learning rate, and enlarge the replay buffer

  2. Q-value overestimation: use Double DQN or adjust the target-network update frequency

  3. Insufficient exploration: raise the initial exploration rate or inject more noise
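On point 2: Double DQN selects the greedy action with the online network but evaluates it with the target network, which damps the overestimation bias of the max operator. A NumPy-only sketch with made-up Q-values:

```python
import numpy as np

gamma = 0.99
rewards = np.array([1.0, 1.0])
dones = np.array([0.0, 0.0])
q_online_next = np.array([[1.0, 2.0],   # online net's Q(s', ·)
                          [4.0, 3.0]])
q_target_next = np.array([[0.5, 1.5],   # target net's Q(s', ·)
                          [2.5, 3.5]])

# Vanilla DQN would bootstrap from q_target_next.max(axis=1) = [1.5, 3.5]
best_actions = q_online_next.argmax(axis=1)            # online net picks [1, 0]
double_q = q_target_next[np.arange(2), best_actions]   # target net evaluates: [1.5, 2.5]
target = rewards + gamma * double_q * (1 - dones)
print(target)  # ≈ [2.485, 3.475]
```

In the second transition the two networks disagree on the best action, so Double DQN bootstraps from 2.5 instead of the optimistic 3.5, which is exactly how the overestimation is reduced.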

8.3 Performance Optimization Tips

# 1. Vectorized environment interaction
def batch_collect(agent, env, num_steps: int):
    states, actions, rewards, next_states, dones = [], [], [], [], []
    state = env.reset()

    for _ in range(num_steps):
        action = agent.select_action(state)
        next_state, reward, done = env.step(action)

        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(next_state)
        dones.append(done)

        if done:
            state = env.reset()
        else:
            state = next_state

    return np.array(states), np.array(actions), np.array(rewards), \
           np.array(next_states), np.array(dones)


# 2. Asynchronous training (multi-process)
import multiprocessing as mp

def parallel_training(num_workers: int = 4):
    """Sketch of parallel experience collection.
    Assumes a user-defined worker_train(worker_id) function."""
    processes = []
    for i in range(num_workers):
        p = mp.Process(target=worker_train, args=(i,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

9. Summary

This article systematically introduced hands-on reinforcement learning with MindSpore. Starting from RL fundamentals, we covered the principles and implementations of the two core algorithms, DQN and REINFORCE, demonstrated the full training workflow on a CartPole environment, and ran a head-to-head comparison experiment. We also touched on advanced topics such as DDPG and multi-agent RL, along with practical tuning tips and performance optimizations.

MindSpore RL offers rich algorithm support and efficient computation, letting researchers validate new ideas quickly and engineers deploy RL applications with ease. We hope readers will build on this foundation to explore further and make breakthroughs in game AI, robot control, recommender systems, and beyond.
