在本教程系列的(1)中,我演示了如何构建一个agent来在多个选择中选取最有价值的一个。在本文中,我将讲解如何得到一个从现实世界中获取 观测值 ,并作出 长期收益 最大的 行动 的agent。正如前文所说,本文解决的问题将是一个完备的强化学习问题。
完备的强化学习问题所处的环境又被称为马尔科夫决策过程(MDPs)。这个环境不再仅仅通过当前的行动来提供收益并进行状态切换,它提供的收益同时取决于环境的状态以及在当前状态中执行的行为。这是一个动态的暂态过程并且可以延迟生效。
下面给出更加正式一点的马尔科夫决策过程定义:一个马尔科夫决策过程包含可能状态集S
(包含所有可能经历的状态s
)和可选行动集A
(包含所有agent可选的行动a
)。给定行动状态组合(s,a)
,下一个状态的概率分布通过T(s,a)
确定,收益由R(s,a)
给定。由此,在马尔科夫决策过程的任意时刻,agent在给定状态s
和行动a
后获得新的状态s'
和收益r
。
虽然这个概念很简单,但是我们可以将之应用于任何可以视为MDP的任务上。举例来说,你可以想象开门的场景,状态信息为我们眼睛看到的场景以及我们的身体和门在真实世界中的位置,可选行动为我们身体可以作出的任何动作,收益为门是否被成功打开。像走向门这种动作是解决该问题必需的步骤,但是它们并不能为我们直接带来收益,只有开门这个动作才能直接带来收益。在这种场景下,agent需要学会为导向收益的一系列行动分配价值分数,这涉及了开门这一动态过程中的一系列暂态过程。
为了实现这样的功能,我们需要一个比之前的双臂赌博机更有挑战性的问题。OpenAI gym包含了一系列强化学习问题所需的环境,本文也正是利用其中的一个经典案例:Cart-Pole(查看相关文档)。在这个案例中,我们希望agent在学习之后可以使木杆平衡尽可能长的时间不倒下。和双臂赌博机不同,这个任务需要额外考虑以下两点:
考虑到延迟收益,我们之前教程使用的策略梯度的形式需要调整。首先我们每次要利用多个过程来更新agent。为了实现这一点,我们将过程存在中间变量(作为缓冲)里,需要的时候用它来更新agent。这些过程组成的序列有时又被称作rollouts
或experience trace
。存储起来的序列并不能直接使用,我们还需要引入折算引子进行调整。
直观地来看,延迟收益使得每次的行动的收益除了来自当前收益的一部分以外,还有后续全部收益折算给之前行动的收益。相应地,我们使用修改后的收益作为损失函数中对行动的评估标准。在完成这一改动之后,我们可以尝试解决CartPole案例了!
代码示例(选用了评论区用户重写作者代码后的版本):
%matplotlib inline
import numpy as np
from matplotlib import animation
from IPython.display import Image
import tensorflow as tf
import matplotlib.pyplot as plt
import math
import gym
env = gym.make("CartPole-v0")
# 尝试随机行动来测试环境
env.reset()
reward_sum = 0
num_games = 10
num_game = 0
while num_game < num_games:
env.render()
observation, reward, done, _ = env.step(env.action_space.sample())
reward_sum += reward
if done:
print("本episode的收益:{}".format(reward_sum))
reward_sum = 0
num_game += 1
env.reset()
# 初始化agent的神经网络
# 我们使用基于策略梯度的神经网络来接受观测值并传递给隐藏层来产生选择各个行为(左移/右移)的概率分布
# 神经网络超参数
hidden_layer_neurons = 13
batch_size = 50
learning_rate = 1e-2
gamma = .99
dimen = 4
tf.reset_default_graph()
# 定义输入占位符
observations = tf.placeholder(tf.float32, [None, dimen], name="input_x")
# 第一个权重层
W1 = tf.get_variable("W1", shape=[dimen, hidden_layer_neurons], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observations, W1))
# 第二个权重层
W2 = tf.get_variable("W2", shape=[hidden_layer_neurons, 1], initializer=tf.contrib.layers.xavier_initializer())
output = tf.nn.sigmoid(tf.matmul(layer1, W2))
# 定义网络用于学习的计算图组件
trainable_vars = [W1, W2]
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
advantages = tf.placeholder(tf.float32, name="reward_signal")
# 损失函数
log_lik = tf.log(input_y * (input_y - output) + (1 - input_y) * (input_y + output))
loss = -tf.reduce_mean(log_lik * advantages)
# 梯度
new_grads = tf.gradients(loss, trainable_vars)
W1_grad = tf.placeholder(tf.float32, name="batch_grad1")
W2_grad = tf.placeholder(tf.float32, name="batch_grad2")
# 学习
batch_grad = [W1_grad, W2_grad]
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_grads = adam.apply_gradients(zip(batch_grad, [W1, W2]))
def discount_rewards(r, gamma=0.99):
"""
输入一维的收益数组,输出折算后的收益值,例:f([1, 1, 1], 0.99) -> [1, 0.99, 0.9801]
"""
return np.array([val * (gamma ** i) for i, val in enumerate(r)])
reward_sum = 0
init = tf.global_variables_initializer()
# 定义观测值,输出值,收益值的占位符
xs = np.empty(0).reshape(0, dimen)
ys = np.empty(0).reshape(0, 1)
rewards = np.empty(0).reshape(0, 1)
# 初始化环境
sess = tf.Session()
rendering = False
sess.run(init)
observation = env.reset()
# 梯度的占位符
gradients = np.array([np.zeros(var.get_shape()) for var in trainable_vars])
num_episodes = 10000
num_episode = 0
while num_episode < num_episodes:
# 将观测值作为该批次的输入
x = np.reshape(observation, [1, dimen])
# 运行神经网络来决定输出
tf_prob = sess.run(output, feed_dict={observations: x})
# 基于我们的网络来决定输出,允许一定的随机性
y = 0 if tf_prob > np.random.uniform() else 1
# 将观测值和输出值追加至列表中以供学习
xs = np.vstack([xs, x])
ys = np.vstack([ys, y])
# 获取行动的结果
observation, reward, done, _ = env.step(y)
reward_sum += reward
rewards = np.vstack([rewards, reward])
if done:
# 标准化收益值
discounted_rewards = discount_rewards(rewards, gamma)
discounted_rewards -= discounted_rewards.mean()
discounted_rewards /= discounted_rewards.std()
# 根据实时得到的梯度调整梯度
gradients += np.array(sess.run(new_grads, feed_dict={observations: xs, input_y: ys, advantages: discounted_rewards}))
# 重置游戏变量
xs = np.empty(0).reshape(0, dimen)
ys = np.empty(0).reshape(0, 1)
rewards = np.empty(0).reshape(0, 1)
# 一个batch运行结束
if num_episode % batch_size == 0:
# 更新梯度
sess.run(update_grads, feed_dict={W1_grad: gradients[0], W2_grad: gradients[1]})
# 重置梯度
gradients *= 0
# 输出本轮运行状态
print("episode = {} 时的平均收益:{}".format(num_episode, reward_sum / batch_size))
if reward_sum / batch_size > 150:
print("问题在episode = {} 时解决!".format(num_episode))
break
reward_sum = 0
num_episode += 1
observation = env.reset()
# 去除随机决策,测试agent的性能
observation = env.reset()
observation
reward_sum = 0
while True:
env.render()
x = np.reshape(observation, [1, dimen])
y = sess.run(output, feed_dict={observations: x})
y = 0 if y > 0.5 else 1
observation, reward, done, _ = env.step(y)
reward_sum += reward
if done:
print("最终分数: {}".format(reward_sum))
break
···
episode = 2850 时的平均收益:98.26
episode = 2900 时的平均收益:96.96
episode = 2950 时的平均收益:95.62
episode = 3000 时的平均收益:98.82
episode = 3050 时的平均收益:122.34
episode = 3100 时的平均收益:119.22
episode = 3150 时的平均收益:115.66
episode = 3200 时的平均收益:125.16
episode = 3250 时的平均收益:133.44
episode = 3300 时的平均收益:140.7
episode = 3350 时的平均收益:153.4
问题在episode = 3350 时解决!
最终分数: 200.0
现在我们已经拥有了一个实用而又有趣的强化学习agent,不过这离目前最先进的技术还很远。尽管我们使用了基于策略梯度的神经网络,但是网络的深度和复杂度远远不及大部分先进的网络。在下一篇文章中我将展示如何使用深度神经网络来创建agent去在更复杂的环境中学习,同时深入讲解网络在复杂环境下的表征手段。
系列文章(翻译进度):
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。