本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:
强化学习(7)---《【MADRL】多智能体双延迟深度确定性策略梯度(MATD3)算法》
多智能体双延迟深度确定性策略梯度(MATD3)算法
目录
1.MATD3算法介绍
2.背景
3.算法结构
4.具体公式
5.算法流程
6.公式总结
7.优势与应用场景
8.总结
[Python] MATD3实现(可移植)
1.MATD3算法介绍
MATD3(Multi-Agent Twin Delayed Deep Deterministic Policy Gradient)是基于TD3(Twin Delayed DDPG)算法的多智能体版本。TD3是深度确定性策略梯度(DDPG)算法的一个改进版本,主要针对其在确定性策略学习中的一些不稳定性进行了增强。MATD3则扩展了TD3,使其能够在多智能体环境下进行训练和执行。
文章:Addressing Function Approximation Error in Actor-Critic Methods
代码:MADRL多智能体双延迟深度确定性策略梯度(MATD3)算法
其他多智能体深度强化学习(MADRL)算法见下面博客:
【MADRL】多智能体深度强化学习《纲要》
2.背景
DDPG算法用于连续动作空间的强化学习任务,但在复杂环境下容易出现策略估计偏差、探索不足等问题。TD3通过引入两种关键机制来解决这些问题:
- 延迟更新:Actor的更新频率较Critic更低,以避免策略更新过快导致的不稳定性。
- 目标策略平滑:在计算目标Q值时,给动作增加噪声以减小过估计偏差。
在多智能体场景中,每个智能体不仅要与环境交互,还需要适应其他智能体的行为。MATD3结合了TD3的稳定性增强机制,并将其应用到多智能体系统中,使其能够在混合协作与竞争的环境下表现更佳。

