前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【二】gym初次入门一学就会---代码详细解析简明教程----平衡杆案例

【二】gym初次入门一学就会---代码详细解析简明教程----平衡杆案例

作者头像
汀丶人工智能
发布2022-12-01 16:48:27
8310
发布2022-12-01 16:48:27
举报
文章被收录于专栏:NLP/KG

相关文章:

【一】gym环境安装以及安装遇到的错误解决

【二】gym初次入门一学就会-简明教程

【三】gym简单画图

【四】gym搭建自己的环境,全网最详细版本,3分钟你就学会了!

【五】gym搭建自己的环境____详细定义自己myenv.py文件

【六】gym搭建自己环境升级版设计,动态障碍------强化学习


gym简明教程

创建CartPole-v0的环境.

代码语言:javascript
复制
import gym
env = gym.make('CartPole-v0')
env.reset()
for i in range(1000):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

代码含义:

  • reset(self):重置环境的状态,返回观察。
  • step(self, action):推进一个时间步长,返回observation, reward, done, info。
  • render(self, mode=‘human’, close=False):重绘环境的一帧。默认模式一般比较友好,如弹出一个窗口。
  • close(self):关闭环境,并清除内存。

注释:导入gym库,第2行创建CartPole-v0环境,并在第3行重置环境状态。在for循环中进行1000个时间步长(timestep)的控制,第5行刷新每个时间步长环境画面,第6行对当前环境状态采取一个随机动作(0或1),最后第7行循环结束后关闭仿真环境。

同时本地会渲染出一个窗口进行模拟如下图:

关于Space的说明 在上面的代码中, 我们可以看到我们每一次的action都是随机进行取值的. 事实上, 每一个环境都有action_space和observation_space.(Every environment comes with an action_space and an observation_space) 以CartPole-v0来作为例子. 首先我们来看action_spaces, 这个代表可以采取的action的种类, 在CartPole-v0的例子中, 可以采取的action的种类只有两种. 我们看一下下面的示例.

代码语言:javascript
复制
 import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)

  • action_space 是一个离散Discrete类型,从discrete.py源码可知,范围是一个{0,1,…,n-1} 长度为 n 的非负整数集合,在CartPole-v0例子中,动作空间表示为{0,1}。

对于observation_space. 则查看这个space的shape四个边界的上界和下界(能取到的最大值和最小值)

代码语言:javascript
复制
print(env.observation_space.high)
print(env.observation_space.low)
[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]

observation_space 是一个Box类型,从box.py源码可知,表示一个 n 维的盒子,所以在上一节打印出来的observation是一个长度为 4 的数组。数组中的每个元素都具有上下界。

利用运动空间和观测空间的定义和范围,在许多仿真环境中,BoxDiscrete是最常见的空间描述,在智体每次执行动作时,都属于这些空间范围内,代码示例为:

代码语言:javascript
复制
from gym import spaces
space = spaces.Discrete(6) 
# Set with 6 elements {0, 1, 2, ..., 6}
x = space.sample()
print(space.contains(x)) 
print(space.n == 6)
True
True

CartPole-v0栗子中,运动只能选择左和右,分别用{0,1}表示

对于step的详细说明 上面我们只是每次做随机的action, 为了更好的进行action, 我们需要知道每一步step之后的返回值. 事实上, step会返回四个值. 下面我们一一进行介绍.

  • 观测 Observation (Object):当前step执行后,环境的观测(类型为对象)。例如,从相机获取的像素点,机器人各个关节的角度或棋盘游戏当前的状态等;
  • 奖励 Reward (Float): 执行上一步动作(action)后,智能体( agent)获得的奖励(浮点类型),不同的环境中奖励值变化范围也不相同,但是强化学习的目标就是使得总奖励值最大;
  • 完成 Done (Boolen): 表示是否需要将环境重置 env.reset。大多数情况下,当 Done 为True 时,就表明当前回合(episode)或者试验(tial)结束。例如当机器人摔倒或者掉出台面,就应当终止当前回合进行重置(reset);
  • 信息 Info (Dict): 针对调试过程的诊断信息。在标准的智体仿真评估当中不会使用到这个info,

在 Gym 仿真中,每一次回合开始,需要先执行 reset() 函数,返回初始观测信息,然后根据标志位 done 的状态,来决定是否进行下一次回合。所以更恰当的方法是遵守done的标志.

代码语言:javascript
复制
import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t+1))
            break
env.close()

done 为true时,控制失败,此阶段episode 结束。可以计算每 episode 的回报就是其坚持的t+1时间,坚持的越久回报越大.在上面算法中,agent 的行为选择是随机的,平均回报为20左右。

*再次说明gym模块中环境的常用函数 gym的初始化

代码语言:javascript
复制
env = gym.make('CartPole-v0')   
# 定义使用gym库中的某一个环境,'CartPole-v0'可以改为其它环境
env = env.unwrapped             
# unwrapped是打开限制的意思

gym的各个参数的获取

