📢本篇文章是博主强化学习(RL)领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在👉强化学习专栏: 【强化学习】(41)---《【RL】强化学习入门:从基础到应用》
事后经验回放,Hindsight Experience Replay (HER) 是一种在稀疏奖励强化学习环境下提高智能体学习效率的策略。稀疏奖励问题是指智能体在多数状态下无法获得有价值的反馈,因为奖励信号极其稀少或完全没有。HER通过回顾智能体过去未能实现的目标,将这些“失败”的经验转换为有价值的学习机会,从而极大地提高了智能体在稀疏奖励场景中的学习效率。
HER算法最早由OpenAI团队提出,主要用于解决目标导向的强化学习任务,其中智能体的目标是达到某个特定的状态(例如到达某个地点或完成某个任务),但由于奖励稀疏,智能体很难获得足够的反馈进行有效学习。(这已经是被广泛利用的机制了)
HER的核心思想是,智能体在与环境交互过程中,即使没有成功实现原定目标,也可以将其经验回放(Replay)并假设达成了某个替代目标,从而将“失败”的经验转化为有用的学习信号。
HER的几个关键概念:
在目标导向的强化学习框架下,智能体的策略不仅依赖于当前状态,还依赖于目标
。传统的状态-动作对
被扩展为状态-目标-动作三元组
,智能体的目标是学会根据不同的目标采取最优动作,从而最大化奖励。
当智能体执行某一动作后,并未能实现原定目标
,HER会在回放池中随机选择某个已经到达的状态
作为替代目标
。接着,HER会对整个经验进行重新标注,假设智能体的目标是实现
,而不是原始目标
。通过这种方式,即便智能体未能实现初始目标,也可以通过实现其他目标进行学习。
公式上,经验回放会从
被重新标注为
,其中
是基于新的替代目标
计算的奖励。
HER的回放策略假设成功达成目标的情况下:
其中
表示指示函数,当状态
达到新的替代目标
时,奖励为1。
在HER中,使用了多种回放策略来选择替代目标:
这些策略的引入使得智能体可以从不同角度回顾经验,提升其在不同情境下的学习能力。
智能体与环境交互,采集经验数据。对于每个时间步
,记录状态
、动作
、原定目标
和奖励
,并存储到经验回放池中。
在每次经验回放时,从经验回放池中取出一条轨迹,随机选择一个替代目标
。此时会将轨迹中的所有目标
替换为新的替代目标
,并重新计算奖励。
利用带有替代目标的经验对智能体进行训练。训练过程中,智能体通过这些“虚拟成功”的经验来更新其策略和价值函数。
HER中的策略更新与经典的深度强化学习算法(如DDPG或DQN)结合使用。通过对经验回放池中的替代目标进行训练,智能体可以更加高效地学习到如何在不同目标下采取最优的行动。
以下是一个简化的HER在强化学习中的工作流程,结合深度确定性策略梯度(DDPG)算法:
🔥若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱📌,以便于及时分享给您(私信难以及时回复)。
"""《HER 方法实现简单流程》
时间:2024.10.
作者:不去幼儿园
"""
import numpy as np
class HER:
def __init__(self, env, agent, replay_buffer):
self.env = env
self.agent = agent
self.replay_buffer = replay_buffer
def generate_trajectory(self):
state = self.env.reset()
done = False
trajectory = []
while not done:
action = self.agent.select_action(state)
next_state, reward, done, _ = self.env.step(action)
trajectory.append((state, action, reward, next_state))
state = next_state
return trajectory
def hindsight_replay(self, trajectory):
# 随机选择替代目标
final_state = trajectory[-1][3]
for (state, action, reward, next_state) in trajectory:
# 替代目标回放
goal = final_state # 简单起见,使用最终状态作为替代目标
new_reward = 1.0 if np.array_equal(next_state, goal) else 0.0
self.replay_buffer.add(state, action, new_reward, next_state, goal)
def train(self, num_episodes):
for episode in range(num_episodes):
trajectory = self.generate_trajectory()
self.hindsight_replay(trajectory)
self.agent.train(self.replay_buffer)
使用 Hindsight Experience Replay (HER) 结合 深度确定性策略梯度(DDPG) 算法,并在 CartPole 环境中实现的完整 PyTorch 代码。HER 能够在目标失败时,生成新经验,将原本的失败经验转化为新的成功经验。DDPG 是一种强大的基于策略梯度的方法,适合解决连续动作空间问题。
CartPole 的动作空间是离散的,通常使用 DDPG 和 HER 需要连续动作空间,因此在这里,对 CartPole 的离散动作做了一个简单的映射,将其转化为连续动作。
参数设置:
"""《 HER+DDPG 方法实现简单流程》
时间:2024.10.
作者:不去幼儿园
"""
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
# Hyperparameters
GAMMA = 0.99
TAU = 0.005
ACTOR_LR = 1e-4
CRITIC_LR = 1e-3
BUFFER_SIZE = 1000000
BATCH_SIZE = 64
HER_K = 4 # HER 技术的 k 值
网络配置:
# Actor network
class Actor(nn.Module):
def __init__(self, state_dim, action_dim):
super(Actor, self).__init__()
self.fc1 = nn.Linear(state_dim, 256)
self.fc2 = nn.Linear(256, 256)
self.fc3 = nn.Linear(256, action_dim)
def forward(self, state):
x = torch.relu(self.fc1(state))
x = torch.relu(self.fc2(x))
action = torch.tanh(self.fc3(x)) # 使用 tanh 激活函数将输出限制在 [-1, 1]
return action
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
# 调整 fc1 的输入维度为 5
self.fc1 = nn.Linear(5, 256) # 输入维度为 5 (4 for state + 1 for action)
self.fc2 = nn.Linear(256, 256)
self.fc3 = nn.Linear(256, 1)
def forward(self, state, action):
# 将 state 和 action 拼接,确保维度是 (256, 5)
x = torch.cat([state, action], dim=1)
x = torch.relu(self.fc1(x)) # fc1 输入维度现在匹配输入
x = torch.relu(self.fc2(x))
x = self.fc3(x)
return x
经验池配置:
# Replay Buffer with HER
class ReplayBuffer:
def __init__(self, buffer_size, state_dim, action_dim, goal_dim):
self.buffer = deque(maxlen=buffer_size)
self.state_dim = state_dim
self.action_dim = action_dim
self.goal_dim = goal_dim
def add(self, state, action, reward, next_state, done, goal):
# goal 是一个包含两个元素的元组,提取第一个元素作为实际的 goal
goal = goal[0] # 仅提取数组部分,忽略空字典
# print(f"Processed Goal: {goal}") # 调试输出处理后的 goal
# 确保所有数据都是 NumPy 数组并具有一致的形状
state = np.array(state).reshape(-1)
action = np.array(action).reshape(self.action_dim)
next_state = np.array(next_state).reshape(-1)
goal = np.array(goal).reshape(-1) # 保持 goal 为一维数组
self.buffer.append((state, action, reward, next_state, done, goal))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones, goals = zip(*batch)
# 确保每个维度的数组形状一致
return (
np.array(states),
np.array(actions),
np.array(rewards),
np.array(next_states),
np.array(dones),
np.array(goals)
)
def her_sample(self, batch_size, k):
states, actions, rewards, next_states, dones, goals = self.sample(batch_size)
her_states, her_actions, her_rewards, her_next_states, her_dones, her_goals = [], [], [], [], [], []
for i in range(batch_size):
future_idx = np.random.randint(i, len(self.buffer), size=k)
for idx in future_idx:
new_goal = self.buffer[idx][3] # 使用未来状态作为新的 goal
her_states.append(states[i])
her_actions.append(actions[i])
her_rewards.append(-np.linalg.norm(next_states[i] - new_goal)) # 使用新目标计算奖励
her_next_states.append(next_states[i])
her_dones.append(dones[i]) # 确保 dones 被正确加入
her_goals.append(new_goal)
# 返回 6 个值,而不是 5 个
return (
np.array(her_states),
np.array(her_actions),
np.array(her_rewards),
np.array(her_next_states),
np.array(her_dones),
np.array(her_goals)
)
def __len__(self):
return len(self.buffer)
DDPG agent 配置:
# DDPG agent
class DDPGAgent:
def __init__(self, state_dim, action_dim, goal_dim):
self.actor = Actor(state_dim, action_dim).float()
self.actor_target = Actor(state_dim, action_dim).float()
self.critic = Critic(state_dim + goal_dim, action_dim).float()
self.critic_target = Critic(state_dim + goal_dim, action_dim).float()
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=ACTOR_LR)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=CRITIC_LR)
self.replay_buffer = ReplayBuffer(BUFFER_SIZE, state_dim, action_dim, goal_dim)
def update(self, batch_size):
if len(self.replay_buffer) < batch_size:
return
# 从经验池中采样
states, actions, rewards, next_states, dones, goals = self.replay_buffer.her_sample(batch_size, HER_K)
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards).unsqueeze(1)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1)
goals = torch.FloatTensor(goals)
# Actor 更新
pred_actions = self.actor(states)
# print(f"states shape: {states.shape}, pred_actions shape: {pred_actions.shape}")
actor_loss = -self.critic(states, pred_actions).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# Critic 更新
target_actions = self.actor_target(next_states)
target_q_values = rewards + GAMMA * self.critic_target(next_states, target_actions) * (1 - dones)
critic_loss = (self.critic(states, actions) - target_q_values.detach()).pow(2).mean()
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 软更新目标网络
self.soft_update(self.actor, self.actor_target)
self.soft_update(self.critic, self.critic_target)
def soft_update(self, net, net_target):
for param, param_target in zip(net.parameters(), net_target.parameters()):
param_target.data.copy_(TAU * param.data + (1 - TAU) * param_target.data)
def act(self, state):
state = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state).detach().numpy()[0]
return action
def add_to_buffer(self, state, action, reward, next_state, done, goal):
self.replay_buffer.add(state, action, reward, next_state, done, goal)
环境、动作处理:
# CartPole action mapping to continuous space [-1, 1]
def action_mapper(action):
return np.array([action * 2 - 1]) # 将 CartPole 的动作 0/1 映射到 [-1, 1]
# CartPole environment with continuous action space
class CartPoleContinuousWrapper(gym.ActionWrapper):
def __init__(self, env):
super(CartPoleContinuousWrapper, self).__init__(env)
def action(self, action):
# 将连续动作转换为离散动作
return int(action[0] > 0)
def reverse_action(self, action):
return np.array([1.0 if action == 1 else -1.0])
训练程序:
# Main training loop
env = CartPoleContinuousWrapper(gym.make('CartPole-v1'))
agent = DDPGAgent(state_dim=env.observation_space.shape[0], action_dim=1, goal_dim=env.observation_space.shape[0])
NUM_EPISODES = 100
MAX_STEPS = 200
for episode in range(NUM_EPISODES):
# 兼容不同版本的 Gym, env.reset() 只取需要的 state
result = env.reset()
state = result[0] if isinstance(result, tuple) else result # 兼容不同版本
goal = env.reset() # 使用初始状态作为目标
total_reward = 0
for step in range(MAX_STEPS):
action = agent.act(state)
result = env.step(action_mapper(action))
# 兼容不同版本的 Gym, env.step() 取前四个返回值
next_state, reward, done = result[:3]
agent.add_to_buffer(state, action, reward, next_state, done, goal)
agent.update(BATCH_SIZE)
state = next_state
total_reward += reward
if done:
break
print(f"Episode {episode + 1}, Total Reward: {total_reward}")
测试程序:
# 测试程序
def test_policy(env, policy, num_episodes=10):
for episode in range(num_episodes):
result = env.reset()
state = result[0] if isinstance(result, tuple) else result # 兼容不同版本
total_reward = 0
for step in range(MAX_STEPS):
action = agent.act(state)
result = env.step(action_mapper(action))
# 兼容不同版本的 Gym, env.step() 取前四个返回值
next_state, reward, done = result[:3]
total_reward += reward
if done:
break
print(f"测试第 {episode + 1} 集,总奖励: {total_reward}")
# 设置测试CartPole环境
env = CartPoleContinuousWrapper(gym.make('CartPole-v1', render_mode="human"))
test_policy(env, agent)
由于博文主要为了介绍相关算法的原理和应用的方法,缺乏对于实际效果的关注,算法可能在上述环境中的效果不佳,一是算法不适配上述环境,二是算法未调参和优化,三是等等。上述代码用于了解和学习算法足够了,但若是想直接将上面代码应用于实际项目中,还需要进行修改。
HER通过回放智能体未能实现目标的经验,并引入替代目标,使得即便在奖励稀疏的情况下,智能体仍然可以有效学习。该算法的核心在于如何选择和生成替代目标,以及如何将这些经验应用到现有的深度强化学习框架中。
参考论文: Hindsight Experience Replay, NeurIPS 2017.
文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者