【MADRL】多智能体深度确定性策略梯度(MADDPG )算法

举报
不去幼儿园 发表于 2024/12/20 11:04:43 2024/12/20
【摘要】 MADDPG (Multi-Agent Deep Deterministic Policy Gradient) 是一种用于多智能体强化学习环境的算法。它由2017年发布的论文《Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments》提出。MADDPG结合了深度确定性策略梯度(DDPG)算法的思想,并对多智能体场

 本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(6)---《【MADRL】多智能体深度确定性策略梯度(MADDPG )算法》

【MADRL】多智能体深度确定性策略梯度(MADDPG )算法

目录

1.MADDPG算法详解

2.背景与动机

3.算法结构

4.具体公式

5.算法流程

6.优势与应用场景

7.结论

 [Python] MADDPG实现(可移植)


1.MADDPG算法详解

        MADDPG (Multi-Agent Deep Deterministic Policy Gradient) 是一种用于多智能体强化学习环境的算法。它由2017年发布的论文《Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments》提出。MADDPG结合了深度确定性策略梯度(DDPG)算法的思想,并对多智能体场景进行了扩展,能够处理混合的协作与竞争环境。

链接:《Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments》

代码:  MADRL多智能体深度确定性策略梯度(MADDPG )算法 

其他多智能体深度强化学习(MADRL)算法见下面博客:

【MADRL】多智能体深度强化学习《纲要》


2.背景与动机

        在多智能体系统中,多个智能体同时作用于同一个环境,相互之间可能是竞争的、协作的,或者二者兼有。这类环境下,单智能体算法如DDPG往往无法取得较好的效果,因为每个智能体的行为都会影响其他智能体的状态和奖励。为了解决这一问题,MADDPG采用了一种集中式训练,分布式执行的架构。

  • 集中式训练:训练期间,每个智能体可以观察所有其他智能体的动作和状态,从而学到更有效的策略。
  • 分布式执行:在执行阶段,智能体只依赖其自身的观测来做出决策,保持分布式控制的特性。


3.算法结构

        MADDPG是基于Actor-Critic结构的,其中每个智能体都有自己的Actor和Critic模型。Actor用于输出动作策略,而Critic用于评估动作的价值。该算法的独特之处在于,Critic模型是全局的,即Critic不仅依赖于单个智能体的状态和动作,还使用所有智能体的状态和动作。


4.具体公式

