【MADRL】多智能体双延迟深度确定性策略梯度(MATD3)算法

举报
不去幼儿园 发表于 2024/12/20 11:10:10 2024/12/20
469 0 0
【摘要】 MATD3(Multi-Agent Twin Delayed Deep Deterministic Policy Gradient)是基于TD3(Twin Delayed DDPG)算法的多智能体版本。TD3是深度确定性策略梯度(DDPG)算法的一个改进版本,主要针对其在确定性策略学习中的一些不稳定性进行了增强。MATD3则扩展了TD3,使其能够在多智能体环境下进行训练和执行。 DDPG算法用于连

         本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(7)---《【MADRL】多智能体双延迟深度确定性策略梯度(MATD3)算法》

多智能体双延迟深度确定性策略梯度(MATD3)算法

目录

1.MATD3算法介绍

2.背景

3.算法结构

4.具体公式

5.算法流程

6.公式总结

7.优势与应用场景

8.总结

 [Python] MATD3实现(可移植)


1.MATD3算法介绍

        MATD3(Multi-Agent Twin Delayed Deep Deterministic Policy Gradient)是基于TD3(Twin Delayed DDPG)算法的多智能体版本。TD3是深度确定性策略梯度(DDPG)算法的一个改进版本,主要针对其在确定性策略学习中的一些不稳定性进行了增强。MATD3则扩展了TD3,使其能够在多智能体环境下进行训练和执行。

文章Addressing Function Approximation Error in Actor-Critic Methods

