【摘要】 A3C算法(Asynchronous advantage actor critic)是基于Actor-critic架构提出的一种并行强化学习算法,解决了单个智能体与环境交互时数据收集速度慢、训练难以收敛的问题。本案例基于单机多进程的方法实现了对ATARI游戏PONG智能体的训练。在16个并行环境下,本案例中的智能体经过14-24min的训练即可解决ATARI_PONG游戏。




  1. 了解A3C算法的基本概念
  2. 了解如何基于A3C训练ATARI游戏
  3. 了解强化学习训练推理游戏的整体流程

A3C算法(Asynchronous advantage actor critic)是基于Actor-critic架构提出的一种并行强化学习算法,解决了单个智能体与环境交互时数据收集速度慢、训练难以收敛的问题。本案例基于单机多进程的方法实现了对ATARI游戏PONG智能体的训练。在16个并行环境下,本案例中的智能体经过14-24min的训练即可解决ATARI_PONG游戏。









  1. 本案例采用基于lock-free的并行梯度下降算法(Hogwild!):各训练进程不加锁地直接更新共享内存中的模型参数,避免了分布式系统的通信开销,实现了高效的梯度数据同步。
  2. 多个独立的并行环境有助于降低数据之间的相关性,同时增加智能体的exploration。


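A3C的损失函数由三部分组成:Policy Loss(策略损失)、Value Loss(价值损失)以及 Regularization with Policy Entropy(策略熵正则项)。总损失为 $L = L_\pi + c_v L_V$,其中 $L_\pi = -\sum_t \left[\log\pi(a_t|s_t)\,\hat{A}_t + \beta H\left(\pi(\cdot|s_t)\right)\right]$($\hat{A}_t$ 为GAE优势估计,$\beta$ 为熵系数),$L_V = \sum_t \tfrac{1}{2}\left(R_t - V(s_t)\right)^2$,$c_v$ 为价值损失系数。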

Atari Pong环境介绍

Pong是1972年起源于美国的一款模拟两人打乒乓球的游戏,近几年常用于测试强化学习算法的性能。游戏规则:智能体控制一侧的球拍(另一侧由游戏内置AI控制),将球击回对方一侧。对手失球则智能体得1分,智能体失球则对手得1分,先得到21分的一方获胜。游戏环境输出的标准observation为(210, 160, 3)的RGB图像。



1. 程序初始化


import os
import math
import time
import argparse
import collections
from collections import deque

import cv2
import numpy as np
import gym
from gym.spaces.box import Box
import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import torch.nn.functional as F
import matplotlib
import matplotlib.pyplot as plt
from IPython import display
import moxing as mox
%matplotlib inline
mox.file.copy_parallel("obs://modelarts-labs-bj4-v2/course/modelarts/reinforcement_learning/pong_A3C/model/Pong_A3C_pretrained.pth", "model/Pong_A3C_pretrained.pth")

2. 定义基于共享内存的Adam优化器

class SharedAdam(optim.Adam):
    """Adam optimizer whose state tensors can be placed in shared memory.

    All per-parameter state (step count, first and second moment estimates)
    is allocated eagerly in ``__init__`` so that ``share_memory()`` can move it
    into shared memory; that call should happen before worker processes are
    started. ``step`` takes no lock, so concurrent workers update the same
    statistics Hogwild!-style.
    """

    def __init__(self,
                 params,
                 lr=1e-3,
                 betas=(0.9, 0.999),
                 eps=1e-8,
                 weight_decay=0):
        super(SharedAdam, self).__init__(params, lr=lr, betas=betas, eps=eps,
                                         weight_decay=weight_decay)

        for group in self.param_groups:
            for p in group['params']:
                state = self.state[p]
                state['step'] = torch.zeros(1)
                state['exp_avg'] = torch.zeros_like(p.data)
                state['exp_avg_sq'] = torch.zeros_like(p.data)

    def share_memory(self):
        """Move every optimizer state tensor into shared memory."""
        for group in self.param_groups:
            for p in group['params']:
                state = self.state[p]
                state['step'].share_memory_()
                state['exp_avg'].share_memory_()
                state['exp_avg_sq'].share_memory_()

    def step(self, closure=None):
        """Apply one Adam update to every parameter that has a gradient.

        Args:
            closure (callable, optional): re-evaluates the model and returns
                the loss.

        Returns:
            The value returned by ``closure``, or ``None`` without a closure.
        """
        loss = None
        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            beta1, beta2 = group['betas']
            for p in group['params']:
                if p.grad is None:
                    # Parameter did not contribute to the loss; skip it.
                    continue
                grad = p.grad.data
                state = self.state[p]
                exp_avg, exp_avg_sq = state['exp_avg'], state['exp_avg_sq']

                state['step'] += 1

                if group['weight_decay'] != 0:
                    # Classic L2 penalty folded into the gradient (not AdamW).
                    grad = grad.add(p.data, alpha=group['weight_decay'])

                # Exponential moving averages of the first and second moments.
                exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
                exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)

                denom = exp_avg_sq.sqrt().add_(group['eps'])

                step_count = state['step'].item()
                bias_correction1 = 1 - beta1 ** step_count
                bias_correction2 = 1 - beta2 ** step_count
                step_size = group['lr'] * math.sqrt(
                    bias_correction2) / bias_correction1

                p.data.addcdiv_(exp_avg, denom, value=-step_size)

        return loss

3. 预处理ATARI环境输入

# Preprocessing follows https://github.com/openai/universe-starter-agent
def create_atari_env(env_id):
    """Create the gym environment ``env_id`` with this notebook's preprocessing.

    The raw environment is wrapped so that each observation is first turned
    into a 1x42x42 grayscale frame and then normalised with running
    mean/std statistics.
    """
    rescaled = AtariRescale42x42(gym.make(env_id))
    return NormalizedEnv(rescaled)
def _process_frame42(frame):
    """Turn a raw (210, 160, 3) RGB frame into a 1x42x42 float32 array in [0, 1]."""
    # Keep a 160x160 window: rows 34..193, columns 0..159.
    cropped = frame[34:194, :160]
    # Downscale in two steps: 160x160 -> 80x80 -> 42x42.
    small = cv2.resize(cv2.resize(cropped, (80, 80)), (42, 42))
    # Average the colour channels into one grayscale channel.
    gray = small.mean(2, keepdims=True).astype(np.float32)
    gray *= (1.0 / 255.0)
    # HWC -> CHW: put the channel axis first, as the conv layers expect.
    return np.moveaxis(gray, -1, 0)
class AtariRescale42x42(gym.ObservationWrapper):
    """Observation wrapper emitting preprocessed 1x42x42 frames (see _process_frame42)."""

    def __init__(self, env=None):
        super(AtariRescale42x42, self).__init__(env)
        # Frames are scaled into [0, 1] by _process_frame42.
        self.observation_space = Box(0.0, 1.0, [1, 42, 42])

    def observation(self, observation):
        return _process_frame42(observation)
class NormalizedEnv(gym.ObservationWrapper):
    """Standardise observations with exponentially weighted running mean/std.

    Both running statistics start at 0, so each is divided by
    ``1 - alpha ** num_steps`` to remove the bias towards zero in early steps.
    """

    def __init__(self, env=None):
        super(NormalizedEnv, self).__init__(env)
        self.state_mean = 0
        self.state_std = 0
        self.alpha = 0.9999  # decay factor of the running averages
        self.num_steps = 0

    def observation(self, observation):
        self.num_steps += 1
        keep = self.alpha
        self.state_mean = keep * self.state_mean + (1 - keep) * observation.mean()
        self.state_std = keep * self.state_std + (1 - keep) * observation.std()

        correction = 1 - pow(keep, self.num_steps)
        mean_hat = self.state_mean / correction
        std_hat = self.state_std / correction
        return (observation - mean_hat) / (std_hat + 1e-8)

4. 定义神经网络

# Random initialisation with a fixed L2 norm per row
def normalized_columns_initializer(weights, std=1.0):
    """Return a random tensor shaped like ``weights`` whose rows each have L2 norm ``std``.

    Only ``weights.size()`` is used; the values in ``weights`` are ignored.
    """
    sample = torch.randn(weights.size())
    row_norm = sample.pow(2).sum(1, keepdim=True).sqrt()
    sample *= std / row_norm
    return sample

# Glorot/Xavier-uniform initialisation of conv and linear weights
def weights_init(m):
    """Fill ``m.weight`` from U(-b, b) with b = sqrt(6 / (fan_in + fan_out)).

    Takes a single module, so it is suitable for ``nn.Module.apply``. Modules
    whose class name contains neither 'Conv' nor 'Linear' are left unchanged;
    biases are never touched.
    """
    kind = type(m).__name__
    if 'Conv' in kind:
        shape = list(m.weight.data.size())
        fan_in = np.prod(shape[1:4])
        fan_out = np.prod(shape[2:4]) * shape[0]
    elif 'Linear' in kind:
        shape = list(m.weight.data.size())
        fan_in = shape[1]
        fan_out = shape[0]
    else:
        return
    bound = np.sqrt(6. / (fan_in + fan_out))
    m.weight.data.uniform_(-bound, bound)
class ActorCritic(torch.nn.Module):
    """Conv + LSTMCell trunk shared by a critic (value) head and an actor (logit) head.

    Args:
        num_inputs: number of input channels of the frames.
        action_space: discrete action space; ``action_space.n`` sets the
            number of policy logits.
    """

    def __init__(self, num_inputs, action_space):
        super(ActorCritic, self).__init__()
        # Four stride-2 convs; a 42x42 frame shrinks to 3x3 (42->21->11->6->3).
        self.conv1 = nn.Conv2d(num_inputs, 32, 3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 32, 3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(32, 32, 3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(32, 32, 3, stride=2, padding=1)
        # LSTMCell carries temporal information across successive frames.
        self.lstm = nn.LSTMCell(32 * 3 * 3, 256)
        num_outputs = action_space.n
        self.critic_linear = nn.Linear(256, 1)
        self.actor_linear = nn.Linear(256, num_outputs)

        # Glorot-initialise every conv/linear weight (previously defined but never applied) ...
        self.apply(weights_init)
        # ... then re-initialise the heads: small actor rows keep the initial
        # policy logits small; critic rows get unit norm.
        self.actor_linear.weight.data = normalized_columns_initializer(
            self.actor_linear.weight.data, 0.01)
        self.critic_linear.weight.data = normalized_columns_initializer(
            self.critic_linear.weight.data, 1.0)

    def forward(self, inputs):
        """Run one recurrent step.

        Args:
            inputs: tuple ``(x, (hx, cx))`` — a batch of frames whose conv
                output is 3x3 spatially (e.g. N x num_inputs x 42 x 42) and
                the previous LSTM hidden/cell states (N x 256 each).

        Returns:
            ``(value, logits, (hx, cx))``: critic value (N x 1), policy logits
            (N x action_space.n) and the new LSTM state.
        """
        inputs, (hx, cx) = inputs
        x = F.elu(self.conv1(inputs))
        x = F.elu(self.conv2(x))
        x = F.elu(self.conv3(x))
        x = F.elu(self.conv4(x))
        # Flatten the final feature map into one vector per sample.
        x = x.view(-1, 32 * 3 * 3)
        hx, cx = self.lstm(x, (hx, cx))
        x = hx
        return self.critic_linear(x), self.actor_linear(x), (hx, cx)

5. 策略评估函数

def test(rank, args, shared_model, counter):
    """Evaluation loop: play greedy episodes with the latest shared weights.

    At the start of every episode the local model is re-synchronised from
    ``shared_model``; actions are the arg-max of the policy (no exploration).
    Statistics are printed after each episode. The loop ends once
    ``counter.value`` exceeds ``args.max_steps``.

    Args:
        rank: offset added to ``args.seed``.
        args: parsed hyper-parameters (env_name, seed, max_episode_length,
            max_steps).
        shared_model: ActorCritic whose parameters the trainers update.
        counter: shared step counter incremented by the training processes.
    """
    torch.manual_seed(args.seed + rank)

    env = create_atari_env(args.env_name)
    env.seed(args.seed + rank)

    model = ActorCritic(env.observation_space.shape[0], env.action_space)

    state = env.reset()
    state = torch.from_numpy(state)
    reward_sum = 0
    done = True
    start_time = time.time()

    # Stuck-agent guard: end the episode if the last 100 actions are identical.
    actions = deque(maxlen=100)
    episode_length = 0
    while True:
        episode_length += 1
        if done:
            # New episode: pull the latest shared weights and reset the LSTM state.
            model.load_state_dict(shared_model.state_dict())
            cx = torch.zeros(1, 256)
            hx = torch.zeros(1, 256)

        with torch.no_grad():
            value, logit, (hx, cx) = model((state.unsqueeze(0), (hx, cx)))
        prob = F.softmax(logit, dim=-1)
        # Greedy action at evaluation time (no exploration).
        action = prob.max(1, keepdim=True)[1].numpy()

        state, reward, done, _ = env.step(action[0, 0])
        done = done or episode_length >= args.max_episode_length
        reward_sum += reward

        actions.append(action[0, 0])
        if actions.count(actions[0]) == actions.maxlen:
            done = True

        if done:
            print("Time {}, num steps {}, FPS {:.0f}, episode reward {}, episode length {}".format(
                time.strftime("%Hh %Mm %Ss",
                              time.gmtime(time.time() - start_time)),
                counter.value, counter.value / (time.time() - start_time),
                reward_sum, episode_length))
            reward_sum = 0
            episode_length = 0
            # Forget the finished episode's actions so the guard cannot fire
            # again on the next episode's first step.
            actions.clear()
            state = env.reset()

        state = torch.from_numpy(state)
        if counter.value > args.max_steps:
            break
    print('testing ends')

6. 模型训练函数

# Make the shared model's gradients refer to this worker's local gradients.
def ensure_shared_grads(model, shared_model):
    """Point each ``shared_model`` parameter's ``.grad`` at the matching local gradient.

    The optimizer works on ``shared_model.parameters()``, so after this call
    ``optimizer.step()`` applies the gradients this process just computed.
    The link is re-established on every call: if the local gradients were
    re-allocated (e.g. by ``zero_grad(set_to_none=True)``, the default since
    PyTorch 2.0), a one-time alias would leave the shared parameters holding
    stale gradient tensors.
    """
    for param, shared_param in zip(model.parameters(),
                                   shared_model.parameters()):
        shared_param.grad = param.grad
def train(rank, args, shared_model, counter, lock, optimizer=None):
    """A3C worker: collect n-step rollouts and apply gradients to the shared model.

    Each iteration:

    1. The local model is synchronised with ``shared_model``.
    2. Up to ``args.num_steps`` environment steps are collected, stopping
       early when the episode ends.
    3. The actor-critic loss (GAE policy term, entropy bonus, value term) is
       back-propagated and the gradients are clipped.
    4. The gradients are applied to the shared parameters via ``optimizer``.

    Runs until the global ``counter`` exceeds ``args.max_steps``.

    Args:
        rank: worker index, added to ``args.seed``.
        args: parsed hyper-parameters.
        shared_model: ActorCritic shared between processes.
        counter: shared step counter, incremented under ``lock``.
        lock: lock protecting ``counter``.
        optimizer: optimizer over ``shared_model.parameters()``; when None a
            process-local ``optim.Adam`` is created.
    """
    torch.manual_seed(args.seed + rank)

    env = create_atari_env(args.env_name)
    env.seed(args.seed + rank)

    model = ActorCritic(env.observation_space.shape[0], env.action_space)

    if optimizer is None:
        optimizer = optim.Adam(shared_model.parameters(), lr=args.lr)

    state = env.reset()
    state = torch.from_numpy(state)
    done = True
    episode_length = 0
    while True:
        # Pull the latest shared weights before every rollout.
        model.load_state_dict(shared_model.state_dict())
        if done:
            # New episode: reset the LSTM state.
            cx = torch.zeros(1, 256)
            hx = torch.zeros(1, 256)
        else:
            # Same episode: keep the LSTM state but cut the autograd graph so
            # backward only spans the current rollout.
            cx = cx.detach()
            hx = hx.detach()

        values = []
        log_probs = []
        rewards = []
        entropies = []

        for _ in range(args.num_steps):
            episode_length += 1
            value, logit, (hx, cx) = model((state.unsqueeze(0),
                                            (hx, cx)))
            prob = F.softmax(logit, dim=-1)
            log_prob = F.log_softmax(logit, dim=-1)
            # H(p) = - sum_x p(x).log(p(x))
            entropy = -(log_prob * prob).sum(1, keepdim=True)
            # Sample the action from the policy distribution (exploration).
            action = prob.multinomial(num_samples=1).detach()
            log_prob = log_prob.gather(1, action)

            state, reward, done, _ = env.step(action.item())
            done = done or episode_length >= args.max_episode_length
            # Clip rewards to [-1, 1].
            reward = max(min(reward, 1), -1)

            with lock:
                counter.value += 1

            if done:
                episode_length = 0
                state = env.reset()

            state = torch.from_numpy(state)
            values.append(value)
            log_probs.append(log_prob)
            rewards.append(reward)
            entropies.append(entropy)

            if done:
                break

        # Bootstrap the return from the critic unless the episode ended.
        R = torch.zeros(1, 1)
        if not done:
            value, _, _ = model((state.unsqueeze(0), (hx, cx)))
            R = value.detach()
        # values[i + 1] below needs the value of the state after the last step.
        values.append(R)

        policy_loss = 0
        value_loss = 0
        gae = torch.zeros(1, 1)
        for i in reversed(range(len(rewards))):
            R = args.gamma * R + rewards[i]
            advantage = R - values[i]
            value_loss = value_loss + 0.5 * advantage.pow(2)

            # Generalized Advantage Estimation
            delta_t = rewards[i] + args.gamma * \
                values[i + 1] - values[i]
            gae = gae * args.gamma * args.gae_lambda + delta_t

            policy_loss = policy_loss - \
                log_probs[i] * gae.detach() - args.entropy_coef * entropies[i]

        # Clear both the shared and the local gradients so nothing from the
        # previous rollout accumulates, whichever way zero_grad resets them.
        optimizer.zero_grad()
        model.zero_grad()
        (policy_loss + args.value_loss_coef * value_loss).backward()
        # Rescale gradients so their global L2 norm is at most args.max_grad_norm.
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.max_grad_norm)

        ensure_shared_grads(model, shared_model)
        optimizer.step()
        if counter.value > args.max_steps:
            break
    print('training ends')

7. 训练超参数设置

为快速验证训练代码,默认进程数 num-processes 设置为4,最大训练步数max-steps设置为20000。


# Hyper-parameter setup based on https://github.com/pytorch/examples/tree/master/mnist_hogwild
parser = argparse.ArgumentParser(description='A3C')
parser.add_argument('--lr', type=float, default=0.0001,
                    help='learning rate (default: 0.0001)')                         # learning rate
parser.add_argument('--gamma', type=float, default=0.99,
                    help='discount factor for rewards (default: 0.99)')             # reward discount factor
parser.add_argument('--gae-lambda', type=float, default=1.00,
                    help='lambda parameter for GAE (default: 1.00)')                # GAE lambda
parser.add_argument('--entropy-coef', type=float, default=0.01,
                    help='entropy term coefficient (default: 0.01)')                # weight of the entropy bonus
parser.add_argument('--value-loss-coef', type=float, default=0.5,
                    help='value loss coefficient (default: 0.5)')                   # weight of the value loss
parser.add_argument('--max-grad-norm', type=float, default=50,
                    help='clip the global norm of gradients (default: 50)')         # max global gradient norm
parser.add_argument('--seed', type=int, default=1,
                    help='random seed (default: 1)')                                # random seed
parser.add_argument('--num-processes', type=int, default=4,
                    help='how many training processes to use (default: 4)')         # number of training processes
parser.add_argument('--num-steps', type=int, default=20,
                    help='number of forward steps in A3C (default: 20)')             # rollout length per update
parser.add_argument('--max-episode-length', type=int, default=1000000,
                    help='maximum length of an episode (default: 1000000)')          # max steps per episode
parser.add_argument('--env-name', default='PongDeterministic-v4',
                    help='environment to train on (default: PongDeterministic-v4)')  # environment id
# A boolean flag: without store_true, "--no-shared False" would yield a truthy string.
parser.add_argument('--no-shared', action='store_true', default=False,
                    help='use an optimizer without shared momentum.')                # per-process (non-shared) Adam
parser.add_argument('--max-steps', type=int, default=20000,
                    help='number of max steps for training  (default: 20000)')       # total training steps

# Limit each process to a single OpenMP thread so parallel workers do not
# oversubscribe the CPU cores.
os.environ['OMP_NUM_THREADS'] = '1'
# Hide all GPUs: every process in this example runs on the CPU.
os.environ['CUDA_VISIBLE_DEVICES'] = ""

args = parser.parse_args(args=[])

8. 启动多进程训练

# Build a preprocessed environment to read the observation/action spaces.
env = create_atari_env(args.env_name)
# Global model updated by every worker.
shared_model = ActorCritic(env.observation_space.shape[0], env.action_space)
# Put its parameters in shared memory so all processes update the same tensors.
shared_model.share_memory()

# Adam with shared moment buffers, unless --no-shared is given (then each
# worker creates its own optim.Adam inside train()).
if args.no_shared:
    optimizer = None
else:
    optimizer = SharedAdam(shared_model.parameters(), lr=args.lr)
    optimizer.share_memory()

processes = []

counter = mp.Value('i', 0)
lock = mp.Lock()

# One evaluation process; rank = num_processes gives it a seed distinct from the trainers.
p = mp.Process(target=test, args=(args.num_processes, args, shared_model, counter))
p.start()
processes.append(p)

for rank in range(0, args.num_processes):
    p = mp.Process(target=train, args=(rank, args, shared_model, counter, lock, optimizer))
    p.start()
    processes.append(p)
for p in processes:
    p.join()
9. 加载模型进行可视化游戏推理

为展示训练效果,这里加载在 num-processes=16、max-steps=2000000 参数设置下训练得到的预训练模型;在该设置下,智能体约14min即可解决ATARI Pong。

def infer(args, model_path="model/Pong_A3C_pretrained.pth", num_episodes=1):
    """Play greedy episodes with pretrained weights and draw each frame inline.

    Args:
        args: parsed hyper-parameters (uses env_name and max_episode_length).
        model_path: checkpoint to load; the default is the file copied from
            OBS at the top of the notebook. Assumed to hold an ActorCritic
            ``state_dict`` (a pickled whole module is also accepted).
        num_episodes: number of episodes to play.
    """
    env = create_atari_env(args.env_name)

    model = ActorCritic(env.observation_space.shape[0], env.action_space)
    # This notebook hides all GPUs, so map any saved tensors to the CPU.
    checkpoint = torch.load(model_path, map_location='cpu')
    if isinstance(checkpoint, nn.Module):
        checkpoint = checkpoint.state_dict()
    model.load_state_dict(checkpoint)
    model.eval()

    state = env.reset()
    img = plt.imshow(env.render(mode='rgb_array'))
    state = torch.from_numpy(state)
    reward_sum = 0
    done = True

    # Stuck-agent guard: end the episode if the last 100 actions are identical.
    actions = deque(maxlen=100)
    episode_length = 0
    episodes = 0
    while episodes < num_episodes:
        episode_length += 1
        if done:
            cx = torch.zeros(1, 256)
            hx = torch.zeros(1, 256)

        with torch.no_grad():
            value, logit, (hx, cx) = model((state.unsqueeze(0), (hx, cx)))
        prob = F.softmax(logit, dim=-1)
        # Greedy action (no exploration).
        action = prob.max(1, keepdim=True)[1].numpy()

        state, reward, done, _ = env.step(action[0, 0])
        done = done or episode_length >= args.max_episode_length
        reward_sum += reward

        # Redraw the current game frame in place.
        img.set_data(env.render(mode='rgb_array'))
        plt.axis('off')
        display.display(plt.gcf())
        display.clear_output(wait=True)

        actions.append(action[0, 0])
        if actions.count(actions[0]) == actions.maxlen:
            done = True
        if done:
            print("episode reward {}, episode length {}".format(reward_sum, episode_length))
            reward_sum = 0
            episode_length = 0
            actions.clear()
            state = env.reset()
            episodes += 1
        state = torch.from_numpy(state)



10. 作业

  1. 请你调整步骤7中的训练参数,重新训练一个模型,使它在游戏中获得更好的表现