3.算法结构
MATD3算法同样是基于Actor-Critic架构的多智能体强化学习算法,其中每个智能体都有独立的Actor网络和双Critic网络。该算法采用集中式训练,分布式执行的结构:
- 集中式训练:训练过程中,每个智能体的Critic网络可以访问所有智能体的状态和动作,以最大化每个智能体的累积回报。
- 分布式执行:在执行阶段,智能体仅根据自身的观测来选择动作。
4.具体公式
-
环境设定:
- 系统状态为
,每个智能体
的观测为
,动作为
。
- 每个智能体根据其策略
选择动作
,并根据奖励函数
得到即时奖励。
-
目标:每个智能体
的目标是最大化其期望累积回报:
![[ R_i = \mathbb{E} \left[ \sum_{t=0}^{T} \gamma^t r_i^t \right] ]](https://res.hc-cdn.com/ecology/9.3.164/v2_resources/ydcomm/libs/images/loading.gif)
其中,
是智能体
在时刻
的即时奖励,
是折扣因子。
-
Critic网络:MATD3每个智能体
的Critic网络有两个Q值函数
和
用于减少Q值估计的偏差。这两个Q值函数的更新方式类似于TD3中的方式:
-
首先计算目标Q值,使用所有智能体的下一个动作
:![[ y_i = r_i + \gamma \min{j=1,2} Q*_i^{j}(s', a'_1, ..., a'_N) ]](https://res.hc-cdn.com/ecology/9.3.164/v2_resources/ydcomm/libs/images/loading.gif)
-
其中
是目标网络,
是通过目标Actor网络
生成的动作。
-
使用均方误差(MSE)损失函数更新两个Critic网络的参数
:
其中
。
-
Actor网络:Actor网络的更新也是通过最大化Critic网络的Q值来进行的。Actor策略的梯度可以通过下式计算:
![[ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s,a} \left[ \nabla{a_i} Q_i^{1}(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]](https://res.hc-cdn.com/ecology/9.3.164/v2_resources/ydcomm/libs/images/loading.gif)
其中只使用
来更新Actor策略。
-
目标策略平滑:为了减少估计偏差,在计算目标Q值时,给每个智能体的动作
加入噪声:
这样可以避免策略过拟合某些特定的动作,从而提高策略的鲁棒性。
-
延迟更新:为了进一步提高稳定性,Actor网络的更新频率低于Critic网络。例如,Critic网络每更新两次,Actor网络更新一次。此外,目标网络的参数更新也较慢,遵循“软更新”策略:
其中
是软更新的速率参数。
5.算法流程
-
初始化:为每个智能体初始化两个Critic网络
和一个Actor网络
,并初始化对应的目标网络
。
-
交互与经验存储:每个智能体
与环境交互,记录当前状态、动作、奖励和下一个状态。
-
更新Critic网络:
- 从经验回放池中采样一个批次数据。
- 根据上述公式计算目标Q值
,并更新两个Critic网络。
-
延迟更新Actor网络:
- 每隔若干步,更新Actor网络的参数,最大化其对应的Critic网络的Q值。
-
软更新目标网络:更新目标Critic网络和目标Actor网络的参数。
-
重复步骤,直到智能体在环境中学会优化策略。
6.公式总结
-
Critic更新: ![[ L(\theta_i^{j}) = \mathbb{E}{s,a,r,s'} \left[ \left( Q_i^{j}(s, a_1, ..., a_N; \theta_i^{j}) - \left( r_i + \gamma \min{j=1,2} Q*_i^{j}(s', a'_1, ..., a'_N) \right) \right)^2 \right] ]](https://res.hc-cdn.com/ecology/9.3.164/v2_resources/ydcomm/libs/images/loading.gif)
-
Actor更新:![[ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s,a} \left[ \nabla{a_i} Q_i^{1}(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]](https://res.hc-cdn.com/ecology/9.3.164/v2_resources/ydcomm/libs/images/loading.gif)
7.优势与应用场景
-
减少Q值估计偏差:通过引入两个Critic网络,MATD3显著减少了单个Critic在更新过程中的过估计问题,从而提高了稳定性。
-
解决多智能体非平稳性问题:多智能体环境下,其他智能体的策略会影响每个智能体的策略学习。MATD3通过全局信息的中心化训练方式,使得每个智能体能够学习到更加鲁棒的策略。
-
混合协作和竞争环境:该算法特别适用于协作与竞争混合的环境,因为它能够处理多个智能体之间的复杂交互。
8.总结
MATD3算法是TD3算法在多智能体场景下的扩展,通过中心化的Critic结构和去中心化的Actor结构,MATD3能够有效应对多智能体环境下的挑战。算法通过双Critic结构减少Q值估计偏差,并且延迟更新机制进一步提高了训练过程的稳定性,使其在混合协作与竞争的复杂环境中具有良好的表现。
[Python] MATD3实现(可移植)
若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。
主文件:MATD3_main
import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from environment import Env
import argparse
from replay_buffer import ReplayBuffer
from matd3 import MATD3
import copy
class Runner:
def __init__(self, args, env_name, number, seed):
self.args = args
self.env_name = env_name
self.number = number
self.seed = seed
self.env = Env(env_name, discrete=False)
self.env_evaluate = Env(env_name, discrete=False)
self.args.N = self.env.n
self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]
self.args.action_dim_n = [self.env.action_space[i].shape[0] for i in range(self.args.N)]
print("observation_space=", self.env.observation_space)
print("obs_dim_n={}".format(self.args.obs_dim_n))
print("action_space=", self.env.action_space)
print("action_dim_n={}".format(self.args.action_dim_n))
np.random.seed(self.seed)
torch.manual_seed(self.seed)
if self.args.algorithm == "MADDPG":
print("Algorithm: MADDPG")
elif self.args.algorithm == "MATD3":
print("Algorithm: MATD3")
self.agent_n = [MATD3(args, agent_id) for agent_id in range(args.N)]
else:
print("Wrong!!!")
self.replay_buffer = ReplayBuffer(self.args)
self.writer = SummaryWriter(log_dir='runs/{}/{}_env_{}_number_{}_seed_{}'.format(self.args.algorithm, self.args.algorithm, self.env_name, self.number, self.seed))
self.evaluate_rewards = []
self.total_steps = 0
self.noise_std = self.args.noise_std_init
def run(self, ):
self.evaluate_policy()
while self.total_steps < self.args.max_train_steps:
obs_n = self.env.reset()
for _ in range(self.args.episode_limit):
a_n = [agent.choose_action(obs, noise_std=self.noise_std) for agent, obs in zip(self.agent_n, obs_n)]
obs_next_n, r_n, done_n, _ = self.env.step(copy.deepcopy(a_n))
self.replay_buffer.store_transition(obs_n, a_n, r_n, obs_next_n, done_n)
obs_n = obs_next_n
self.total_steps += 1
if self.args.use_noise_decay:
self.noise_std = self.noise_std - self.args.noise_std_decay if self.noise_std - self.args.noise_std_decay > self.args.noise_std_min else self.args.noise_std_min
if self.replay_buffer.current_size > self.args.batch_size:
for agent_id in range(self.args.N):
self.agent_n[agent_id].train(self.replay_buffer, self.agent_n)
if self.total_steps % self.args.evaluate_freq == 0:
self.evaluate_policy()
if all(done_n):
break
self.env.close()
self.env_evaluate.close()
def evaluate_policy(self, ):
evaluate_reward = 0
for _ in range(self.args.evaluate_times):
obs_n = self.env_evaluate.reset()
episode_reward = 0
for _ in range(self.args.episode_limit):
a_n = [agent.choose_action(obs, noise_std=0) for agent, obs in zip(self.agent_n, obs_n)]
obs_next_n, r_n, done_n, _ = self.env_evaluate.step(copy.deepcopy(a_n))
episode_reward += r_n[0]
obs_n = obs_next_n
if all(done_n):
break
evaluate_reward += episode_reward
evaluate_reward = evaluate_reward / self.args.evaluate_times
self.evaluate_rewards.append(evaluate_reward)
print("total_steps:{} \t evaluate_reward:{} \t noise_std:{}".format(self.total_steps, evaluate_reward, self.noise_std))
self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
np.save('./data_train/{}_env_{}_number_{}_seed_{}.npy'.format(self.args.algorithm, self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
for agent_id in range(self.args.N):
self.agent_n[agent_id].save_model(self.env_name, self.args.algorithm, self.number, self.total_steps, agent_id)
if __name__ == '__main__':
parser = argparse.ArgumentParser("Hyperparameters Setting for MADDPG and MATD3 in MPE environment")
parser.add_argument("--max_train_steps", type=int, default=int(1e6), help=" Maximum number of training steps")
parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times")
parser.add_argument("--max_action", type=float, default=1.0, help="Max action")
parser.add_argument("--algorithm", type=str, default="MATD3", help="MADDPG or MATD3")
parser.add_argument("--buffer_size", type=int, default=int(1e6), help="The capacity of the replay buffer")
parser.add_argument("--batch_size", type=int, default=1024, help="Batch size")
parser.add_argument("--hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the neural network")
parser.add_argument("--noise_std_init", type=float, default=0.2, help="The std of Gaussian noise for exploration")
parser.add_argument("--noise_std_min", type=float, default=0.05, help="The std of Gaussian noise for exploration")
parser.add_argument("--noise_decay_steps", type=float, default=3e5, help="How many steps before the noise_std decays to the minimum")
parser.add_argument("--use_noise_decay", type=bool, default=True, help="Whether to decay the noise_std")
parser.add_argument("--lr_a", type=float, default=5e-4, help="Learning rate of actor")
parser.add_argument("--lr_c", type=float, default=5e-4, help="Learning rate of critic")
parser.add_argument("--gamma", type=float, default=0.95, help="Discount factor")
parser.add_argument("--tau", type=float, default=0.01, help="Softly update the target network")
parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Orthogonal initialization")
parser.add_argument("--use_grad_clip", type=bool, default=True, help="Gradient clip")
parser.add_argument("--policy_noise", type=float, default=0.2, help="Target policy smoothing")
parser.add_argument("--noise_clip", type=float, default=0.5, help="Clip noise")
parser.add_argument("--policy_update_freq", type=int, default=2, help="The frequency of policy updates")
args = parser.parse_args()
args.noise_std_decay = (args.noise_std_init - args.noise_std_min) / args.noise_decay_steps
env_names = ["simple_speaker_listener", "simple_spread"]
env_index = 0
runner = Runner(args, env_name=env_names[env_index], number=1, seed=0)
runner.run()

环境文件:environment
移植事项:
1.注意环境参数的设置格式
2.注意环境的返回值利用
3.注意主运行流程的runner.run()的相关设置,等
可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法 中关于 QMIX算法移植的注意事项和代码注释。
文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。
评论(0)