代码语言:javascript
复制
env.action_space   		
# 查看这个环境中可用的action有多少个,返回Discrete()格式
env.observation_space   
# 查看这个环境中observation的特征,返回Box()格式
n_actions=env.action_space.n 
# 查看这个环境中可用的action有多少个,返回int
n_features=env.observation_space.shape[0] 
# 查看这个环境中observation的特征有多少个,返回int

刷新环境

代码语言:javascript
复制
env.reset()
# 用于一个done后环境的重启,获取回合的第一个observation
env.render()	
# 用于每一步后刷新环境状态
observation_, reward, done, info = env.step(action)
# 获取下一步的环境、得分、检测是否完成。

实例应用 平衡杆测试代码:以AC算法为例,详细解析看下面链接分析。

代码语言:javascript
复制
  
import numpy as np
import tensorflow as tf
import gym
 
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
 
tf.compat.v1.disable_eager_execution()  #这句话可有可无
 
np.random.seed(2)
tf.set_random_seed(2)  # reproducible
 
# Superparameters
OUTPUT_GRAPH = False
MAX_EPISODE = 3000
DISPLAY_REWARD_THRESHOLD = 200  # renders environment if total episode reward is greater then this threshold
MAX_EP_STEPS = 1000   # maximum time step in one episode
RENDER = False  # rendering wastes time
GAMMA = 0.9     # reward discount in TD error
LR_A = 0.001    # learning rate for actor
LR_C = 0.01     # learning rate for critic
 
env = gym.make('CartPole-v0')
env.seed(1)  # reproducible
env = env.unwrapped
 
N_F = env.observation_space.shape[0]
N_A = env.action_space.n
 
 
class Actor(object):
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess
 
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.a = tf.placeholder(tf.int32, None, "act")
        self.td_error = tf.placeholder(tf.float32, None, "td_error")  # TD_error
 
        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,    # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),    # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )
 
            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,    # output units
                activation=tf.nn.softmax,   # get action probabilities
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='acts_prob'
            )
 
        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            self.exp_v = tf.reduce_mean(log_prob * self.td_error)  # advantage (TD_error) guided loss
 
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)
 
    def learn(self, s, a, td):
        s = s[np.newaxis, :]
 
        feed_dict = {self.s: s, self.a: a, self.td_error: td}
 
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
 
        return exp_v
 
    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})   # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())   # return a int
 
 
class Critic(object):
    def __init__(self, sess, n_features, lr=0.01):
        self.sess = sess
 
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        self.r = tf.placeholder(tf.float32, None, 'r')
 
        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,  # number of hidden units
                activation=tf.nn.relu,  # None
                # have to be linear to make sure the convergence of actor.
                # But linear approximator seems hardly learns the correct Q.
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )
 
            self.v = tf.layers.dense(
                inputs=l1,
                units=1,  # output units
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='V'
            )
 
        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r + GAMMA * self.v_ - self.v
            self.loss = tf.square(self.td_error)    # TD_error = (r+gamma*V_next) - V_eval
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
 
    def learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
 
        v_ = self.sess.run(self.v, {self.s: s_})
        td_error, _ = self.sess.run([self.td_error, self.train_op],
                                          {self.s: s, self.v_: v_, self.r: r})
        return td_error
 
 
sess = tf.Session()
 
actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F, lr=LR_C)     # we need a good teacher, so the teacher should learn faster than the actor
 
sess.run(tf.global_variables_initializer())
 
if OUTPUT_GRAPH:
    tf.summary.FileWriter("logs/", sess.graph)
 
for i_episode in range(MAX_EPISODE):
    s = env.reset()
    t = 0
    track_r = []
    while True:
        if RENDER: env.render()
 
        a = actor.choose_action(s)
 
        s_, r, done, info = env.step(a)
 
        if done: r = -20
 
        track_r.append(r)
 
        td_error = critic.learn(s, r, s_)  # gradient = grad[r + gamma * V(s_) - V(s)]
        actor.learn(s, a, td_error)     # true_gradient = grad[logPi(s,a) * td_error]
 
        s = s_
        t += 1
 
        if done or t >= MAX_EP_STEPS:
            ep_rs_sum = sum(track_r)
 
            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.95 + ep_rs_sum * 0.05
            if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True  # rendering
            print("episode:", i_episode, "  reward:", int(running_reward))
            break
 

更多实例教程可以参考我下面的文章在本地或者在parl中制作自己的游戏环境:

PaddlePaddlle强化学习及PARL框架{飞桨}

【一】-环境配置+python入门教学

【二】-Parl基础命令

【三】-Notebook、&pdb、ipdb 调试

【四】-强化学习入门简介

【五】-Sarsa&Qlearing详细讲解

【六】-DQN

【七】-Policy Gradient

【八】-DDPG

【九】-四轴飞行器仿真

都有详细原理分析和码源解释的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • gym简明教程
  • PaddlePaddlle强化学习及PARL框架{飞桨}
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档