相关文章:
【四】gym搭建自己的环境,全网最详细版本,3分钟你就学会了!
【五】gym搭建自己的环境____详细定义自己myenv.py文件
【六】gym搭建自己环境升级版设计,动态障碍------强化学习
创建CartPole-v0的环境.
import gym
env = gym.make('CartPole-v0')
env.reset()
for i in range(1000):
env.render()
env.step(env.action_space.sample()) # take a random action
env.close()
代码含义:
注释:导入gym库,第2行创建CartPole-v0环境,并在第3行重置环境状态。在for循环中进行1000个时间步长(timestep)的控制,第5行刷新每个时间步长环境画面,第6行对当前环境状态采取一个随机动作(0或1),最后第7行循环结束后关闭仿真环境。
同时本地会渲染出一个窗口进行模拟如下图:
关于Space的说明 在上面的代码中, 我们可以看到我们每一次的action都是随机进行取值的. 事实上, 每一个环境都有action_space和observation_space.(Every environment comes with an
action_space
and anobservation_space
) 以CartPole-v0来作为例子. 首先我们来看action_spaces, 这个代表可以采取的action的种类, 在CartPole-v0的例子中, 可以采取的action的种类只有两种. 我们看一下下面的示例.
import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)
对于observation_space. 则查看这个space的shape四个边界的上界和下界(能取到的最大值和最小值)
print(env.observation_space.high)
print(env.observation_space.low)
[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
observation_space 是一个Box类型,从box.py源码可知,表示一个 n 维的盒子,所以在上一节打印出来的observation是一个长度为 4 的数组。数组中的每个元素都具有上下界。
利用运动空间和观测空间的定义和范围,在许多仿真环境中,Box和Discrete是最常见的空间描述,在智体每次执行动作时,都属于这些空间范围内,代码示例为:
from gym import spaces
space = spaces.Discrete(6)
# Set with 6 elements {0, 1, 2, ..., 6}
x = space.sample()
print(space.contains(x))
print(space.n == 6)
True
True
在CartPole-v0栗子中,运动只能选择左和右,分别用{0,1}表示
对于step的详细说明 上面我们只是每次做随机的action, 为了更好的进行action, 我们需要知道每一步step之后的返回值. 事实上, step会返回四个值. 下面我们一一进行介绍.
观测 Observation (Object):当前step执行后,环境的观测(类型为对象)。例如,从相机获取的像素点,机器人各个关节的角度或棋盘游戏当前的状态等;
完成 Done (Boolen): 表示是否需要将环境重置 env.reset。大多数情况下,当 Done 为True 时,就表明当前回合(episode)或者试验(tial)结束。例如当机器人摔倒或者掉出台面,就应当终止当前回合进行重置(reset);
信息 Info (Dict): 针对调试过程的诊断信息。在标准的智体仿真评估当中不会使用到这个info,
在 Gym 仿真中,每一次回合开始,需要先执行 reset() 函数,返回初始观测信息,然后根据标志位 done 的状态,来决定是否进行下一次回合。所以更恰当的方法是遵守done的标志.
import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):
observation = env.reset()
for t in range(100):
env.render()
print(observation)
action = env.action_space.sample()
observation, reward, done, info = env.step(action)
if done:
print("Episode finished after {} timesteps".format(t+1))
break
env.close()
当done 为true时,控制失败,此阶段episode 结束。可以计算每 episode 的回报就是其坚持的t+1时间,坚持的越久回报越大.在上面算法中,agent 的行为选择是随机的,平均回报为20左右。
*再次说明gym模块中环境的常用函数 gym的初始化
env = gym.make('CartPole-v0')
# 定义使用gym库中的某一个环境,'CartPole-v0'可以改为其它环境
env = env.unwrapped
# unwrapped是打开限制的意思
gym的各个参数的获取
env.action_space
# 查看这个环境中可用的action有多少个,返回Discrete()格式
env.observation_space
# 查看这个环境中observation的特征,返回Box()格式
n_actions=env.action_space.n
# 查看这个环境中可用的action有多少个,返回int
n_features=env.observation_space.shape[0]
# 查看这个环境中observation的特征有多少个,返回int
刷新环境
env.reset()
# 用于一个done后环境的重启,获取回合的第一个observation
env.render()
# 用于每一步后刷新环境状态
observation_, reward, done, info = env.step(action)
# 获取下一步的环境、得分、检测是否完成。
实例应用 平衡杆测试代码:以AC算法为例,详细解析看下面链接分析。
import numpy as np
import tensorflow as tf
import gym
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
tf.compat.v1.disable_eager_execution() #这句话可有可无
np.random.seed(2)
tf.set_random_seed(2) # reproducible
# Superparameters
OUTPUT_GRAPH = False
MAX_EPISODE = 3000
DISPLAY_REWARD_THRESHOLD = 200 # renders environment if total episode reward is greater then this threshold
MAX_EP_STEPS = 1000 # maximum time step in one episode
RENDER = False # rendering wastes time
GAMMA = 0.9 # reward discount in TD error
LR_A = 0.001 # learning rate for actor
LR_C = 0.01 # learning rate for critic
env = gym.make('CartPole-v0')
env.seed(1) # reproducible
env = env.unwrapped
N_F = env.observation_space.shape[0]
N_A = env.action_space.n
class Actor(object):
def __init__(self, sess, n_features, n_actions, lr=0.001):
self.sess = sess
self.s = tf.placeholder(tf.float32, [1, n_features], "state")
self.a = tf.placeholder(tf.int32, None, "act")
self.td_error = tf.placeholder(tf.float32, None, "td_error") # TD_error
with tf.variable_scope('Actor'):
l1 = tf.layers.dense(
inputs=self.s,
units=20, # number of hidden units
activation=tf.nn.relu,
kernel_initializer=tf.random_normal_initializer(0., .1), # weights
bias_initializer=tf.constant_initializer(0.1), # biases
name='l1'
)
self.acts_prob = tf.layers.dense(
inputs=l1,
units=n_actions, # output units
activation=tf.nn.softmax, # get action probabilities
kernel_initializer=tf.random_normal_initializer(0., .1), # weights
bias_initializer=tf.constant_initializer(0.1), # biases
name='acts_prob'
)
with tf.variable_scope('exp_v'):
log_prob = tf.log(self.acts_prob[0, self.a])
self.exp_v = tf.reduce_mean(log_prob * self.td_error) # advantage (TD_error) guided loss
with tf.variable_scope('train'):
self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v) # minimize(-exp_v) = maximize(exp_v)
def learn(self, s, a, td):
s = s[np.newaxis, :]
feed_dict = {self.s: s, self.a: a, self.td_error: td}
_, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
return exp_v
def choose_action(self, s):
s = s[np.newaxis, :]
probs = self.sess.run(self.acts_prob, {self.s: s}) # get probabilities for all actions
return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel()) # return a int
class Critic(object):
def __init__(self, sess, n_features, lr=0.01):
self.sess = sess
self.s = tf.placeholder(tf.float32, [1, n_features], "state")
self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
self.r = tf.placeholder(tf.float32, None, 'r')
with tf.variable_scope('Critic'):
l1 = tf.layers.dense(
inputs=self.s,
units=20, # number of hidden units
activation=tf.nn.relu, # None
# have to be linear to make sure the convergence of actor.
# But linear approximator seems hardly learns the correct Q.
kernel_initializer=tf.random_normal_initializer(0., .1), # weights
bias_initializer=tf.constant_initializer(0.1), # biases
name='l1'
)
self.v = tf.layers.dense(
inputs=l1,
units=1, # output units
activation=None,
kernel_initializer=tf.random_normal_initializer(0., .1), # weights
bias_initializer=tf.constant_initializer(0.1), # biases
name='V'
)
with tf.variable_scope('squared_TD_error'):
self.td_error = self.r + GAMMA * self.v_ - self.v
self.loss = tf.square(self.td_error) # TD_error = (r+gamma*V_next) - V_eval
with tf.variable_scope('train'):
self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
def learn(self, s, r, s_):
s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
v_ = self.sess.run(self.v, {self.s: s_})
td_error, _ = self.sess.run([self.td_error, self.train_op],
{self.s: s, self.v_: v_, self.r: r})
return td_error
sess = tf.Session()
actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F, lr=LR_C) # we need a good teacher, so the teacher should learn faster than the actor
sess.run(tf.global_variables_initializer())
if OUTPUT_GRAPH:
tf.summary.FileWriter("logs/", sess.graph)
for i_episode in range(MAX_EPISODE):
s = env.reset()
t = 0
track_r = []
while True:
if RENDER: env.render()
a = actor.choose_action(s)
s_, r, done, info = env.step(a)
if done: r = -20
track_r.append(r)
td_error = critic.learn(s, r, s_) # gradient = grad[r + gamma * V(s_) - V(s)]
actor.learn(s, a, td_error) # true_gradient = grad[logPi(s,a) * td_error]
s = s_
t += 1
if done or t >= MAX_EP_STEPS:
ep_rs_sum = sum(track_r)
if 'running_reward' not in globals():
running_reward = ep_rs_sum
else:
running_reward = running_reward * 0.95 + ep_rs_sum * 0.05
if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True # rendering
print("episode:", i_episode, " reward:", int(running_reward))
break
更多实例教程可以参考我下面的文章在本地或者在parl中制作自己的游戏环境:
都有详细原理分析和码源解释的。