代码MADRL多智能体双延迟深度确定性策略梯度(MATD3)算法

 其他多智能体深度强化学习(MADRL算法见下面博客:

【MADRL】多智能体深度强化学习《纲要》


2.背景

        DDPG算法用于连续动作空间的强化学习任务,但在复杂环境下容易出现策略估计偏差、探索不足等问题。TD3通过引入两种关键机制来解决这些问题:

  • 延迟更新:Actor的更新频率较Critic更低,以避免策略更新过快导致的不稳定性。
  • 目标策略平滑:在计算目标Q值时,给动作增加噪声以减小过估计偏差。

        在多智能体场景中,每个智能体不仅要与环境交互,还需要适应其他智能体的行为。MATD3结合了TD3的稳定性增强机制,并将其应用到多智能体系统中,使其能够在混合协作与竞争的环境下表现更佳。


3.算法结构

        MATD3算法同样是基于Actor-Critic架构的多智能体强化学习算法,其中每个智能体都有独立的Actor网络和双Critic网络。该算法采用集中式训练,分布式执行的结构:

  • 集中式训练:训练过程中,每个智能体的Critic网络可以访问所有智能体的状态和动作,以最大化每个智能体的累积回报。
  • 分布式执行:在执行阶段,智能体仅根据自身的观测来选择动作。

4.具体公式

  1. 环境设定

    • 系统状态为( s \in S ),每个智能体( i )的观测为 ( o_i \in O ),动作为( a_i \in A )
    • 每个智能体根据其策略( \pi_i(o_i) ) 选择动作( a_i ),并根据奖励函数 ( r_i ) 得到即时奖励。
  2. 目标:每个智能体 ( i )的目标是最大化其期望累积回报:

    [ R_i = \mathbb{E} \left[ \sum_{t=0}^{T} \gamma^t r_i^t \right] ]

    其中,( r_i^t )是智能体( i )在时刻( t )的即时奖励,( \gamma )是折扣因子。

  3. Critic网络:MATD3每个智能体( i )的Critic网络有两个Q值函数 ( Q_i^{1}(s, a_1, ..., a_N) )和 (Q_i^{2}(s, a_1, ..., a_N) )用于减少Q值估计的偏差。这两个Q值函数的更新方式类似于TD3中的方式:

    1. 首先计算目标Q值,使用所有智能体的下一个动作 ( a'j )[ y_i = r_i + \gamma \min{j=1,2} Q*_i^{j}(s', a'_1, ..., a'_N) ]

    2. 其中( Q*_i^{1}, Q*_i^{2} )是目标网络,( a'_j )是通过目标Actor网络 ( \pi'_j ) 生成的动作。

    3. 使用均方误差(MSE)损失函数更新两个Critic网络的参数( \theta_i^{1}, \theta_i^{2} )[ L(\theta_i^{j}) = \mathbb{E}_{s,a,r,s'} \left[ \left( Q_i^{j}(s, a_1, ..., a_N; \theta_i^{j}) - y_i \right)^2 \right] ]其中( j = 1, 2 )

  4. Actor网络:Actor网络的更新也是通过最大化Critic网络的Q值来进行的。Actor策略的梯度可以通过下式计算:

    [ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s,a} \left[ \nabla{a_i} Q_i^{1}(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]

    其中只使用( Q_i^{1} )来更新Actor策略。

  5. 目标策略平滑:为了减少估计偏差,在计算目标Q值时,给每个智能体的动作 ( a'_i )加入噪声:[ a'_i = \pi'_i(o'_i) + \epsilon, \quad \epsilon \sim \mathcal{N}(0, \sigma) ]这样可以避免策略过拟合某些特定的动作,从而提高策略的鲁棒性。

  6. 延迟更新:为了进一步提高稳定性,Actor网络的更新频率低于Critic网络。例如,Critic网络每更新两次,Actor网络更新一次。此外,目标网络的参数更新也较慢,遵循“软更新”策略: [ \theta'_i \leftarrow \tau \theta_i + (1 - \tau) \theta'_i ]其中 ( \tau )是软更新的速率参数。


5.算法流程

  1. 初始化:为每个智能体初始化两个Critic网络 ( Q_i^{1}, Q_i^{2} )和一个Actor网络 ( \pi_i ),并初始化对应的目标网络( Q*_i^{1}, Q*_i^{2}, \pi'_i )

  2. 交互与经验存储:每个智能体( i )与环境交互,记录当前状态、动作、奖励和下一个状态。

  3. 更新Critic网络

    • 从经验回放池中采样一个批次数据。
    • 根据上述公式计算目标Q值 ( y_i ),并更新两个Critic网络。
  4. 延迟更新Actor网络

    • 每隔若干步,更新Actor网络的参数,最大化其对应的Critic网络的Q值。
  5. 软更新目标网络:更新目标Critic网络和目标Actor网络的参数。

  6. 重复步骤,直到智能体在环境中学会优化策略。


6.公式总结

  • Critic更新[ L(\theta_i^{j}) = \mathbb{E}{s,a,r,s'} \left[ \left( Q_i^{j}(s, a_1, ..., a_N; \theta_i^{j}) - \left( r_i + \gamma \min{j=1,2} Q*_i^{j}(s', a'_1, ..., a'_N) \right) \right)^2 \right] ]

  • Actor更新[ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s,a} \left[ \nabla{a_i} Q_i^{1}(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]


7.优势与应用场景

  • 减少Q值估计偏差:通过引入两个Critic网络,MATD3显著减少了单个Critic在更新过程中的过估计问题,从而提高了稳定性。

  • 解决多智能体非平稳性问题:多智能体环境下,其他智能体的策略会影响每个智能体的策略学习。MATD3通过全局信息的中心化训练方式,使得每个智能体能够学习到更加鲁棒的策略。

  • 混合协作和竞争环境:该算法特别适用于协作与竞争混合的环境,因为它能够处理多个智能体之间的复杂交互。


8.总结

        MATD3算法是TD3算法在多智能体场景下的扩展,通过中心化的Critic结构和去中心化的Actor结构,MATD3能够有效应对多智能体环境下的挑战。算法通过双Critic结构减少Q值估计偏差,并且延迟更新机制进一步提高了训练过程的稳定性,使其在混合协作与竞争的复杂环境中具有良好的表现。


 [Python] MATD3实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MATD3_main 

import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from environment import Env
import argparse
from replay_buffer import ReplayBuffer
# from maddpg import MADDPG
from matd3 import MATD3
import copy
class Runner:
    def __init__(self, args, env_name, number, seed):
        self.args = args
        self.env_name = env_name
        self.number = number
        self.seed = seed
        # Create env
        self.env = Env(env_name, discrete=False)  # Continuous action space
        self.env_evaluate = Env(env_name, discrete=False)
        self.args.N = self.env.n  # The number of agents
        self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]  # obs dimensions of N agents
        self.args.action_dim_n = [self.env.action_space[i].shape[0] for i in range(self.args.N)]  # actions dimensions of N agents
        print("observation_space=", self.env.observation_space)
        print("obs_dim_n={}".format(self.args.obs_dim_n))
        print("action_space=", self.env.action_space)
        print("action_dim_n={}".format(self.args.action_dim_n))

        # Set random seed
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)

        # Create N agents
        if self.args.algorithm == "MADDPG":
            print("Algorithm: MADDPG")
            # self.agent_n = [MADDPG(args, agent_id) for agent_id in range(args.N)]
        elif self.args.algorithm == "MATD3":
            print("Algorithm: MATD3")
            self.agent_n = [MATD3(args, agent_id) for agent_id in range(args.N)]
        else:
            print("Wrong!!!")

        self.replay_buffer = ReplayBuffer(self.args)

        # Create a tensorboard
        self.writer = SummaryWriter(log_dir='runs/{}/{}_env_{}_number_{}_seed_{}'.format(self.args.algorithm, self.args.algorithm, self.env_name, self.number, self.seed))

        self.evaluate_rewards = []  # Record the rewards during the evaluating
        self.total_steps = 0

        self.noise_std = self.args.noise_std_init  # Initialize noise_std

    def run(self, ):
        self.evaluate_policy()

        while self.total_steps < self.args.max_train_steps:
            obs_n = self.env.reset()
            for _ in range(self.args.episode_limit):
                # Each agent selects actions based on its own local observations(add noise for exploration)
                a_n = [agent.choose_action(obs, noise_std=self.noise_std) for agent, obs in zip(self.agent_n, obs_n)]
                # --------------------------!!!注意!!!这里一定要deepcopy,MPE环境会把a_n乘5-------------------------------------------
                obs_next_n, r_n, done_n, _ = self.env.step(copy.deepcopy(a_n))
                # Store the transition
                self.replay_buffer.store_transition(obs_n, a_n, r_n, obs_next_n, done_n)
                obs_n = obs_next_n
                self.total_steps += 1

                # Decay noise_std
                if self.args.use_noise_decay:
                    self.noise_std = self.noise_std - self.args.noise_std_decay if self.noise_std - self.args.noise_std_decay > self.args.noise_std_min else self.args.noise_std_min

                if self.replay_buffer.current_size > self.args.batch_size:
                    # Train each agent individually
                    for agent_id in range(self.args.N):
                        self.agent_n[agent_id].train(self.replay_buffer, self.agent_n)

                if self.total_steps % self.args.evaluate_freq == 0:
                    self.evaluate_policy()

                if all(done_n):
                    break

        self.env.close()
        self.env_evaluate.close()

    def evaluate_policy(self, ):
        evaluate_reward = 0
        for _ in range(self.args.evaluate_times):
            obs_n = self.env_evaluate.reset()
            episode_reward = 0
            for _ in range(self.args.episode_limit):
                a_n = [agent.choose_action(obs, noise_std=0) for agent, obs in zip(self.agent_n, obs_n)]  # We do not add noise when evaluating
                obs_next_n, r_n, done_n, _ = self.env_evaluate.step(copy.deepcopy(a_n))
                episode_reward += r_n[0]
                obs_n = obs_next_n
                if all(done_n):
                    break
            evaluate_reward += episode_reward

        evaluate_reward = evaluate_reward / self.args.evaluate_times
        self.evaluate_rewards.append(evaluate_reward)
        print("total_steps:{} \t evaluate_reward:{} \t noise_std:{}".format(self.total_steps, evaluate_reward, self.noise_std))
        self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
        # Save the rewards and models
        np.save('./data_train/{}_env_{}_number_{}_seed_{}.npy'.format(self.args.algorithm, self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
        for agent_id in range(self.args.N):
            self.agent_n[agent_id].save_model(self.env_name, self.args.algorithm, self.number, self.total_steps, agent_id)
if __name__ == '__main__':
    parser = argparse.ArgumentParser("Hyperparameters Setting for MADDPG and MATD3 in MPE environment")
    parser.add_argument("--max_train_steps", type=int, default=int(1e6), help=" Maximum number of training steps")
    parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
    parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
    parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times")
    parser.add_argument("--max_action", type=float, default=1.0, help="Max action")

    parser.add_argument("--algorithm", type=str, default="MATD3", help="MADDPG or MATD3")
    parser.add_argument("--buffer_size", type=int, default=int(1e6), help="The capacity of the replay buffer")
    parser.add_argument("--batch_size", type=int, default=1024, help="Batch size")
    parser.add_argument("--hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the neural network")
    parser.add_argument("--noise_std_init", type=float, default=0.2, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_std_min", type=float, default=0.05, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_decay_steps", type=float, default=3e5, help="How many steps before the noise_std decays to the minimum")
    parser.add_argument("--use_noise_decay", type=bool, default=True, help="Whether to decay the noise_std")
    parser.add_argument("--lr_a", type=float, default=5e-4, help="Learning rate of actor")
    parser.add_argument("--lr_c", type=float, default=5e-4, help="Learning rate of critic")
    parser.add_argument("--gamma", type=float, default=0.95, help="Discount factor")
    parser.add_argument("--tau", type=float, default=0.01, help="Softly update the target network")
    parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Orthogonal initialization")
    parser.add_argument("--use_grad_clip", type=bool, default=True, help="Gradient clip")
    # --------------------------------------MATD3--------------------------------------------------------------------
    parser.add_argument("--policy_noise", type=float, default=0.2, help="Target policy smoothing")
    parser.add_argument("--noise_clip", type=float, default=0.5, help="Clip noise")
    parser.add_argument("--policy_update_freq", type=int, default=2, help="The frequency of policy updates")

    args = parser.parse_args()
    args.noise_std_decay = (args.noise_std_init - args.noise_std_min) / args.noise_decay_steps

    env_names = ["simple_speaker_listener", "simple_spread"]
    env_index = 0
    runner = Runner(args, env_name=env_names[env_index], number=1, seed=0)
    runner.run()

环境文件:environment

# Please write down your environment Settings
# Pay attention to the input and output of parameters
class Env:
	def __init__(self, args, discrete):
		self.args = args
		self.discrete = discrete

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。