首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【MADRL】多智能体深度确定性策略梯度(MADDPG )算法

【MADRL】多智能体深度确定性策略梯度(MADDPG )算法

作者头像
不去幼儿园
发布2024-12-03 13:10:47
发布2024-12-03 13:10:47
92800
代码可运行
举报
文章被收录于专栏:强化学习专栏强化学习专栏
运行总次数:0
代码可运行

本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏: 强化学习(6)---《【MADRL】多智能体深度确定性策略梯度(MADDPG )算法》

【MADRL】多智能体深度确定性策略梯度(MADDPG )算法

1.MADDPG算法详解

MADDPG (Multi-Agent Deep Deterministic Policy Gradient) 是一种用于多智能体强化学习环境的算法。它由2017年发布的论文《Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments》提出。MADDPG结合了深度确定性策略梯度(DDPG)算法的思想,并对多智能体场景进行了扩展,能够处理混合的协作与竞争环境。

链接:《Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments》

代码: MADRL多智能体深度确定性策略梯度(MADDPG )算法

其他多智能体深度强化学习(MADRL)算法见下面博客: 【MADRL】多智能体深度强化学习《纲要》


2.背景与动机

在多智能体系统中,多个智能体同时作用于同一个环境,相互之间可能是竞争的、协作的,或者二者兼有。这类环境下,单智能体算法如DDPG往往无法取得较好的效果,因为每个智能体的行为都会影响其他智能体的状态和奖励。为了解决这一问题,MADDPG采用了一种集中式训练,分布式执行的架构。

  • 集中式训练:训练期间,每个智能体可以观察所有其他智能体的动作和状态,从而学到更有效的策略。
  • 分布式执行:在执行阶段,智能体只依赖其自身的观测来做出决策,保持分布式控制的特性。

3.算法结构

MADDPG是基于Actor-Critic结构的,其中每个智能体都有自己的Actor和Critic模型。Actor用于输出动作策略,而Critic用于评估动作的价值。该算法的独特之处在于,Critic模型是全局的,即Critic不仅依赖于单个智能体的状态和动作,还使用所有智能体的状态和动作。


4.具体公式

MADDPG扩展了DDPG的公式,针对多智能体环境进行如下改动:

  1. 环境设定
    • 状态
    ( s \in S )
    ( s \in S )

    表示整个环境的状态。

    • 对于每个智能体
    ( i )
    ( i )

    ,它的观测值

    ( o_i \in O )
    ( o_i \in O )

    是全局状态的一部分。

    • 每个智能体采取动作
    ( a_i \in A )
    ( a_i \in A )

    • 每个智能体根据其策略
    ( \pi_i(o_i) )
    ( \pi_i(o_i) )

    选择动作。

  2. 目标:每个智能体的目标是最大化其期望累积回报
( R_i = \mathbb{E}[\sum_t \gamma^t r_i^t] )
( R_i = \mathbb{E}[\sum_t \gamma^t r_i^t] )

,其中

( r_i^t )
( r_i^t )

是时刻

( t )
( t )

智能体

( i )
( i )

的即时奖励,

( \gamma )
( \gamma )

是折扣因子。

  1. Critic网络:每个智能体
( i )
( i )

的 Critic 网络

( Q_i(s, a_1, ..., a_N) )
( Q_i(s, a_1, ..., a_N) )

估计全局的状态和所有智能体动作的联合Q值。这个Q值函数可以通过Bellman方程进行更新:(或者其他方式)

