Nebula Shining: Landing a Lunar Lander with AI Using the A2C Algorithm

Posted by o0龙龙0o on 2021/11/15 19:51:12
[Abstract] This experiment uses the A2C algorithm to play a small lunar-landing game.

0. Foreword

First, a quick note: this experiment uses the A2C (Advantage Actor-Critic) algorithm to play a small lunar-landing game.

https://marketplace.huaweicloud.com/markets/aihub/notebook/detail/?id=e35334ab-11d1-4b5c-bece-955fa48b8044

The link above describes what the experiment covers and how it is carried out.

The basics of the A2C algorithm are shown in the figure below: an Actor network selects actions, a Critic network evaluates them, and the TD error acts as the advantage signal that drives both updates.
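
In formula form, the updates that the code below implements can be summarized as follows (a sketch using the same names as the code):

TD_error = r + GAMMA * V(next_state) - V(cur_state)    # computed by the Critic; also used as the advantage
critic_loss = TD_error ** 2                             # minimized with Adam at learning rate LR_C
actor_loss = -log(pi(action | cur_state)) * TD_error    # minimized with Adam at learning rate LR_A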

1. Practice

The experiment is run on ModelArts; thanks to Huawei for providing the resources.

The experiment requires a GPU, so first switch the notebook to a GPU resource, as shown in the figure below.

The resource switch completes after a short wait.

Run the following code to install the dependencies:

!pip install gym
!conda install swig -y
!pip install box2d-py
!pip install gym[box2d]

After installation, the output looks like the figure below.
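
As a quick sanity check (not part of the original notebook), you can verify that Box2D and the LunarLander environment are available:

import gym

# if box2d-py was not installed correctly, creating the environment raises an error
env = gym.make('LunarLander-v2')
print(env.observation_space.shape, env.action_space.n)  # LunarLander-v2: (8,) and 4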

Import the libraries (note that the code uses TensorFlow 1.x APIs such as tf.placeholder and tf.layers):

import os
import gym
import numpy as np
import tensorflow as tf
import pandas as pd

Define the hyperparameters:

MAX_EPISODE = 100               # maximum number of training episodes
DISPLAY_REWARD_THRESHOLD = 100  # reward threshold for enabling rendering
SAVE_REWARD_THRESHOLD = 100     # reward threshold for saving the model
MAX_EP_STEPS = 2000             # maximum steps per episode
TEST_EPISODE = 10               # number of test episodes
RENDER = False                  # whether to enable rendering (time-consuming)
GAMMA = 0.9                     # reward discount factor in the TD error
RUNNING_REWARD_DECAY = 0.95     # decay factor for the running reward
LR_A = 0.001                    # learning rate of the Actor network
LR_C = 0.01                     # learning rate of the Critic network
NUM_UNITS = 20                  # number of neurons in the FC layer
SEED = 1                        # random seed, to reduce randomness
SAVE_EPISODES = 20              # save the model every this many episodes
model_dir = './models'          # directory for saving models

Create the game environment and build the networks:

def create_env():
    env = gym.make('LunarLander-v2')
    # fix the seed to reduce randomness
    env.seed(SEED)
    env = env.unwrapped
    num_features = env.observation_space.shape[0]
    num_actions = env.action_space.n
    return env, num_features, num_actions

class Actor:
    """
    Actor网络

    Parameters
    ----------
    sess : tensorflow.Session()

    n_features : int
        特征维度

    n_actions : int
        动作空间大小

    lr : float
        学习率大小
    """

    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess
        # state input
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        # chosen action
        self.a = tf.placeholder(tf.int32, None, "action")
        # TD error (provided by the Critic)
        self.td_error = tf.placeholder(tf.float32, None, "td_error")

        # the actor network is two fully connected layers; the output is the action probabilities
        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=NUM_UNITS,
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='l1'
            )

            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,
                activation=tf.nn.softmax,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='acts_prob'
            )

        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            # objective: expected log-probability weighted by the TD error
            self.exp_v = tf.reduce_mean(log_prob * self.td_error)

        with tf.variable_scope('train'):
             # minimize(-exp_v) = maximize(exp_v)
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)

    def learn(self, s, a, td):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a, self.td_error: td}
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
        return exp_v

    # sample an action from the policy's probability distribution
    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})  
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())

class Critic:
    """
    Critic网络

    Parameters
    ----------
    sess : tensorflow.Session()

    n_features : int
        特征维度

    lr : float
        学习率大小
    """

    def __init__(self, sess, n_features, lr=0.01):
        self.sess = sess

        # state input
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        # value of the next state
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        # reward
        self.r = tf.placeholder(tf.float32, None, 'r')

        # the critic network is two fully connected layers; the output is the state value
        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.s,
                # number of hidden units
                units=NUM_UNITS,
                activation=tf.nn.relu, 
                kernel_initializer=tf.random_normal_initializer(0., .1),  
                bias_initializer=tf.constant_initializer(0.1),  
                name='l1'
            )

            self.v = tf.layers.dense(
                inputs=l1,
                 # output units
                units=1,
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  
                bias_initializer=tf.constant_initializer(0.1), 
                name='V'
            )

        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r + GAMMA * self.v_ - self.v
            # TD_error = (r+gamma*V_next) - V_eval
            self.loss = tf.square(self.td_error)
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

    def learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]

        v_ = self.sess.run(self.v, {self.s: s_})
        td_error, _ = self.sess.run([self.td_error, self.train_op],
                                    {self.s: s, self.v_: v_, self.r: r})
        return td_error

Create the training function:

def model_train():
    env, num_features, num_actions = create_env()
    render = RENDER
    sess = tf.Session()
    actor = Actor(sess, n_features=num_features, n_actions=num_actions, lr=LR_A)
    critic = Critic(sess, n_features=num_features, lr=LR_C)

    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()

    for i_episode in range(MAX_EPISODE+1):
        cur_state = env.reset()
        cur_step = 0
        track_r = []
        while True:
            # rendering of this game is not currently supported in the notebook
            # if RENDER:
            # env.render()
            action = actor.choose_action(cur_state)
            next_state, reward, done, info = env.step(action)
            track_r.append(reward)
            # gradient = grad[reward + gamma * V(next_state) - V(cur_state)]
            td_error = critic.learn(cur_state, reward,
                                    next_state)
            # true_gradient = grad[logPi(cur_state,action) * td_error]
            actor.learn(cur_state, action, td_error)  
            cur_state = next_state
            cur_step += 1
            if done or cur_step >= MAX_EP_STEPS:
                ep_rs_sum = sum(track_r)

                if 'running_reward' not in locals():
                    running_reward = ep_rs_sum
                else:
                    running_reward = running_reward * RUNNING_REWARD_DECAY + ep_rs_sum * (1-RUNNING_REWARD_DECAY)
                # check whether the reward threshold for enabling rendering has been reached
                # if running_reward > DISPLAY_REWARD_THRESHOLD:
                #     render = True
                print("episode:", i_episode, "  reward:", int(running_reward), "  steps:", cur_step)
                break
        if i_episode > 0 and i_episode % SAVE_EPISODES == 0:
            if not os.path.exists(model_dir):
                os.mkdir(model_dir)
            ckpt_path = os.path.join(model_dir, '{}_model.ckpt'.format(i_episode))
            saver.save(sess, ckpt_path)

Start training:

print('MAX_EPISODE:', MAX_EPISODE)
model_train()
# reset graph
tf.reset_default_graph()

Training takes about 2 minutes, so please wait a moment; this step is important.

When it finishes, you can see that the run reaches the maximum number of episodes we specified.
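
To confirm that checkpoints were written, you can list the model directory (a quick check, not part of the original notebook):

import os
print(os.listdir(model_dir))  # expect a 'checkpoint' file plus files like 20_model.ckpt.index / .meta / .data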

Finally, run model inference. For visualization, uncomment the env.render() call below.

def model_test():
    env, num_features, num_actions = create_env()
    sess = tf.Session()
    actor = Actor(sess, n_features=num_features, n_actions=num_actions, lr=LR_A)
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess, tf.train.latest_checkpoint(model_dir))
    for i_episode in range(TEST_EPISODE):
        cur_state = env.reset()
        cur_step = 0
        track_r = []
        while True:
            # visualization
            # env.render()
            action = actor.choose_action(cur_state)
            next_state, reward, done, info = env.step(action)
            track_r.append(reward)
            cur_state = next_state
            cur_step += 1

            if done or cur_step >= MAX_EP_STEPS:
                ep_rs_sum = sum(track_r)
                print("episode:", i_episode, "  reward:", int(ep_rs_sum), "  steps:", cur_step)
                break
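
To actually run the test, call the function after training has produced checkpoints in model_dir (a minimal sketch; in the notebook this goes in its own cell):

model_test()
# reset the default graph, as was done after training
tf.reset_default_graph()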

In the end we can see the landing animation.

To get the project ID needed for encrypting the code, we can use the !env | grep PROJECT_ID command.

Use the "+" button to add this command as a new cell and run it; the (encrypted) project ID is shown, as in the figure below:

Finally, be sure to release the resources when you are done; otherwise other users cannot use them.

This project is a good way to learn both the A2C model and the general AI training workflow, and it is very helpful for beginners.
