This article is a set of study notes and excerpts recorded by the blogger while learning about reinforcement learning (RL), intended for personal study, research, or appreciation, and based on the blogger's own understanding of the related fields. If anything is inappropriate or infringing, it will be corrected immediately once pointed out; your understanding is appreciated.
[MADRL] Independent Q-Learning (IQL) Algorithm
Table of Contents
0. Preface
1. Background and Challenges
2. The Basic Framework of IQL
3. Algorithm Flow
4. Advantages and Applications
5. Limitations
6. Improvements and Extensions
7. Summary
[Python] IQL Implementation (Portable)
0. Preface
Independent Q-Learning (IQL) is a classic and simple algorithm in multi-agent reinforcement learning (Multi-Agent Reinforcement Learning, MARL). Its main idea is to treat every agent as an independent learner that runs the single-agent Q-learning algorithm on its own. Although IQL is an early method, it still has practical value for handling multi-agent tasks.

Other multi-agent deep reinforcement learning (MADRL) algorithms are covered in the following post:
[MADRL] Multi-Agent Deep Reinforcement Learning (Outline)
1. Background and Challenges
In a multi-agent environment, all agents learn and act at the same time, and their decisions and behaviors affect one another. As a result, applying single-agent Q-learning directly in such environments faces several challenges:
- Non-stationarity: because the other agents' policies keep changing during training, the environment observed by any single agent becomes non-stationary, which makes it hard for standard Q-learning to converge.
- Curse of dimensionality: as the number of agents grows, the joint state and action spaces grow exponentially, making learning harder.
IQL, as a simple baseline, sidesteps the joint state-action space explosion by decomposing the multi-agent problem into several single-agent problems (the non-stationarity issue, however, is not solved; see Section 5).
2. The Basic Framework of IQL
The core idea of IQL is to treat each agent as an independent learner and to ignore the existence of the other agents. Concretely:
- Independent Q-learning: each agent runs the Q-learning algorithm on its own and learns a Q function from its own observations and actions.
- No-cooperation assumption: IQL assumes every agent is self-interested and optimizes its policy only with respect to its own reward, without considering the behavior or rewards of the other agents.
A minimal sketch of this structure is given below.
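To make the "independent learner" idea concrete, here is a minimal tabular sketch (my own illustration, not part of the implementation given later in this post): each agent owns a Q-table that only ever sees that agent's own observations, actions, and rewards. The class name `IndependentQLearner` and its interface are assumptions made just for this example.

```python
# Minimal sketch of one independent learner (tabular, for illustration only).
import random
from collections import defaultdict

class IndependentQLearner:
    def __init__(self, n_actions, alpha=0.1, gamma=0.95, epsilon=0.1):
        self.q = defaultdict(lambda: [0.0] * n_actions)   # this agent's own Q-table
        self.n_actions = n_actions
        self.alpha, self.gamma, self.epsilon = alpha, gamma, epsilon

    def act(self, obs):
        # epsilon-greedy over this agent's own Q-values
        if random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        return max(range(self.n_actions), key=lambda a: self.q[obs][a])

    def learn(self, obs, action, reward, next_obs, done):
        # standard single-agent Q-learning update; other agents are ignored entirely
        target = reward + (0.0 if done else self.gamma * max(self.q[next_obs]))
        self.q[obs][action] += self.alpha * (target - self.q[obs][action])
```

In a system with N agents, each agent simply gets its own instance of this class; nothing is shared between them.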
3. Algorithm Flow
The concrete procedure of IQL is as follows (a runnable sketch of the full loop is given after this list):
- Initialize the Q functions: for each agent $i$, initialize its Q function $Q_i(s, a_i)$.
- Experience sampling: at every step, each agent independently selects an action $a_i$, interacts with the environment, and receives a reward $r_i$.
- Q-function update: following the standard Q-learning update rule, agent $i$'s Q function is updated as
$$Q_i(s, a_i) \leftarrow Q_i(s, a_i) + \alpha \left[ r_i + \gamma \max_{a_i'} Q_i(s', a_i') - Q_i(s, a_i) \right],$$
where $\alpha$ is the learning rate and $\gamma$ is the discount factor.
- Policy update: each agent selects actions based on its updated Q function, typically with an $\epsilon$-greedy policy.
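Putting the steps above together: the following sketch drives a list of `IndependentQLearner` instances (from the sketch in Section 2) with a tiny stand-in environment. `ToyMultiAgentEnv` is a made-up placeholder so the loop runs end to end; a real multi-agent environment, such as the UAV environment used in the implementation at the end of this post, would take its place.

```python
# IQL training-loop sketch; ToyMultiAgentEnv is a hypothetical placeholder environment.
# Assumes the IndependentQLearner class from the sketch in Section 2.
class ToyMultiAgentEnv:
    def __init__(self, n_agents=2, n_actions=3, horizon=20):
        self.n_agents, self.n_actions, self.horizon = n_agents, n_actions, horizon

    def reset(self):
        self.t = 0
        return [0] * self.n_agents                        # one toy observation per agent

    def step(self, actions):
        self.t += 1
        rewards = [1.0 if a == self.t % self.n_actions else 0.0 for a in actions]
        next_obs = [self.t % self.n_actions] * self.n_agents
        return next_obs, rewards, self.t >= self.horizon

env = ToyMultiAgentEnv()
# Step 1: one independent Q function per agent.
learners = [IndependentQLearner(env.n_actions) for _ in range(env.n_agents)]

for episode in range(500):
    obs, done = env.reset(), False
    while not done:
        # Steps 2 and 4: each agent picks its own epsilon-greedy action.
        actions = [agent.act(o) for agent, o in zip(learners, obs)]
        next_obs, rewards, done = env.step(actions)
        # Step 3: each agent updates its own Q function from its own transition only.
        for agent, o, a, r, no in zip(learners, obs, actions, rewards, next_obs):
            agent.learn(o, a, r, no, done)
        obs = next_obs
```

The DQN-based implementation below follows exactly the same pattern, with the Q-table replaced by a neural network and one replay buffer per agent.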
4. Advantages and Applications
- Simple and easy to implement: IQL is a simple baseline in multi-agent reinforcement learning that is easy to implement and extend.
- Low computational overhead: IQL reuses the single-agent Q-learning update rule directly, adding no extra computational machinery, which makes it suitable for fairly large multi-agent systems.
- Application scenarios: IQL fits loosely coupled multi-agent tasks such as multi-UAV navigation and distributed resource management.
5. Limitations
Although IQL is simple to implement, it has some obvious drawbacks:
- It ignores interactions among agents: IQL completely disregards cooperative and competitive relationships between agents, so it performs poorly on tasks that require coordination.
- Non-stationarity: because the other agents' policies keep changing, the environment is non-stationary from each individual agent's point of view, which can prevent the Q functions from converging.
- Poor scalability: when the number of agents grows or the task becomes more complex, IQL's performance can degrade significantly.
6. Improvements and Extensions
Many later methods were proposed to overcome IQL's limitations, for example (a value-decomposition sketch follows this list):
- Centralized Training with Decentralized Execution (CTDE): agents are trained with access to centralized information, but each agent makes its own decisions at execution time.
- Cooperative MARL algorithms: methods such as QMIX and VDN introduce mixing networks and value decomposition to account for cooperation among agents.
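As a rough illustration of the value-decomposition idea behind VDN (a simplified sketch based on my own understanding, not the official implementation): each agent still has its own Q network, but during training the chosen-action Q values of all agents are summed into a joint value Q_tot that is trained against a team-level target, so the gradient distributes credit across agents. All sizes and the dummy batch below are arbitrary assumptions made for illustration.

```python
# Simplified VDN-style value-decomposition sketch (illustrative only).
import torch
from torch import nn

n_agents, obs_dim, n_actions = 3, 8, 5

# One Q network per agent, as in IQL.
agent_nets = nn.ModuleList(
    nn.Sequential(nn.Linear(obs_dim, 64), nn.ReLU(), nn.Linear(64, n_actions))
    for _ in range(n_agents)
)
optimizer = torch.optim.Adam(agent_nets.parameters(), lr=1e-3)

# Dummy batch standing in for replay-buffer samples (batch_size = 32).
obs     = torch.randn(32, n_agents, obs_dim)
actions = torch.randint(0, n_actions, (32, n_agents, 1))
targets = torch.randn(32, 1)   # e.g. team reward + gamma * max Q_tot of the next state

# Q_tot(s, a) = sum_i Q_i(o_i, a_i): per-agent utilities summed into a joint value.
q_chosen = torch.stack(
    [agent_nets[i](obs[:, i]).gather(1, actions[:, i]) for i in range(n_agents)],
    dim=1,
)                                # shape (32, n_agents, 1)
q_tot = q_chosen.sum(dim=1)      # shape (32, 1)

loss = nn.MSELoss()(q_tot, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
```

Unlike IQL, the agents are no longer trained in isolation: the shared loss on Q_tot couples their updates, which is what enables cooperative credit assignment.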
7. Summary
IQL is a simple and classic algorithm in multi-agent reinforcement learning. Although its performance on complex cooperative tasks is limited, as a baseline it provides an important reference point for the more sophisticated multi-agent methods that followed.
[Python] IQL Implementation (Portable)
"""
@content: IQL
@author: 不去幼儿园
@Timeline: 2024.08.21
"""
import random
from torch import nn
import torch
import numpy as np
import collections
import matplotlib.pyplot as plt
from env_base import Env
import argparse
from scipy.signal import savgol_filter
import time
import os
from tensorboardX import SummaryWriter
writer = SummaryWriter()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device: ", device)
class ReplayBuffer:
    """Per-agent experience replay buffer (each agent owns one)."""
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
transitions = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done
def size(self):
return len(self.buffer)
class DqnNet(nn.Module):
def __init__(self, n_states, n_actions):
super(DqnNet, self).__init__()
self.model = nn.Sequential(
nn.Linear(n_states, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, n_actions)
)
def forward(self, state):
return self.model(state)
class Agent(object):
def __init__(self, identifier, n_states, n_hidden, n_actions, learning_rate, gamma, epsilon, target_update, device=None):
self.identifier = identifier
self.n_states = n_states
self.n_hidden = n_hidden
self.n_actions = n_actions
self.learning_rate = learning_rate
self.gamma = gamma
self.epsilon = epsilon
self.target_update = target_update
self.device = device
        # Online Q network and target Q network (note: n_hidden is stored above for reference,
        # but DqnNet uses fixed hidden sizes of 128 and 64)
        self.Q_net = DqnNet(self.n_states, self.n_actions)
        self.Q_target_net = DqnNet(self.n_states, self.n_actions)
self.optimizer = torch.optim.Adam(self.Q_net.parameters(), lr=self.learning_rate)
self.count = 0
    def take_action(self, state):
        # Epsilon-greedy action selection; note that here epsilon is the probability of
        # taking the greedy action (exploitation), not the exploration probability.
        if np.random.uniform(0, 1) < self.epsilon:
            state = torch.tensor(state, dtype=torch.float)
            action = torch.argmax(self.Q_net(state)).item()
        else:
            action = np.random.randint(0, self.n_actions)
        return int(action)
def update(self, transition_dict):
states = torch.tensor(transition_dict["states"], dtype=torch.float)
actions = torch.tensor(transition_dict["actions"]).view(-1, 1)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1)
        # Q(s, a) of the chosen actions from the online network
        predict_q_values = self.Q_net(states).gather(1, actions)
        with torch.no_grad():
            # TD target: r + gamma * max_a' Q_target(s', a'), masked at terminal states
            max_next_q_values = self.Q_target_net(next_states).max(1)[0].view(-1, 1)
            q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)
        dqn_loss = nn.MSELoss()(predict_q_values, q_targets)
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()
        # Periodically sync the target network with the online network
        if self.count % self.target_update == 0:
            self.Q_target_net.load_state_dict(self.Q_net.state_dict())
        self.count += 1
def run(env, agent_list, replay_buffer_list, batch_size):
    """Run one training episode: every agent acts, stores its own transition, and learns independently."""
    state = env.reset()
reward_total = [0. for _ in range(env.uav_num)]
done = False
while not done:
"""训练主流程代码"""
action_list = []
for i, agent in enumerate(agent_list):
action = agent.take_action(state[i])
action_list.append(action)
"""环境步进更新,返回下一个状态值、奖励值、环境结束状态等
@移植注意:返回值格式问题
"""
next_state, reward, done_, eval_infos = env.step(action_list)
reward = list(np.array(reward).flatten())
done = done_[0]
for j, replay_buffer in enumerate(replay_buffer_list):
replay_buffer.add(state[j], action_list[j], reward[j], next_state[j], done)
if replay_buffer.size() > batch_size:
s, a, r, ns, d = replay_buffer.sample(batch_size)
transition_dict = {
'states': s,
'actions': a,
'next_states': ns,
'rewards': r,
'dones': d,
}
agent_list[j].update(transition_dict)
state = next_state
reward_total = [x + y for x, y in zip(reward_total, reward)]
return reward_total
def run_test(env, agent_list, show_flag, total_num):
    """Run one evaluation episode and log per-agent rewards and environment statistics to TensorBoard."""
    state = env.reset()
reward_episode_eva = [0. for _ in range(env.uav_num)]
done = False
while not done:
"""测试主流程代码"""
action_list = []
for i, agent in enumerate(agent_list):
action = agent.take_action(state[i])
action_list.append(action)
"""环境步进更新,返回下一个状态值、奖励值、环境结束状态等
@移植注意:返回值格式问题
"""
next_state, reward, done_, eval_infos = env.step(action_list)
reward = list(np.array(reward).flatten())
done = done_[0]
reward_episode_eva = [x + y for x, y in zip(reward_episode_eva, reward)]
state = next_state
goal_num_buffer = env.get_state_data()
goal_num_buffer = np.array(goal_num_buffer)
log_flag = ["state/target_one_num", "state/target_two_num", "state/barrier_crash_num", "state/coverage_ratio"]
agent_flag = ["agent/agent01", "agent/agent02", "agent/agent03"]
for i in range(len(goal_num_buffer)):
goal_num = goal_num_buffer[i]
goal_num = {log_flag[i]: goal_num}
log_state(name=log_flag[i], state=goal_num, step=total_num)
for i in range(3):
agent = reward_episode_eva[i]
agent = {agent_flag[i]: agent}
log_state(name=agent_flag[i], state=agent, step=total_num)
reward_total = {"state/reward_total": sum(reward_episode_eva)}
log_state(name="state/reward_total", state=reward_total, step=total_num)
return reward_total
"""tensorboard结果展示函数"""
def log_state(name, state, step):
writer.add_scalars(name, state, step)
def display(return_list, test_reward_list):
timestamp = time.strftime("%Y%m%d%H%M%S")
    result_path = os.path.dirname(os.path.realpath(__file__)) + '/results/txt/'
    os.makedirs(result_path, exist_ok=True)  # make sure the output directory exists
plt.figure(2)
window_length = 31
polyorder = 2
smoothed_data_1 = savgol_filter(np.array(return_list), window_length, polyorder)
plt.plot(smoothed_data_1, label='Smoothed Data', alpha=1)
plt.plot(list(range(len(np.array(return_list)))), np.array(return_list), label='Original Data', alpha=0.2)
plt.xlabel('Episodes')
plt.ylabel('Reward')
plt.legend()
plt.title('IQL Train on {}'.format(env_name))
plt.savefig(result_path + f"train_reward_{timestamp}.png", format='png')
np.savetxt(result_path + f'/epi_reward_{timestamp}.txt', return_list)
plt.figure(3)
smoothed_data_2 = savgol_filter(np.array(test_reward_list), window_length, polyorder)
plt.plot(smoothed_data_2, label='Smoothed Data', alpha=1)
plt.plot(list(range(len(np.array(test_reward_list)))), np.array(test_reward_list), alpha=0.2, label='Original Data')
plt.xlabel('Episodes')
plt.ylabel('Test-Reward')
plt.legend()
    plt.title('IQL Test on {}'.format(env_name))
plt.savefig(result_path + f"test_reward_{timestamp}.png", format='png')
plt.show()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
EnvArgs = parser.parse_args()
EnvArgs.map_size = 10
EnvArgs.uav_num = 3
EnvArgs.env_step_time = 50
env = Env(args=EnvArgs)
env_name = "Environment"
n_states, n_actions = env.get_state_space()[0], env.get_action_space()
capacity = 5000
lr = 6e-3
gamma = 0.98
epsilon = 0.99
target_update = 1
batch_size = 64
n_hidden = 128
return_list = []
test_reward_list = []
replay_buffer_list = [ReplayBuffer(capacity) for _ in range(env.uav_num)]
num_episodes = 8000
num_test = 8000/5
agent_list = []
total_num = 0
for i in range(0, EnvArgs.uav_num):
new_agent = Agent(identifier=i, n_states=n_states, n_hidden=n_hidden, n_actions=n_actions, learning_rate=lr,
gamma=gamma, epsilon=epsilon, target_update=target_update, device=device)
agent_list.append(new_agent)
for episode in range(num_episodes):
        reward_train = run(env=env, agent_list=agent_list, replay_buffer_list=replay_buffer_list, batch_size=batch_size)
        return_list.append(sum(reward_train))              # collect training reward for plotting
        total_num += EnvArgs.env_step_time                  # environment steps per episode
        if episode % 5 == 0:
            reward_eval = run_test(env=env, agent_list=agent_list, show_flag=False, total_num=total_num)
            test_reward_list.append(reward_eval["state/reward_total"])
            print(f"Test round {episode // 5 + 1}, training reward of agent 0: {reward_train[0]}")
    print(f"Total number of environment steps: {total_num}")
    display(return_list, test_reward_list)                  # plot and save the reward curves
If anything in this article is inappropriate or incorrect, your understanding and corrections are appreciated. Some text and images come from the internet and their original sources cannot be verified; if any dispute arises, please contact the blogger and the content will be removed. For errors, questions, or infringement concerns, feel free to leave a comment or contact the author via the WeChat official account: Rain21321.