Author: Abhishek Suran
Please contact the author for reprint permission.
Summary: An explanation of the PPO reinforcement learning algorithm and its TensorFlow 2.x implementation (with code).
In this article, we will try to understand OpenAI's reinforcement learning algorithm, Proximal Policy Optimization (PPO), and walk through its TensorFlow 2.x implementation.
Why PPO?
Because PPO conveniently overcomes two problems that plague ordinary policy-gradient methods: training becomes unstable (or collapses) when a single policy update is too large, and experience is used inefficiently, since each batch of samples normally supports only one gradient step.
The Core Idea of PPO
In earlier policy-gradient methods, the objective function is the log-probability of the chosen action weighted by an advantage estimate, roughly L^PG(θ) = Ê_t[ log π_θ(a_t | s_t) · Â_t ].
But instead of the log-probability under the current policy, we now use the ratio between the current policy and the old policy. We also clip this ratio and take the minimum of the clipped and unclipped surrogate terms. This clipped objective restrains large policy updates, as written out below.
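In the notation of the PPO paper, the probability ratio and the clipped surrogate objective are:

$$
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}, \qquad
L^{CLIP}(\theta) = \hat{\mathbb{E}}_t\left[\min\!\left(r_t(\theta)\,\hat{A}_t,\ \mathrm{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\,\hat{A}_t\right)\right]
$$

where $\hat{A}_t$ is the advantage estimate and $\epsilon$ is the clipping parameter (0.2 in the code below, stored as clip_pram).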
So the final objective consists of three parts: the clipped surrogate objective, a squared-error loss for the critic's value function, and an entropy bonus that encourages exploration.
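In the same notation, the combined objective that is maximized is

$$
L_t^{CLIP+VF+S}(\theta) = \hat{\mathbb{E}}_t\left[L_t^{CLIP}(\theta) - c_1\,L_t^{VF}(\theta) + c_2\,S[\pi_\theta](s_t)\right]
$$

where $L_t^{VF}$ is a squared-error loss on the value function, $S$ is the policy entropy, and $c_1$, $c_2$ are weighting coefficients; the implementation below effectively uses $c_1 = 0.5$ and $c_2 = 0.001$.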
Steps of the Algorithm
In outline, each training iteration does the following (this is exactly what the training loop below implements):
1. Collect a fixed-length rollout by acting with the current policy, recording states, actions, action probabilities, rewards, value estimates, and done flags.
2. Compute advantages and returns with Generalized Advantage Estimation (GAE).
3. Optimize the clipped surrogate objective (plus the value loss and entropy bonus) for several epochs on the collected batch.
4. Repeat with the updated policy, periodically evaluating it on test episodes.
Code
Neural networks:
# imports used throughout the code below
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow.keras.losses as kls
import gym

# critic: maps a state to a single scalar value estimate V(s)
class critic(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.d1 = tf.keras.layers.Dense(128, activation='relu')
        self.v = tf.keras.layers.Dense(1, activation=None)

    def call(self, input_data):
        x = self.d1(input_data)
        v = self.v(x)
        return v

# actor: maps a state to a softmax distribution over the 2 discrete actions
class actor(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.d1 = tf.keras.layers.Dense(128, activation='relu')
        self.a = tf.keras.layers.Dense(2, activation='softmax')

    def call(self, input_data):
        x = self.d1(input_data)
        a = self.a(x)
        return a
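As a quick sanity check (a minimal sketch of my own, assuming the 4-dimensional CartPole observation the training loop later uses), the two networks can be instantiated and called on a dummy batch:

# hypothetical smoke test, not part of the original article
dummy_state = np.zeros((1, 4), dtype=np.float32)   # one CartPole-like observation
print(critic()(dummy_state).shape)                  # (1, 1): a single state-value estimate
print(actor()(dummy_state).shape)                   # (1, 2): softmax probabilities for the 2 actions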
Action selection:
class agent():
    def __init__(self):
        # separate Adam optimizers for the actor and the critic
        self.a_opt = tf.keras.optimizers.Adam(learning_rate=7e-3)
        self.c_opt = tf.keras.optimizers.Adam(learning_rate=7e-3)
        self.actor = actor()
        self.critic = critic()
        self.clip_pram = 0.2  # PPO clipping parameter (epsilon)

    def act(self, state):
        # sample an action from the actor's softmax output
        prob = self.actor(np.array([state]))
        prob = prob.numpy()
        dist = tfp.distributions.Categorical(probs=prob, dtype=tf.float32)
        action = dist.sample()
        return int(action.numpy()[0])
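For illustration (a hypothetical snippet, assuming a Gym environment named env, e.g. CartPole-v0, has already been created):

# hypothetical usage of act(), not part of the original article
demo_agent = agent()
state = env.reset()
print(demo_agent.act(state))  # an int in {0, 1}, sampled from the actor's softmax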
Testing what the model has learned:
This function is used to test what our agent has learned; it runs one episode with greedy actions and returns the total reward.
def test_reward(env):
    total_reward = 0
    state = env.reset()
    done = False
    while not done:
        # act greedily (argmax) rather than sampling when evaluating
        action = np.argmax(agentoo7.actor(np.array([state])).numpy())
        next_state, reward, done, _ = env.step(action)
        state = next_state
        total_reward += reward
    return total_reward
Training loop:
# the article does not show how env is created; CartPole-v0 is assumed here,
# since the actor has 2 actions and training stops once the reward reaches 200
env = gym.make("CartPole-v0")

tf.random.set_seed(336699)
agentoo7 = agent()
steps = 50
ep_reward = []
total_avgr = []
target = False
best_reward = 0
avg_rewards_list = []

for s in range(steps):
    if target == True:
        break
    done = False
    state = env.reset()
    all_aloss = []
    all_closs = []
    rewards = []
    states = []
    actions = []
    probs = []
    dones = []
    values = []
    print("new episode")

    # collect a fixed-length rollout of 128 environment steps
    for e in range(128):
        action = agentoo7.act(state)
        value = agentoo7.critic(np.array([state])).numpy()
        next_state, reward, done, _ = env.step(action)
        dones.append(1 - done)
        rewards.append(reward)
        states.append(state)
        actions.append(action)
        prob = agentoo7.actor(np.array([state]))
        probs.append(prob[0])
        values.append(value[0][0])
        state = next_state
        if done:
            state = env.reset()

    # bootstrap: value of the final state, needed by the GAE computation
    value = agentoo7.critic(np.array([state])).numpy()
    values.append(value[0][0])
    probs = np.stack(probs, axis=0)

    states, actions, returns, adv = preprocess1(states, actions, rewards, dones, values, 1)

    # several epochs of PPO updates on the same batch of experience
    for epocs in range(10):
        al, cl = agentoo7.learn(states, actions, adv, probs, returns)

    # evaluate the current (greedy) policy over 5 test episodes
    avg_reward = np.mean([test_reward(env) for _ in range(5)])
    print(f"total test reward is {avg_reward}")
    avg_rewards_list.append(avg_reward)
    if avg_reward > best_reward:
        print('best reward=' + str(avg_reward))
        agentoo7.actor.save('model_actor_{}_{}'.format(s, avg_reward), save_format="tf")
        agentoo7.critic.save('model_critic_{}_{}'.format(s, avg_reward), save_format="tf")
        best_reward = avg_reward
    if best_reward == 200:
        target = True

env.reset()
env.close()
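The loop collects an evaluation score per iteration in avg_rewards_list but never uses it; a small (hypothetical, not part of the original listing) snippet can plot the learning curve afterwards:

# hypothetical: plot the evaluation curve collected above
import matplotlib.pyplot as plt
plt.plot(avg_rewards_list)
plt.xlabel("training iteration")
plt.ylabel("average test reward")
plt.show()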
Estimating the Generalized Advantage (GAE):
def preprocess1(states, actions, rewards, done, values, gamma):
    g = 0
    lmbda = 0.95
    returns = []
    # walk the rollout backwards, accumulating the GAE advantage in g;
    # note: the training loop stores 1 - done, so done[i] is 0 at episode ends
    for i in reversed(range(len(rewards))):
        delta = rewards[i] + gamma * values[i + 1] * done[i] - values[i]
        g = delta + gamma * lmbda * done[i] * g
        returns.append(g + values[i])

    returns.reverse()
    adv = np.array(returns, dtype=np.float32) - values[:-1]
    # standardize the advantages
    adv = (adv - np.mean(adv)) / (np.std(adv) + 1e-10)
    states = np.array(states, dtype=np.float32)
    actions = np.array(actions, dtype=np.int32)
    returns = np.array(returns, dtype=np.float32)
    return states, actions, returns, adv
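In equation form, preprocess1 implements the usual GAE recursion (λ = 0.95 here; the training loop passes gamma = 1):

$$
\delta_t = r_t + \gamma\,V(s_{t+1})\,(1-d_t) - V(s_t), \qquad
\hat{A}_t = \delta_t + \gamma\lambda\,(1-d_t)\,\hat{A}_{t+1}, \qquad
\mathrm{return}_t = \hat{A}_t + V(s_t)
$$

where $d_t = 1$ when the episode terminated at step $t$ (the training loop stores $1 - d_t$ directly in the dones list); the advantages are then standardized before being used in the actor loss.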
The learn function (a method of the agent class):
def learn(self, states, actions, adv, old_probs, discnt_rewards):
    discnt_rewards = tf.reshape(discnt_rewards, (len(discnt_rewards),))
    adv = tf.reshape(adv, (len(adv),))

    old_p = old_probs
    old_p = tf.reshape(old_p, (len(old_p), 2))

    with tf.GradientTape() as tape1, tf.GradientTape() as tape2:
        p = self.actor(states, training=True)
        v = self.critic(states, training=True)
        v = tf.reshape(v, (len(v),))
        td = tf.math.subtract(discnt_rewards, v)
        # critic loss: squared error between the returns and the value estimates
        c_loss = 0.5 * kls.mean_squared_error(discnt_rewards, v)
        a_loss = self.actor_loss(p, actions, adv, old_probs, c_loss)

    grads1 = tape1.gradient(a_loss, self.actor.trainable_variables)
    grads2 = tape2.gradient(c_loss, self.critic.trainable_variables)
    self.a_opt.apply_gradients(zip(grads1, self.actor.trainable_variables))
    self.c_opt.apply_gradients(zip(grads2, self.critic.trainable_variables))
    return a_loss, c_loss
The actor loss (also a method of the agent class):
def actor_loss(self, probs, actions, adv, old_probs, closs):
    probability = probs
    # entropy bonus, computed over the full action distribution
    entropy = tf.reduce_mean(tf.math.negative(tf.math.multiply(probability, tf.math.log(probability))))
    sur1 = []
    sur2 = []
    for pb, t, op in zip(probability, adv, old_probs):
        t = tf.constant(t)
        op = tf.constant(op)
        # probability ratio between the new and the old policy
        # ratio = tf.math.exp(tf.math.log(pb + 1e-10) - tf.math.log(op + 1e-10))
        ratio = tf.math.divide(pb, op)
        # unclipped and clipped surrogate terms
        s1 = tf.math.multiply(ratio, t)
        s2 = tf.math.multiply(tf.clip_by_value(ratio, 1.0 - self.clip_pram, 1.0 + self.clip_pram), t)
        sur1.append(s1)
        sur2.append(s2)

    sr1 = tf.stack(sur1)
    sr2 = tf.stack(sur2)
    # maximize the clipped surrogate and the entropy bonus, minimize the critic loss
    loss = tf.math.negative(tf.reduce_mean(tf.math.minimum(sr1, sr2)) - closs + 0.001 * entropy)
    return loss
That is all the code. Now let's look at some reasons why your agent may fail to learn, and a few tips.
Things to keep in mind during implementation
When writing RL code, there are a few points worth keeping in mind.