Nebula Shines: AI Lands a Lunar Lander with the A2C Algorithm
[Abstract] This experiment uses the A2C algorithm to play a small lunar-landing game.
0 Foreword
First, a quick overview: this experiment uses the A2C algorithm to play a small lunar-landing game.
https://marketplace.huaweicloud.com/markets/aihub/notebook/detail/?id=e35334ab-11d1-4b5c-bece-955fa48b8044
The page at the link above describes what the experiment covers and how it proceeds.
The basic idea of the A2C (Advantage Actor-Critic) algorithm is shown in the figure below: an actor network learns the policy while a critic network learns a value estimate that guides the actor's updates.
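In short, the critic learns a state-value estimate V(s) and the actor learns a policy pi(a|s); the one-step TD error r + GAMMA * V(s') - V(s) plays the role of the advantage and drives both updates. A minimal NumPy sketch of the two update signals, with made-up numbers purely for illustration:
import numpy as np

gamma = 0.9                   # discount factor
r = 1.0                       # reward after taking action a in state s
v_s, v_s_next = 0.5, 0.8      # critic's estimates V(s) and V(s')

td_error = r + gamma * v_s_next - v_s    # advantage estimate, here 1.22
critic_loss = td_error ** 2              # the critic minimizes the squared TD error

pi_a = 0.25                              # actor's probability of the chosen action
actor_loss = -np.log(pi_a) * td_error    # the actor minimizes -log pi(a|s) * advantage
print(td_error, critic_loss, actor_loss)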
1. Practice
The experiment is run on ModelArts; thanks to Huawei for providing the resources.
The experiment needs a GPU, so first switch the notebook's compute resource, as shown in the figure below.
The switch completes after a short wait.
Then run the dependency installation code:
!pip install gym
!conda install swig -y
!pip install box2d-py
!pip install gym[box2d]
After the installation finishes you will see output like the figure below:
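Before moving on, you can optionally verify that gym and Box2D installed correctly; a minimal check (the exact version string depends on your environment):
import gym
env = gym.make('LunarLander-v2')   # raises an error if the Box2D dependency is missing
print(gym.__version__, env.observation_space, env.action_space)
env.close()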
Import the libraries:
import os
import gym
import numpy as np
import tensorflow as tf
import pandas as pd
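The code below uses TensorFlow 1.x APIs (tf.placeholder, tf.layers, tf.Session). If your notebook image ships TensorFlow 2.x instead, it can usually still run through the v1 compatibility module; a hedged sketch, only needed in that case:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()   # restores TF1-style graphs, placeholders and sessions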
Define the parameters:
MAX_EPISODE = 100              # maximum number of training episodes
DISPLAY_REWARD_THRESHOLD = 100 # reward threshold for enabling rendering
SAVE_REWARD_THRESHOLD = 100    # reward threshold for saving the model
MAX_EP_STEPS = 2000            # maximum number of steps per episode
TEST_EPISODE = 10              # number of test episodes
RENDER = False                 # whether to enable rendering (time-consuming)
GAMMA = 0.9                    # reward discount factor in the TD error
RUNNING_REWARD_DECAY = 0.95    # decay factor of the running reward
LR_A = 0.001                   # learning rate of the Actor network
LR_C = 0.01                    # learning rate of the Critic network
NUM_UNITS = 20                 # number of neurons in the fully connected layers
SEED = 1                       # random seed to reduce randomness
SAVE_EPISODES = 20             # save the model every this many episodes
model_dir = './models'         # directory for saving models
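SEED is only applied to the environment inside create_env() below; if you also want the action sampling and weight initialization to be repeatable, you can additionally seed NumPy and TensorFlow (an optional addition, not part of the original notebook):
np.random.seed(SEED)       # makes the sampling in choose_action repeatable
tf.set_random_seed(SEED)   # makes the network weight initialization repeatable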
Create the game environment and build the networks
def create_env():
    env = gym.make('LunarLander-v2')
    # reduce randomness
    env.seed(SEED)
    env = env.unwrapped
    num_features = env.observation_space.shape[0]
    num_actions = env.action_space.n
    return env, num_features, num_actions
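For LunarLander-v2 the observation is an 8-dimensional state vector and there are 4 discrete actions, so a quick sanity check should print 8 and 4:
env, num_features, num_actions = create_env()
print(num_features, num_actions)   # expected: 8 4
env.close()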
class Actor:
    """
    Actor network

    Parameters
    ----------
    sess : tensorflow.Session()
    n_features : int
        Dimension of the state features
    n_actions : int
        Size of the action space
    lr : float
        Learning rate
    """
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess
        # state input
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        # chosen action
        self.a = tf.placeholder(tf.int32, None, "action")
        # TD error
        self.td_error = tf.placeholder(tf.float32, None, "td_error")
        # the actor is two fully connected layers; the output is the action probabilities
        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=NUM_UNITS,
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='l1'
            )
            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,
                activation=tf.nn.softmax,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='acts_prob'
            )
        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            # loss function: expected value to maximize
            self.exp_v = tf.reduce_mean(log_prob * self.td_error)
        with tf.variable_scope('train'):
            # minimize(-exp_v) = maximize(exp_v)
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)

    def learn(self, s, a, td):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a, self.td_error: td}
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
        return exp_v

    # sample an action from the policy
    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())
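Note that choose_action samples from the softmax probabilities instead of always taking the argmax, which keeps some exploration during training. A standalone NumPy illustration of that sampling step, with made-up probabilities:
import numpy as np

probs = np.array([[0.1, 0.6, 0.2, 0.1]])   # example policy output over 4 actions
action = np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())
print(action)   # most often 1, but the other actions can still be chosen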
class Critic:
    """
    Critic network

    Parameters
    ----------
    sess : tensorflow.Session()
    n_features : int
        Dimension of the state features
    lr : float
        Learning rate
    """
    def __init__(self, sess, n_features, lr=0.01):
        self.sess = sess
        # state input
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        # value of the next state
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        # reward
        self.r = tf.placeholder(tf.float32, None, 'r')
        # the critic is two fully connected layers; the output is the state value
        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.s,
                # number of hidden units
                units=NUM_UNITS,
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='l1'
            )
            self.v = tf.layers.dense(
                inputs=l1,
                # output units
                units=1,
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),
                bias_initializer=tf.constant_initializer(0.1),
                name='V'
            )
        with tf.variable_scope('squared_TD_error'):
            # TD_error = (r + gamma * V_next) - V_eval
            self.td_error = self.r + GAMMA * self.v_ - self.v
            self.loss = tf.square(self.td_error)
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

    def learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
        v_ = self.sess.run(self.v, {self.s: s_})
        td_error, _ = self.sess.run([self.td_error, self.train_op],
                                    {self.s: s, self.v_: v_, self.r: r})
        return td_error
Create the training function
def model_train():
    env, num_features, num_actions = create_env()
    render = RENDER
    sess = tf.Session()
    actor = Actor(sess, n_features=num_features, n_actions=num_actions, lr=LR_A)
    critic = Critic(sess, n_features=num_features, lr=LR_C)
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    for i_episode in range(MAX_EPISODE+1):
        cur_state = env.reset()
        cur_step = 0
        track_r = []
        while True:
            # rendering of this game is not yet supported in the notebook
            # if RENDER:
            #     env.render()
            action = actor.choose_action(cur_state)
            next_state, reward, done, info = env.step(action)
            track_r.append(reward)
            # gradient = grad[reward + gamma * V(next_state) - V(cur_state)]
            td_error = critic.learn(cur_state, reward, next_state)
            # true_gradient = grad[logPi(cur_state, action) * td_error]
            actor.learn(cur_state, action, td_error)
            cur_state = next_state
            cur_step += 1
            if done or cur_step >= MAX_EP_STEPS:
                ep_rs_sum = sum(track_r)
                if 'running_reward' not in locals():
                    running_reward = ep_rs_sum
                else:
                    running_reward = running_reward * RUNNING_REWARD_DECAY + ep_rs_sum * (1 - RUNNING_REWARD_DECAY)
                # check whether the reward threshold for rendering has been reached
                # if running_reward > DISPLAY_REWARD_THRESHOLD:
                #     render = True
                print("episode:", i_episode, " reward:", int(running_reward), " steps:", cur_step)
                break
        if i_episode > 0 and i_episode % SAVE_EPISODES == 0:
            if not os.path.exists(model_dir):
                os.mkdir(model_dir)
            ckpt_path = os.path.join(model_dir, '{}_model.ckpt'.format(i_episode))
            saver.save(sess, ckpt_path)
Start training
print('MAX_EPISODE:', MAX_EPISODE)
model_train()
# reset graph
tf.reset_default_graph()
Training takes about two minutes, so please wait a moment; this step is important.
When it finishes, you can see that training ran for the maximum number of episodes we specified.
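Because SAVE_EPISODES = 20, a checkpoint is written to ./models every 20 episodes; you can list them after training (the file names below are illustrative):
print(sorted(os.listdir(model_dir)))   # e.g. '20_model.ckpt.index', '40_model.ckpt.meta', 'checkpoint', ...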
Finally, run model inference. To visualize the landing, enable the env.render() call here.
def model_test():
    env, num_features, num_actions = create_env()
    sess = tf.Session()
    actor = Actor(sess, n_features=num_features, n_actions=num_actions, lr=LR_A)
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess, tf.train.latest_checkpoint(model_dir))
    for i_episode in range(TEST_EPISODE):
        cur_state = env.reset()
        cur_step = 0
        track_r = []
        while True:
            # visualization
            # env.render()
            action = actor.choose_action(cur_state)
            next_state, reward, done, info = env.step(action)
            track_r.append(reward)
            cur_state = next_state
            cur_step += 1
            if done or cur_step >= MAX_EP_STEPS:
                ep_rs_sum = sum(track_r)
                print("episode:", i_episode, " reward:", int(ep_rs_sum), " steps:", cur_step)
                break
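Running the test mirrors the training call above; a minimal sketch, assuming the training cell (which ends with tf.reset_default_graph()) has already been executed:
print('TEST_EPISODE:', TEST_EPISODE)
model_test()
# reset graph
tf.reset_default_graph()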
At the end we can see the animation of the lander.
To view the project ID (it is shown encrypted), we can use the command !env | grep PROJECT_ID.
Use the "+" button to add this command as a new cell and run it; the encrypted ID appears in the output, as shown in the figure below:
Finally, be sure to release the resources when you are done, otherwise other users cannot use them.
This project is a good way to learn both the A2C model and the overall AI training workflow, and it is very helpful for AI beginners.