[ L(\theta_i) = \mathbb{E}_{s,a,r,s'} \left[ \left( Q_i(s, a_1, ..., a_N; \theta_i) - y \right)^2 \right] ]
[ L(\theta_i) = \mathbb{E}_{s,a,r,s'} \left[ \left( Q_i(s, a_1, ..., a_N; \theta_i) - y \right)^2 \right] ]

其中目标值

( y )
( y )

为:

[ y = r_i + \gamma Q'_i(s', a'_1, ..., a'_N; \theta'_i) ]
[ y = r_i + \gamma Q'_i(s', a'_1, ..., a'_N; \theta'_i) ]

这里,

( Q'_i )
( Q'_i )

是目标Critic网络,动作

( a'_j )
( a'_j )

是通过各自的目标Actor策略选出的动作。

  1. Actor网络:每个智能体
( i )
( i )

的Actor策略是通过最大化其Critic函数的期望来更新的:

[ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s, a} \left[ \nabla{a_i} Q_i(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]
[ \nabla_{\theta_{\pi_i}} J(\pi_i) = \mathbb{E}{s, a} \left[ \nabla{a_i} Q_i(s, a_1, ..., a_N) \nabla_{\theta_{\pi_i}} \pi_i(o_i) \right] ]

通过策略梯度法对Actor网络的参数

( \theta_{\pi_i} )
( \theta_{\pi_i} )

进行更新。

  1. 去中心化执行:在实际执行过程中,每个智能体根据其自身的观测值
( o_i )
( o_i )

通过策略

( \pi_i(o_i) )
( \pi_i(o_i) )

选择动作。


5.算法流程

  1. 初始化:为每个智能体
( i )
( i )

初始化Actor网络

( \pi_i )
( \pi_i )

和Critic网络

( Q_i )
( Q_i )

以及对应的目标网络

( \pi'_i )
( \pi'_i )

( Q'_i )
( Q'_i )

  1. 交互:智能体与环境进行交互,在每个时刻
( t )
( t )

,每个智能体根据其策略

( \pi_i(o_i) )
( \pi_i(o_i) )

选择动作

( a_i )
( a_i )

,环境返回下一个状态和奖励

( r_i )
( r_i )

  1. 存储经验:将状态、动作、奖励和下一个状态存入经验回放池。
  2. 采样与更新:从经验回放池中采样一个批次,使用前述的公式更新每个智能体的Critic和Actor网络。
  3. 软更新目标网络:以慢速的方式更新目标网络的参数:
[ \theta'_i \leftarrow \tau \theta_i + (1 - \tau) \theta'_i ]
[ \theta'_i \leftarrow \tau \theta_i + (1 - \tau) \theta'_i ]
  1. 重复:重复交互和更新过程,直到训练完成。

6.优势与应用场景

  • 解决多智能体环境中的非平稳性问题:由于多个智能体的存在,环境对每个智能体来说是非平稳的。MADDPG通过中心化的Critic结构来应对这一问题,确保在训练过程中,每个智能体都能有效学习到策略。
  • 处理协作与竞争混合的环境:MADDPG非常适合混合了协作和竞争的多智能体环境,因为它允许智能体通过全局视角进行策略学习,但在执行时保持去中心化。

7.结论

MADDPG是一种针对多智能体系统的强化学习算法,结合了Actor-Critic框架和集中式训练分布式执行的思想,能够有效处理协作与竞争共存的复杂环境。通过引入全局信息进行训练,它显著提高了多智能体环境下的学习效果,同时保留了分布式控制的灵活性。


[Python] MADDPG实现(可移植)

若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MADDPG_MATD3_main

代码语言:javascript
代码运行次数:0
运行
复制
import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from environment import Env
import argparse
from replay_buffer import ReplayBuffer
from maddpg import MADDPG
from matd3 import MATD3
import copy


class Runner:
    def __init__(self, args, env_name, number, seed):
        self.args = args
        self.env_name = env_name
        self.number = number
        self.seed = seed
        # Create env
        self.env = Env(env_name, discrete=False)  # Continuous action space
        self.env_evaluate = Env(env_name, discrete=False)
        self.args.N = self.env.n  # The number of agents
        self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]  # obs dimensions of N agents
        self.args.action_dim_n = [self.env.action_space[i].shape[0] for i in range(self.args.N)]  # actions dimensions of N agents
        print("observation_space=", self.env.observation_space)
        print("obs_dim_n={}".format(self.args.obs_dim_n))
        print("action_space=", self.env.action_space)
        print("action_dim_n={}".format(self.args.action_dim_n))

        # Set random seed
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)

        # Create N agents
        if self.args.algorithm == "MADDPG":
            print("Algorithm: MADDPG")
            self.agent_n = [MADDPG(args, agent_id) for agent_id in range(args.N)]
        elif self.args.algorithm == "MATD3":
            print("Algorithm: MATD3")
            self.agent_n = [MATD3(args, agent_id) for agent_id in range(args.N)]
        else:
            print("Wrong!!!")

        self.replay_buffer = ReplayBuffer(self.args)

        # Create a tensorboard
        self.writer = SummaryWriter(log_dir='runs/{}/{}_env_{}_number_{}_seed_{}'.format(self.args.algorithm, self.args.algorithm, self.env_name, self.number, self.seed))

        self.evaluate_rewards = []  # Record the rewards during the evaluating
        self.total_steps = 0

        self.noise_std = self.args.noise_std_init  # Initialize noise_std

    def run(self, ):
        self.evaluate_policy()

        while self.total_steps < self.args.max_train_steps:
            obs_n = self.env.reset()
            for _ in range(self.args.episode_limit):
                # Each agent selects actions based on its own local observations(add noise for exploration)
                a_n = [agent.choose_action(obs, noise_std=self.noise_std) for agent, obs in zip(self.agent_n, obs_n)]
                # --------------------------!!!注意!!!这里一定要deepcopy,MPE环境会把a_n乘5-------------------------------------------
                obs_next_n, r_n, done_n, _ = self.env.step(copy.deepcopy(a_n))
                # Store the transition
                self.replay_buffer.store_transition(obs_n, a_n, r_n, obs_next_n, done_n)
                obs_n = obs_next_n
                self.total_steps += 1

                # Decay noise_std
                if self.args.use_noise_decay:
                    self.noise_std = self.noise_std - self.args.noise_std_decay if self.noise_std - self.args.noise_std_decay > self.args.noise_std_min else self.args.noise_std_min

                if self.replay_buffer.current_size > self.args.batch_size:
                    # Train each agent individually
                    for agent_id in range(self.args.N):
                        self.agent_n[agent_id].train(self.replay_buffer, self.agent_n)

                if self.total_steps % self.args.evaluate_freq == 0:
                    self.evaluate_policy()

                if all(done_n):
                    break

        self.env.close()
        self.env_evaluate.close()

    def evaluate_policy(self, ):
        evaluate_reward = 0
        for _ in range(self.args.evaluate_times):
            obs_n = self.env_evaluate.reset()
            episode_reward = 0
            for _ in range(self.args.episode_limit):
                a_n = [agent.choose_action(obs, noise_std=0) for agent, obs in zip(self.agent_n, obs_n)]  # We do not add noise when evaluating
                obs_next_n, r_n, done_n, _ = self.env_evaluate.step(copy.deepcopy(a_n))
                episode_reward += r_n[0]
                obs_n = obs_next_n
                if all(done_n):
                    break
            evaluate_reward += episode_reward

        evaluate_reward = evaluate_reward / self.args.evaluate_times
        self.evaluate_rewards.append(evaluate_reward)
        print("total_steps:{} \t evaluate_reward:{} \t noise_std:{}".format(self.total_steps, evaluate_reward, self.noise_std))
        self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
        # Save the rewards and models
        np.save('./data_train/{}_env_{}_number_{}_seed_{}.npy'.format(self.args.algorithm, self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
        for agent_id in range(self.args.N):
            self.agent_n[agent_id].save_model(self.env_name, self.args.algorithm, self.number, self.total_steps, agent_id)


if __name__ == '__main__':
    parser = argparse.ArgumentParser("Hyperparameters Setting for MADDPG and MATD3 in MPE environment")
    parser.add_argument("--max_train_steps", type=int, default=int(1e6), help=" Maximum number of training steps")
    parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
    parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
    parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times")
    parser.add_argument("--max_action", type=float, default=1.0, help="Max action")

    parser.add_argument("--algorithm", type=str, default="MATD3", help="MADDPG or MATD3")
    parser.add_argument("--buffer_size", type=int, default=int(1e6), help="The capacity of the replay buffer")
    parser.add_argument("--batch_size", type=int, default=1024, help="Batch size")
    parser.add_argument("--hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the neural network")
    parser.add_argument("--noise_std_init", type=float, default=0.2, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_std_min", type=float, default=0.05, help="The std of Gaussian noise for exploration")
    parser.add_argument("--noise_decay_steps", type=float, default=3e5, help="How many steps before the noise_std decays to the minimum")
    parser.add_argument("--use_noise_decay", type=bool, default=True, help="Whether to decay the noise_std")
    parser.add_argument("--lr_a", type=float, default=5e-4, help="Learning rate of actor")
    parser.add_argument("--lr_c", type=float, default=5e-4, help="Learning rate of critic")
    parser.add_argument("--gamma", type=float, default=0.95, help="Discount factor")
    parser.add_argument("--tau", type=float, default=0.01, help="Softly update the target network")
    parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Orthogonal initialization")
    parser.add_argument("--use_grad_clip", type=bool, default=True, help="Gradient clip")
    # --------------------------------------MATD3--------------------------------------------------------------------
    parser.add_argument("--policy_noise", type=float, default=0.2, help="Target policy smoothing")
    parser.add_argument("--noise_clip", type=float, default=0.5, help="Clip noise")
    parser.add_argument("--policy_update_freq", type=int, default=2, help="The frequency of policy updates")

    args = parser.parse_args()
    args.noise_std_decay = (args.noise_std_init - args.noise_std_min) / args.noise_decay_steps

    env_names = ["simple_speaker_listener", "simple_spread"]
    env_index = 0
    runner = Runner(args, env_name=env_names[env_index], number=1, seed=0)
    runner.run()

环境文件:environment

代码语言:javascript
代码运行次数:0
运行
复制
# Please write down your environment Settings
# Pay attention to the input and output of parameters
class Env:
	def __init__(self, args, discrete):
		self.args = args
		self.discrete = discrete

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。

文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-09-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【MADRL】多智能体深度确定性策略梯度(MADDPG )算法
    • 1.MADDPG算法详解
    • 2.背景与动机
    • 3.算法结构
    • 4.具体公式
    • 5.算法流程
    • 6.优势与应用场景
    • 7.结论
    • [Python] MADDPG实现(可移植)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档