Training a Chinese Chess AI with the AlphaZero Reinforcement Learning Algorithm
Case Objectives
Through studying this case and completing the follow-up exercises, you will:
- Understand the AlphaZero reinforcement learning algorithm;
- Use the AlphaZero algorithm to run a Chinese chess AI training session.
You can also share the ipynb study notes for this case in the AI Gallery Notebook section to earn growth points; see the linked document for how to share.
Case Overview
AlphaZero is a reinforcement learning algorithm. AIs trained with AlphaZero have recently beaten multiple Go and chess champions by a decisive margin. AlphaZero's key innovation is that it reaches superhuman performance in board games without relying on any external prior knowledge (also called expert knowledge), knowing nothing but the rules of the game.
This case walks through the core principles of AlphaZero, including building the neural network, MCTS search, and self-play training, using code to deepen understanding of the algorithm; for full details see the paper "Mastering the game of Go without human knowledge". The case also provides a Chinese chess reinforcement learning environment, runs one AlphaZero training session on Chinese chess, and finally visualizes a self-play game of the resulting AI.
Because training a strong Chinese chess AI takes a large amount of time and compute, this case focuses on understanding the algorithm: the training procedure is simplified, with fewer self-play games and fewer MCTS simulations. To fully train a Chinese chess AlphaZero AI, subscribe to the "CChess" algorithm in AI Gallery and train it on ModelArts.
Notes
- This case runs on TensorFlow-1.13.1 and a GPU is recommended; see the "ModelArts JupyterLab Hardware Specification Guide" for how to switch hardware specifications;
- If this is your first time using JupyterLab, see the "ModelArts JupyterLab User Guide";
- If you hit errors while using JupyterLab, see the "ModelArts JupyterLab FAQ" for possible solutions;
- Run each of the code blocks below step by step.
1. Prepare the Environment
Step 1: Install dependencies
!pip install tornado==6.1.0
!pip install tflearn==0.3.2
!pip install tqdm
!pip install urllib3==1.22
!pip install threadpool==1.3.2
!pip install xmltodict==0.12.0
!pip install requests
!pip install pandas==0.19.2
!pip install numpy==1.14.5
!pip install scipy==1.1.0
!pip install matplotlib==2.0.0
!pip install nest_asyncio
!pip install gast==0.2.2
Step 2: Download the dependency package
import os
import moxing as mox
if not os.path.exists('cchess_training'):
mox.file.copy("obs://modelarts-labs-bj4/course/modelarts/reinforcement_learning/cchess_gameplay/cchess_training/cchess_training.zip", "cchess_training.zip")
os.system('unzip cchess_training.zip')
Step 3: Import the required libraries
%matplotlib notebook
%matplotlib auto
import os
import sys
import logging
import subprocess
import copy
import random
import json
import asyncio
import time
import numpy as np
import tensorflow as tf
from multiprocessing import Process
from cchess_training.cchess_zero import board_visualizer
from cchess_training.gameplays import players, gameplay
from cchess_training.config import conf
from cchess_training.common.board import create_uci_labels
from cchess_training.cchess_training_model_update import model_update
from cchess_training.cchess_zero.gameboard import GameBoard
from cchess_training.cchess_zero import cbf
from cchess_training.utils import get_latest_weight_path
import nest_asyncio
nest_asyncio.apply()
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
logging.basicConfig(level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(message)s]",
datefmt='%Y-%m-%d %H:%M:%S'
)
2. Build the Neural Network
This implements the AlphaZero neural network on top of a ResNet. The network input is a 0-1 image converted from the current board position, of shape [10, 9, 14]: [10, 9] is the size of the Chinese chess board, and each of the 14 planes corresponds to one piece type, 7 types for our side (soldier, cannon, chariot, horse, elephant, advisor, general) and 7 for the opponent, 14 planes in total. After the ResNet extracts features, the network splits into two heads: a value head, which outputs the value of the current position, and a policy head, which outputs the probability the network assigns to each move.
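To make the input encoding concrete, the short sketch below converts the FEN-like board string used later in this case (uppercase letters for one side, lowercase for the other, digits for empty squares) into [10, 9, 14] 0-1 planes. It is only an illustration of the idea described above; the boardstr_to_planes helper and its plane ordering are assumptions, not the exact encoding used inside cchess_training.
import numpy as np
PIECE_TYPES = 'RNBAKCP'  # chariot, horse, elephant, advisor, general, cannon, soldier
def boardstr_to_planes(statestr):
    """Turn a board string of 10 '/'-separated ranks into [10, 9, 14] one-hot planes:
    planes 0-6 hold the uppercase side's pieces, planes 7-13 the lowercase side's."""
    planes = np.zeros((10, 9, 14), dtype=np.float32)
    for row, rank in enumerate(statestr.split('/')):
        col = 0
        for ch in rank:
            if ch.isdigit():                 # a digit means that many empty squares
                col += int(ch)
            else:
                idx = PIECE_TYPES.index(ch.upper())
                if ch.islower():             # lowercase pieces use the second set of 7 planes
                    idx += 7
                planes[row, col, idx] = 1.0
                col += 1
    return planes
print(boardstr_to_planes('RNBAKABNR/9/1C5C1/P1P1P1P1P/9/9/p1p1p1p1p/1c5c1/9/rnbakabnr').shape)  # (10, 9, 14)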
# resnet
def res_block(inputx, name, training, block_num=2, filters=256, kernel_size=(3, 3)):
net = inputx
for i in range(block_num):
net = tf.layers.conv2d(
net,
filters=filters,
kernel_size=kernel_size,
activation=None,
name="{}_res_conv{}".format(name, i),
padding='same'
)
net = tf.layers.batch_normalization(net, training=training, name="{}_res_bn{}".format(name, i))
if i == block_num - 1:
net = net + inputx
net = tf.nn.elu(net, name="{}_res_elu{}".format(name, i))
return net
def conv_block(inputx, name, training, block_num=1, filters=2, kernel_size=(1, 1)):
net = inputx
for i in range(block_num):
net = tf.layers.conv2d(
net,
filters=filters,
kernel_size=kernel_size,
activation=None,
name="{}_convblock_conv{}".format(name, i),
padding='same'
)
net = tf.layers.batch_normalization(net, training=training, name="{}_convblock_bn{}".format(name, i))
net = tf.nn.elu(net, name="{}_convblock_elu{}".format(name, i))
# net shape [None,10,9,2]
netshape = net.get_shape().as_list()
net = tf.reshape(net, shape=(-1, netshape[1] * netshape[2] * netshape[3]))
net = tf.layers.dense(net, 10 * 9, name="{}_dense".format(name))
net = tf.nn.elu(net, name="{}_elu".format(name))
return net
def res_net_board(inputx, name, training, filters=256, num_res_layers=4):
net = inputx
net = tf.layers.conv2d(
net,
filters=filters,
kernel_size=(3, 3),
activation=None,
name="{}_res_convb".format(name),
padding='same'
)
net = tf.layers.batch_normalization(net, training=training, name="{}_res_bnb".format(name))
net = tf.nn.elu(net, name="{}_res_elub".format(name))
for i in range(num_res_layers):
net = res_block(net, name="{}_layer_{}".format(name, i + 1), training=training, filters=filters)
return net
def get_scatter(name):
with tf.variable_scope("Test"):
ph = tf.placeholder(tf.float32, name=name)
op = tf.summary.scalar(name, ph)
return ph, op
def average_gradients(tower_grads):
"""Calculate the average gradient for each shared variable across all towers.
Note that this function provides a synchronization point across all towers.
Args:
tower_grads: List of lists of (gradient, variable) tuples. The outer list
is over individual gradients. The inner list is over the gradient
calculation for each tower.
Returns:
List of pairs of (gradient, variable) where the gradient has been averaged
across all towers.
"""
average_grads = []
for grad_and_vars in zip(*tower_grads):
# Note that each grad_and_vars looks like the following:
# ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
grads = []
for g, _ in grad_and_vars:
# Add 0 dimension to the gradients to represent the tower.
expanded_g = tf.expand_dims(g, 0)
# Append on a 'tower' dimension which we will average over below.
grads.append(expanded_g)
# Average over the 'tower' dimension.
grad = tf.concat(grads, 0)
grad = tf.reduce_mean(grad, 0)
# Keep in mind that the Variables are redundant because they are shared
# across towers. So .. we will just return the first tower's pointer to
# the Variable.
v = grad_and_vars[0][1]
grad_and_var = (grad, v)
average_grads.append(grad_and_var)
return average_grads
def add_grad_to_list(opt, train_param, loss, tower_grad):
grads = opt.compute_gradients(loss, var_list=train_param)
grads = [i[0] for i in grads]
tower_grad.append(zip(grads, train_param))
def get_op_mul(tower_gradients, optimizer, gs):
grads = average_gradients(tower_gradients)
train_op = optimizer.apply_gradients(grads, gs)
return train_op
def reduce_mean(x):
return tf.reduce_mean(x)
def merge(x):
return tf.concat(x, axis=0)
def get_model_resnet(
model_name,
labels,
gpu_core=[0],
batch_size=512,
num_res_layers=4,
filters=256,
extra=False,
extrav2=False
):
tf.reset_default_graph()
graph = tf.Graph()
with graph.as_default():
x_input = tf.placeholder(tf.float32, [None, 10, 9, 14])
nextmove = tf.placeholder(tf.float32, [None, len(labels)])
score = tf.placeholder(tf.float32, [None, 1])
training = tf.placeholder(tf.bool, name='training_mode')
learning_rate = tf.placeholder(tf.float32)
global_step = tf.train.get_or_create_global_step()
optimizer_policy = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
optimizer_value = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
optimizer_multitarg = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
tower_gradients_policy, tower_gradients_value, tower_gradients_multitarg = [], [], []
net_softmax_collection = []
value_head_collection = []
multitarget_loss_collection = []
value_loss_collection = []
policy_loss_collection = []
accuracy_select_collection = []
with tf.variable_scope(tf.get_variable_scope()) as vscope:
for ind, one_core in enumerate(gpu_core):
if one_core is not None:
devicestr = "/gpu:{}".format(one_core) if one_core is not None else ""
else:
devicestr = '/cpu:0'
with tf.device(devicestr):
body = res_net_board(
x_input[ind * (batch_size // len(gpu_core)):(ind + 1) * (batch_size // len(gpu_core))],
"selectnet",
training=training,
filters=filters,
num_res_layers=num_res_layers
)
with tf.variable_scope("policy_head"):
policy_head = tf.layers.conv2d(body, 2, 1, padding='SAME')
policy_head = tf.contrib.layers.batch_norm(
policy_head,
center=False,
epsilon=1e-5,
fused=True,
is_training=training,
activation_fn=tf.nn.relu
)
policy_head = tf.reshape(policy_head, [-1, 9 * 10 * 2])
policy_head = tf.contrib.layers.fully_connected(policy_head, len(labels), activation_fn=None)
                    # value head
with tf.variable_scope("value_head"):
value_head = tf.layers.conv2d(body, 1, 1, padding='SAME')
value_head = tf.contrib.layers.batch_norm(
value_head,
center=False,
epsilon=1e-5,
fused=True,
is_training=training,
activation_fn=tf.nn.relu
)
value_head = tf.reshape(value_head, [-1, 9 * 10 * 1])
value_head = tf.contrib.layers.fully_connected(value_head, 256, activation_fn=tf.nn.relu)
value_head = tf.contrib.layers.fully_connected(value_head, 1, activation_fn=tf.nn.tanh)
value_head_collection.append(value_head)
net_unsoftmax = policy_head
with tf.variable_scope("Loss"):
policy_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
labels=nextmove[ind * (batch_size // len(gpu_core)):
(ind + 1) * (batch_size // len(gpu_core))],
logits=net_unsoftmax))
value_loss = tf.losses.mean_squared_error(
labels=score[ind * (batch_size // len(gpu_core)):(ind + 1) * (batch_size // len(gpu_core))],
predictions=value_head)
value_loss = tf.reduce_mean(value_loss)
regularizer = tf.contrib.layers.l2_regularizer(scale=1e-5)
regular_variables = tf.trainable_variables()
l2_loss = tf.contrib.layers.apply_regularization(regularizer, regular_variables)
multitarget_loss = value_loss + policy_loss + l2_loss
multitarget_loss_collection.append(multitarget_loss)
value_loss_collection.append(value_loss)
policy_loss_collection.append(policy_loss)
net_softmax = tf.nn.softmax(net_unsoftmax)
net_softmax_collection.append(net_softmax)
correct_prediction = tf.equal(tf.argmax(nextmove, 1), tf.argmax(net_softmax, 1))
with tf.variable_scope("Accuracy"):
accuracy_select = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
accuracy_select_collection.append(accuracy_select)
tf.get_variable_scope().reuse_variables()
trainable_params = tf.trainable_variables()
tp_policy = [i for i in trainable_params if ('value_head' not in i.name)]
tp_value = [i for i in trainable_params if ('policy_head' not in i.name)]
add_grad_to_list(optimizer_policy, tp_policy, policy_loss, tower_gradients_policy)
add_grad_to_list(optimizer_value, tp_value, value_loss, tower_gradients_value)
add_grad_to_list(optimizer_multitarg, trainable_params, multitarget_loss, tower_gradients_multitarg)
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
train_op_policy = get_op_mul(tower_gradients_policy, optimizer_policy, global_step)
train_op_value = get_op_mul(tower_gradients_value, optimizer_value, global_step)
train_op_multitarg = get_op_mul(tower_gradients_multitarg, optimizer_multitarg, global_step)
net_softmax = merge(net_softmax_collection)
value_head = merge(value_head_collection)
multitarget_loss = reduce_mean(multitarget_loss_collection)
value_loss = reduce_mean(value_loss_collection)
policy_loss = reduce_mean(policy_loss_collection)
accuracy_select = reduce_mean(accuracy_select_collection)
with graph.as_default():
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.allow_soft_placement = True
sess = tf.Session(config=config)
if model_name is not None:
with graph.as_default():
saver = tf.train.Saver(var_list=tf.global_variables())
saver.restore(sess, model_name)
else:
with graph.as_default():
sess.run(tf.global_variables_initializer())
return (sess, graph), ((x_input, training), (net_softmax, value_head))
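As a quick sanity check of the interface above (a minimal sketch; running it is optional and assumes the cells above have been executed), the handles returned by get_model_resnet can be used to query the policy and value heads directly:
import numpy as np
labels = create_uci_labels()
(sess, graph), ((x_input, training), (net_softmax, value_head)) = get_model_resnet(
    None,                # no checkpoint given, so weights are randomly initialized
    labels,
    gpu_core=[None],     # None selects /cpu:0, which is enough for a shape check
    filters=conf.network_filters,
    num_res_layers=conf.network_layers,
)
dummy = np.zeros((1, 10, 9, 14), dtype=np.float32)   # a single all-empty board encoding
policy, value = sess.run([net_softmax, value_head],
                         feed_dict={x_input: dummy, training: False})
print(policy.shape, value.shape)   # (1, len(labels)) and (1, 1)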
3. Implement MCTS
AlphaZero uses MCTS to generate games through self-play. The MCTS search works roughly as follows (the formulas after this list state the same rules compactly):
- Each simulation traverses the tree by selecting, at every step, the edge with the maximum action value Q plus an upper confidence bound U that depends on the stored prior probability P and the edge's visit count N (incremented on every visit);
- The leaf node is then expanded: the position s is evaluated by the neural network, and the prior vector P is stored on the edges created by the expansion;
- The action value Q is updated to the mean of all evaluations V in the subtree below that edge;
- Once the MCTS search completes, it returns the move probabilities π for position s.
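In formulas (the standard AlphaZero/PUCT expressions, matching the get_value and get_move_probs code below), each step of a simulation picks the action
$$a^{*} = \arg\max_a \bigl( Q(s,a) + U(s,a) \bigr), \qquad U(s,a) = c_{\mathrm{puct}} \, P(s,a) \, \frac{\sqrt{N(s)}}{1 + N(s,a)},$$
where N(s) is the parent's visit count, and once all simulations finish the returned move probabilities are
$$\pi(a \mid s) = \frac{N(s,a)^{1/\tau}}{\sum_b N(s,b)^{1/\tau}},$$
with the temperature τ (temp in the code) controlling the level of exploration.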
def softmax(x):
probs = np.exp(x - np.max(x))
probs /= np.sum(probs)
return probs
class TreeNode(object):
"""A node in the MCTS tree.
Each node keeps track of its own value Q, prior probability P, and
its visit-count-adjusted prior score u.
"""
def __init__(self, parent, prior_p, noise=False):
self._parent = parent
self._children = {} # a map from action to TreeNode
self._n_visits = 0
self._Q = 0
self._u = 0
self._P = prior_p
self.virtual_loss = 0
self.noise = noise
def expand(self, action_priors):
"""Expand tree by creating new children.
action_priors: a list of tuples of actions and their prior probability
according to the policy function.
"""
        # Dirichlet noise is instead mixed in at every select() call (see select below), so this expansion-time branch is disabled
if False and self.noise is True and self._parent is None:
noise_d = np.random.dirichlet([0.3] * len(action_priors))
for (action, prob), one_noise in zip(action_priors, noise_d):
if action not in self._children:
prob = (1 - 0.25) * prob + 0.25 * one_noise
self._children[action] = TreeNode(self, prob, noise=self.noise)
else:
for action, prob in action_priors:
if action not in self._children:
self._children[action] = TreeNode(self, prob)
def select(self, c_puct):
"""Select action among children that gives maximum action value Q
plus bonus u(P).
Return: A tuple of (action, next_node)
"""
if self.noise is False:
return max(self._children.items(), key=lambda act_node: act_node[1].get_value(c_puct))
elif self.noise is True and self._parent is not None:
return max(self._children.items(), key=lambda act_node: act_node[1].get_value(c_puct))
else:
noise_d = np.random.dirichlet([0.3] * len(self._children))
return max(list(zip(noise_d, self._children.items())),
key=lambda act_node: act_node[1][1].get_value(c_puct, noise_p=act_node[0]))[1]
def update(self, leaf_value):
"""Update node values from leaf evaluation.
leaf_value: the value of subtree evaluation from the current player's
perspective.
"""
# Count visit.
self._n_visits += 1
# Update Q, a running average of values for all visits.
self._Q += 1.0 * (leaf_value - self._Q) / self._n_visits
def update_recursive(self, leaf_value):
"""Like a call to update(), but applied recursively for all ancestors.
"""
# If it is not root, this node's parent should be updated first.
if self._parent:
self._parent.update_recursive(-leaf_value)
self.update(leaf_value)
def get_value(self, c_puct, noise_p=None):
"""Calculate and return the value for this node.
It is a combination of leaf evaluations Q, and this node's prior
adjusted for its visit count, u.
c_puct: a number in (0, inf) controlling the relative impact of
value Q, and prior probability P, on this node's score.
"""
if noise_p is None:
self._u = (c_puct * self._P *
np.sqrt(self._parent._n_visits) / (1 + self._n_visits))
return self._Q + self._u + self.virtual_loss
else:
self._u = (c_puct * (self._P * 0.75 + noise_p * 0.25) *
np.sqrt(self._parent._n_visits) / (1 + self._n_visits))
return self._Q + self._u + self.virtual_loss
def is_leaf(self):
"""Check if leaf node (i.e. no nodes below this have been expanded)."""
return self._children == {}
def is_root(self):
return self._parent is None
class MCTS(object):
"""An implementation of Monte Carlo Tree Search."""
def __init__(
self,
policy_value_fn,
c_puct=5,
n_playout=10000,
search_threads=32,
virtual_loss=3,
policy_loop_arg=False,
dnoise=False,
play=False
):
"""
policy_value_fn: a function that takes in a board state and outputs
a list of (action, probability) tuples and also a score in [-1, 1]
(i.e. the expected value of the end game score from the current
player's perspective) for the current player.
c_puct: a number in (0, inf) that controls how quickly exploration
converges to the maximum-value policy. A higher value means
relying on the prior more.
"""
self._root = TreeNode(None, 1.0, noise=dnoise)
self._policy = policy_value_fn
self._c_puct = c_puct
self._n_playout = n_playout
self.virtual_loss = virtual_loss
self.loop = asyncio.get_event_loop()
self.policy_loop_arg = policy_loop_arg
self.sem = asyncio.Semaphore(search_threads)
self.now_expanding = set()
self.select_time = 0
self.policy_time = 0
self.update_time = 0
self.num_proceed = 0
self.dnoise = dnoise
self.play = play
async def _playout(self, state):
"""Run a single playout from the root to the leaf, getting a value at
the leaf and propagating it back through its parents.
State is modified in-place, so a copy must be provided.
"""
with await self.sem:
node = self._root
road = []
while 1:
while node in self.now_expanding:
await asyncio.sleep(1e-4)
start = time.time()
if node.is_leaf():
break
# Greedily select next move.
action, node = node.select(self._c_puct)
road.append(node)
node.virtual_loss -= self.virtual_loss
state.do_move(action)
self.select_time += (time.time() - start)
            # at a leaf node, if the position is a long (perpetual) check or chase, cut off the node
if state.should_cutoff() and not self.play:
# cut off node
for one_node in road:
one_node.virtual_loss += self.virtual_loss
                # we do not back up values along this branch; the resulting accuracy loss should be small
                # set virtual loss to -inf so that other search coroutines will not
                # visit this node again (i.e. the node is cut off)
node.virtual_loss = - np.inf
self.update_time += (time.time() - start)
# however the proceed number still goes up 1
self.num_proceed += 1
return
start = time.time()
self.now_expanding.add(node)
# Evaluate the leaf using a network which outputs a list of
# (action, probability) tuples p and also a score v in [-1, 1]
# for the current player
if self.policy_loop_arg is False:
action_probs, leaf_value = await self._policy(state)
else:
action_probs, leaf_value = await self._policy(state, self.loop)
self.policy_time += (time.time() - start)
start = time.time()
# Check for end of game.
end, winner = state.game_end()
if not end:
node.expand(action_probs)
else:
                # for an end state, return the "true" leaf_value
if winner == -1: # tie
leaf_value = 0.0
else:
leaf_value = (
1.0 if winner == state.get_current_player() else -1.0
)
# Update value and visit count of nodes in this traversal.
for one_node in road:
one_node.virtual_loss += self.virtual_loss
node.update_recursive(-leaf_value)
self.now_expanding.remove(node)
# node.update_recursive(leaf_value)
self.update_time += (time.time() - start)
self.num_proceed += 1
def get_move_probs(self, state, temp=1e-3, predict_workers=[], can_apply_dnoise=False, verbose=False,
infer_mode=False):
"""Run all playouts sequentially and return the available actions and
their corresponding probabilities.
state: the current game state
temp: temperature parameter in (0, 1] controls the level of exploration
"""
if can_apply_dnoise is False:
self._root.noise = False
coroutine_list = []
for n in range(self._n_playout):
state_copy = copy.deepcopy(state)
coroutine_list.append(self._playout(state_copy))
coroutine_list += predict_workers
self.loop.run_until_complete(asyncio.gather(*coroutine_list))
# calc the move probabilities based on visit counts at the root node
act_visits = [(act, node._n_visits) for act, node in self._root._children.items()]
acts, visits = zip(*act_visits)
act_probs = softmax(1.0 / temp * np.log(np.array(visits) + 1e-10))
if infer_mode:
info = [(act, node._n_visits, node._Q, node._P) for act, node in self._root._children.items()]
if infer_mode:
return acts, act_probs, info
else:
return acts, act_probs
def update_with_move(self, last_move, allow_legacy=True):
"""Step forward in the tree, keeping everything we already know
about the subtree.
"""
self.num_proceed = 0
if last_move in self._root._children and allow_legacy:
self._root = self._root._children[last_move]
self._root._parent = None
else:
self._root = TreeNode(None, 1.0, noise=self.dnoise)
def __str__(self):
return "MCTS"
4. Self-Play
class Game(object):
def __init__(self, white, black, verbose=True):
self.white = white
self.black = black
self.verbose = verbose
self.gamestate = gameplay.GameState()
def play_till_end(self):
winner = 'peace'
moves = []
peace_round = 0
remain_piece = gameplay.countpiece(self.gamestate.statestr)
while True:
start_time = time.time()
if self.gamestate.move_number % 2 == 0:
player_name = 'w'
player = self.white
else:
player_name = 'b'
player = self.black
move, score = player.make_move(self.gamestate)
if move is None:
winner = 'b' if player_name == 'w' else 'w'
break
moves.append(move)
total_time = time.time() - start_time
logging.info('move {} {} play {} use {:.2f}s'.format(
self.gamestate.move_number,
player_name,
move,
total_time,))
game_end, winner_p = self.gamestate.game_end()
if game_end:
winner = winner_p
break
remain_piece_round = gameplay.countpiece(self.gamestate.statestr)
if remain_piece_round < remain_piece:
remain_piece = remain_piece_round
peace_round = 0
else:
peace_round += 1
if peace_round > conf.non_cap_draw_round:
winner = 'peace'
break
return winner, moves
class NetworkPlayGame(Game):
def __init__(self, network_w, network_b, **xargs):
whiteplayer = players.NetworkPlayer('w', network_w, **xargs)
blackplayer = players.NetworkPlayer('b', network_b, **xargs)
super(NetworkPlayGame, self).__init__(whiteplayer, blackplayer)
class ContinousNetworkPlayGames(object):
def __init__(
self,
network_w=None,
network_b=None,
white_name='net',
black_name='net',
random_switch=True,
recoard_game=True,
recoard_dir='data/distributed/',
play_times=np.inf,
distributed_dir='data/prepare_weight',
**xargs
):
self.network_w = network_w
self.network_b = network_b
self.white_name = white_name
self.black_name = black_name
self.random_switch = random_switch
self.play_times = play_times
self.recoard_game = recoard_game
self.recoard_dir = recoard_dir
self.xargs = xargs
# self.distributed_server = distributed_server
self.distributed_dir = distributed_dir
    def begin_of_game(self, epoch):
pass
def end_of_game(self, cbf_name, moves, cbfile, training_dt, epoch):
pass
def play(self, data_url=None, epoch=0):
num = 0
while num < self.play_times:
time_one_game_start = time.time()
num += 1
self.begin_of_game(epoch)
if self.random_switch and random.random() < 0.5:
self.network_w, self.network_b = self.network_b, self.network_w
self.white_name, self.black_name = self.black_name, self.white_name
network_play_game = NetworkPlayGame(self.network_w, self.network_b, **self.xargs)
winner, moves = network_play_game.play_till_end()
stamp = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time()))
date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
cbfile = cbf.CBF(
black=self.black_name,
red=self.white_name,
date=date,
site='北京',
name='noname',
datemodify=date,
redteam=self.white_name,
blackteam=self.black_name,
round='第一轮'
)
cbfile.receive_moves(moves)
randstamp = random.randint(0, 1000)
cbffilename = '{}_{}_mcts-mcts_{}-{}_{}.cbf'.format(
stamp, randstamp, self.white_name, self.black_name, winner)
if not os.path.exists(self.recoard_dir):
os.makedirs(self.recoard_dir)
cbf_name = os.path.join(self.recoard_dir, cbffilename)
cbfile.dump(cbf_name)
training_dt = time.time() - time_one_game_start
self.end_of_game(cbffilename, moves, cbfile, training_dt, epoch)
class DistributedSelfPlayGames(ContinousNetworkPlayGames):
def __init__(self, gpu_num=0, auto_update=True, mode='train', **kwargs):
self.gpu_num = gpu_num
self.auto_update = auto_update
self.model_name_in_use = None # for tracking latest weight
self.mode = mode
super(DistributedSelfPlayGames, self).__init__(**kwargs)
def begin_of_game(self, epoch):
"""
        when self-playing, initialize the network players using the latest weights
"""
if not self.auto_update:
return
latest_model_name = get_latest_weight_path()
logging.info('------------------ restoring model {}'.format(latest_model_name))
model_path = os.path.join(self.distributed_dir, latest_model_name)
if self.network_w is None or self.network_b is None:
network = get_model_resnet(
model_path,
create_uci_labels(),
gpu_core=[self.gpu_num],
filters=conf.network_filters,
num_res_layers=conf.network_layers
)
self.network_w = network
self.network_b = network
self.model_name_in_use = model_path
else:
if model_path != self.model_name_in_use:
(sess, graph), ((X, training), (net_softmax, value_head)) = self.network_w
with graph.as_default():
saver = tf.train.Saver(var_list=tf.global_variables())
saver.restore(sess, model_path)
self.model_name_in_use = model_path
def end_of_game(self, cbf_name, moves, cbfile, training_dt, epoch):
played_games = len(os.listdir(conf.distributed_datadir))
if self.mode == 'train':
logging.info('------------------ epoch {}: trained {} games, this game used {}s'.format(
epoch,
played_games,
round(training_dt, 6),
))
else:
logging.info('------------------ infer {} games, this game used {}s'.format(
played_games,
round(training_dt, 6),
))
def self_play_gpu(gpu_num=0, play_times=np.inf, mode='train', n_playout=50, save_dir=conf.distributed_datadir):
logging.info('------------------ self play start')
cn = DistributedSelfPlayGames(
gpu_num=gpu_num,
n_playout=n_playout,
recoard_dir=save_dir,
c_puct=conf.c_puct,
distributed_dir=conf.distributed_server_weight_dir,
dnoise=True,
is_selfplay=True,
play_times=play_times,
mode=mode,
)
cn.play(epoch=0)
logging.info('------------------ self play done')
5. Configure the Training Parameters
Configure the number of MCTS simulations per move, the number of self-play games in one training run, and the number of games played for inference with the updated model after training. All parameters are kept small to simplify the training process.
config = {
    "n_playout": 100,       # number of MCTS simulations per move; recommended range 10-1600. Smaller runs faster, larger searches more accurately
    "self_play_games": 2,   # number of self-play games; recommended range 5-10000. Large values may exceed the free resource time limit
    "infer_games": 1,       # number of inference games
    "gpu_num": 0,           # index of the GPU card to use
}
6. Run Self-Play Training, Then Update the Model
The moves played by both sides are printed while this step runs.
self_play_gpu(config['gpu_num'], config['self_play_games'], mode='train', n_playout=config['n_playout'])
# model update
model_update(gpu_num=config['gpu_num'])
7. Visualize a Game
(Wait until step 6 has finished before running this step.)
This loads the model and plays one more game; the game is then visualized, and the board at the end of the game is displayed.
self_play_gpu(config['gpu_num'], config['infer_games'], mode='infer', n_playout=config['n_playout'], save_dir='./infer_res')
Load the game record file and list every move played by both sides. A move is given as the start coordinate followed by the end coordinate on the board; the coordinate system is shown on the board displayed later.
%reload_ext autoreload
%autoreload 2
%matplotlib inline
from matplotlib import pyplot as plt
from cchess_training.cchess_zero.gameboard import *
from PIL import Image
import imageio
gameplay_path = './infer_res'
while not os.path.exists(gameplay_path) or len(os.listdir(gameplay_path)) == 0:
time.sleep(5)
    logging.info('Step 6 has not finished yet; it is recommended to stop and re-run the notebook step by step')
gameplays = os.listdir(gameplay_path)
fullpath = '{}/{}'.format(gameplay_path, random.choice(gameplays))
moves = cbf.cbf2move(fullpath)
fname = fullpath.split('/')[-1]
print(moves)
['b2e2', 'h7h5', 'b0c2', 'b7e7', 'h0g2', 'h5i5', 'h2i2', 'a9a7', 'i0i1', 'i5g5', 'c2e1', 'h9i7', 'c3c4', 'e6e5', 'a0b0', 'e7g7', 'g3g4', 'c6c5', 'i1h1', 'i7g8', 'e2e5', 'i9i8', 'g2f4', 'b9c7', 'g4g5', 'c7b9', 'f4e6', 'a7e7', 'e6g7', 'e7e5', 'b0b9', 'c5c4', 'i2e2', 'c4d4', 'e2e5', 'i8i7', 'b9c9', 'i7g7', 'h1h8', 'i6i5', 'a3a4', 'g7a7', 'i3i4', 'i5i4', 'g5g6', 'i4i3', 'h8g8', 'i3h3', 'g8f8', 'a7a8', 'f8f9', 'e9f9', 'g6f6', 'a6a5', 'a4a5', 'd4e4', 'e3e4', 'g9e7', 'g0e2', 'a8g8', 'c9d9', 'g8g1']
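For reference, each move string above is the start square followed by the destination square, each written as a file letter a-i plus a rank digit 0-9. The small helper below (illustrative only, not part of cchess_training) splits a move into numeric coordinates:
def decode_move(move):
    """Split a move such as 'b2e2' into (from_file, from_rank, to_file, to_rank),
    mapping files a-i to 0-8 and keeping ranks as the digits 0-9."""
    files = 'abcdefghi'
    return files.index(move[0]), int(move[1]), files.index(move[2]), int(move[3])
print(decode_move('b2e2'))  # (1, 2, 4, 2): a move along rank 2 from file b to file e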
Visualize the game move by move
import cv2
from IPython.display import clear_output, Image, display
state = gameplay.GameState()
statestr = 'RNBAKABNR/9/1C5C1/P1P1P1P1P/9/9/p1p1p1p1p/1c5c1/9/rnbakabnr'
for move in moves:
clear_output(wait=True)
statestr = GameBoard.sim_do_action(move, statestr)
img = board_visualizer.get_board_img(statestr)
img_show = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
display(Image(data=cv2.imencode('.jpg', img_show)[1]))
time.sleep(0.5)
Display the final board position
plt.figure(figsize=(8,8))
plt.imshow(board_visualizer.get_board_img(statestr))
This concludes the case. To fully train a Chinese chess AlphaZero AI, subscribe to the "CChess" algorithm in AI Gallery and train it on ModelArts.