MADDPG扩展了DDPG的公式,针对多智能体环境进行如下改动:

  1. 环境设定

    • 状态( s \in S ) 表示整个环境的状态。
    • 对于每个智能体( i ),它的观测值( o_i \in O ) 是全局状态的一部分。
    • 每个智能体采取动作 ( a_i \in A )
    • 每个智能体根据其策略( \pi_i(o_i) ) 选择动作。
  2. 目标:每个智能体的目标是最大化其期望累积回报( R_i = \mathbb{E}[\sum_t \gamma^t r_i^t] ),其中( r_i^t ) 是时刻( t )智能体 ( i ) 的即时奖励,( \gamma )是折扣因子。

  3. Critic网络:每个智能体 ( i )的 Critic 网络( Q_i(s, a_1, ..., a_N) )估计全局的状态和所有智能体动作的联合Q值。这个Q值函数可以通过Bellman方程进行更新:(或者其他方式)

    [ L(\theta_i) = \mathbb{E}_{s,a,r,s'} \left[ \left( Q_i(s, a_1, ..., a_N; \theta_i) - y \right)^2 \right] ]

    其中目标值( y ) 为:

    [ y = r_i + \gamma Q'_i(s', a'_1, ..., a'_N; \theta'_i) ]

    这里,( Q'_i )是目标Critic网络,动作( a'_j )是通过各自的目标Actor策略选出的动作。

  4. Actor网络:每个智能体( i ) 的Actor策略是通过最大化其Critic函数的期望来更新的:

    [ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s, a} \left[ \nabla{a_i} Q_i(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]

    通过策略梯度法对Actor网络的参数 ( \theta_{\pi_i} )进行更新。

  5. 去中心化执行:在实际执行过程中,每个智能体根据其自身的观测值( o_i ) 通过策略( \pi_i(o_i) ) 选择动作。


5.算法流程

  1. 初始化:为每个智能体( i )初始化Actor网络( \pi_i )和Critic网络( Q_i ) 以及对应的目标网络( \pi'_i )( Q'_i )

  2. 交互:智能体与环境进行交互,在每个时刻 ( t ),每个智能体根据其策略( \pi_i(o_i) )选择动作( a_i ),环境返回下一个状态和奖励( r_i )

  3. 存储经验:将状态、动作、奖励和下一个状态存入经验回放池。

  4. 采样与更新:从经验回放池中采样一个批次,使用前述的公式更新每个智能体的Critic和Actor网络。

  5. 软更新目标网络:以慢速的方式更新目标网络的参数:

    [ \theta'_i \leftarrow \tau \theta_i + (1 - \tau) \theta'_i ]

  6. 重复:重复交互和更新过程,直到训练完成。


6.优势与应用场景

  • 解决多智能体环境中的非平稳性问题:由于多个智能体的存在,环境对每个智能体来说是非平稳的。MADDPG通过中心化的Critic结构来应对这一问题,确保在训练过程中,每个智能体都能有效学习到策略。

  • 处理协作与竞争混合的环境:MADDPG非常适合混合了协作和竞争的多智能体环境,因为它允许智能体通过全局视角进行策略学习,但在执行时保持去中心化。


7.结论

        MADDPG是一种针对多智能体系统的强化学习算法,结合了Actor-Critic框架和集中式训练分布式执行的思想,能够有效处理协作与竞争共存的复杂环境。通过引入全局信息进行训练,它显著提高了多智能体环境下的学习效果,同时保留了分布式控制的灵活性。


 [Python] MADDPG实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MADDPG_MATD3_main

import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from environment import Env
import argparse
from replay_buffer import ReplayBuffer
from maddpg import MADDPG
from matd3 import MATD3
import copy


class Runner:
    def __init__(self, args, env_name, number, seed):
        self.args = args
        self.env_name = env_name
        self.number = number
        self.seed = seed
        # Create env
        self.env = Env(env_name, discrete=False)  # Continuous action space
        self.env_evaluate = Env(env_name, discrete=False)
        self.args.N = self.env.n  # The number of agents
        self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]  # obs dimensions of N agents
        self.args.action_dim_n = [self.env.action_space[i].shape[0] for i in range(self.args.N)]  # actions dimensions of N agents
        print("observation_space=", self.env.observation_space)
        print("obs_dim_n={}".format(self.args.obs_dim_n))
        print("action_space=", self.env.action_space)
        print("action_dim_n={}".format(self.args.action_dim_n))

        # Set random seed
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)

        # Create N agents
        if self.args.algorithm == "MADDPG":
            print("Algorithm: MADDPG")
            self.agent_n = [MADDPG(args, agent_id) for agent_id in range(args.N)]
        elif self.args.algorithm == "MATD3":
            print("Algorithm: MATD3")
            self.agent_n = [MATD3(args, agent_id) for agent_id in range(args.N)]
        else:
            print("Wrong!!!")

        self.replay_buffer = ReplayBuffer(self.args)

        # Create a tensorboard
        self.writer = SummaryWriter(log_dir='runs/{}/{}_env_{}_number_{}_seed_{}'.format(self.args.algorithm, self.args.algorithm, self.env_name, self.number, self.seed))

        self.evaluate_rewards = []  # Record the rewards during the evaluating
        self.total_steps = 0

        self.noise_std = self.args.noise_std_init  # Initialize noise_std

    def run(self, ):
        self.evaluate_policy()

        while self.total_steps < self.args.max_train_steps:
            obs_n = self.env.reset()
            for _ in range(self.args.episode_limit):
                # Each agent selects actions based on its own local observations(add noise for exploration)
                a_n = [agent.choose_action(obs, noise_std=self.noise_std) for agent, obs in zip(self.agent_n, obs_n)]
                # --------------------------!!!注意!!!这里一定要deepcopy,MPE环境会把a_n乘5-------------------------------------------
                obs_next_n, r_n, done_n, _ = self.env.step(copy.deepcopy(a_n))
                # Store the transition
                self.replay_buffer.store_transition(obs_n, a_n, r_n, obs_next_n, done_n)
                obs_n = obs_next_n
                self.total_steps += 1

                # Decay noise_std
                if self.args.use_noise_decay:
                    self.noise_std = self.noise_std - self.args.noise_std_decay if self.noise_std - self.args.noise_std_decay > self.args.noise_std_min else self.args.noise_std_min

                if self.replay_buffer.current_size > self.args.batch_size:
                    # Train each agent individually
                    for agent_id in range(self.args.N):
                        self.agent_n[agent_id].train(self.replay_buffer, self.agent_n)

                if self.total_steps % self.args.evaluate_freq == 0:
                    self.evaluate_policy()

                if all(done_n):
                    break

        self.env.close()
        self.env_evaluate.close()

    def evaluate_policy(self, ):
        evaluate_reward = 0
        for _ in range(self.args.evaluate_times):
            obs_n = self.env_evaluate.reset()
            episode_reward = 0
            for _ in range(self.args.episode_limit):
                a_n = [agent.choose_action(obs, noise_std=0) for agent, obs in zip(self.agent_n, obs_n)]  # We do not add noise when evaluating
                obs_next_n, r_n, done_n, _ = self.env_evaluate.step(copy.deepcopy(a_n))
                episode_reward += r_n[0]
                obs_n = obs_next_n
                if all(done_n):
                    break
            evaluate_reward += episode_reward

        evaluate_reward = evaluate_reward / self.args.evaluate_times
        self.evaluate_rewards.append(evaluate_reward)
        print("total_steps:{} \t evaluate_reward:{} \t noise_std:{}".format(self.total_steps, evaluate_reward, self.noise_std))
        self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
        # Save the rewards and models
        np.save('./data_train/{}_env_{}_number_{}_seed_{}.npy'.format(self.args.algorithm, self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
        for agent_id in range(self.args.N):
            self.agent_n[agent_id].save_model(self.env_name, self.args.algorithm, self.number, self.total_steps, agent_id)


if __name__ == '__main__':
    parser = argparse.ArgumentParser("Hyperparameters Setting for MADDPG and MATD3 in MPE environment")
    parser.add_argument("--max_train_steps", type=int, default=int(1e6), help=" Maximum number of training steps")
    parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
    parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
    parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times")
    parser.add_argument("--max_action", type=float, default=1.0, help="Max action")

    parser.add_argument("--algorithm", type=str, default="MATD3", help="MADDPG or MATD3")
    parser.add_argument("--buffer_size", type=int, default=int(1e6), help="The capacity of the replay buffer")
    parser.add_argument("--batch_size", type=int, default=1024, help="Batch size")
    parser.add_argument("--hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the neural network")
    parser.add_argument("--noise_std_init", type=float, default=0.2, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_std_min", type=float, default=0.05, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_decay_steps", type=float, default=3e5, help="How many steps before the noise_std decays to the minimum")
    parser.add_argument("--use_noise_decay", type=bool, default=True, help="Whether to decay the noise_std")
    parser.add_argument("--lr_a", type=float, default=5e-4, help="Learning rate of actor")
    parser.add_argument("--lr_c", type=float, default=5e-4, help="Learning rate of critic")
    parser.add_argument("--gamma", type=float, default=0.95, help="Discount factor")
    parser.add_argument("--tau", type=float, default=0.01, help="Softly update the target network")
    parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Orthogonal initialization")
    parser.add_argument("--use_grad_clip", type=bool, default=True, help="Gradient clip")
    # --------------------------------------MATD3--------------------------------------------------------------------
    parser.add_argument("--policy_noise", type=float, default=0.2, help="Target policy smoothing")
    parser.add_argument("--noise_clip", type=float, default=0.5, help="Clip noise")
    parser.add_argument("--policy_update_freq", type=int, default=2, help="The frequency of policy updates")

    args = parser.parse_args()
    args.noise_std_decay = (args.noise_std_init - args.noise_std_min) / args.noise_decay_steps

    env_names = ["simple_speaker_listener", "simple_spread"]
    env_index = 0
    runner = Runner(args, env_name=env_names[env_index], number=1, seed=0)
    runner.run()

环境文件:environment

# Please write down your environment Settings
# Pay attention to the input and output of parameters
class Env:
	def __init__(self, args, discrete):
		self.args = args
		self.discrete = discrete

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。