前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【九】强化学习之TD3算法四轴飞行器仿真---PaddlePaddlle【PARL】框架

【九】强化学习之TD3算法四轴飞行器仿真---PaddlePaddlle【PARL】框架

作者头像
汀丶人工智能
发布2022-12-01 16:02:25
5870
发布2022-12-01 16:02:25
举报
文章被收录于专栏:NLP/KG

相关文章:

【一】飞桨paddle【GPU、CPU】安装以及环境配置+python入门教学

【二】-Parl基础命令

【三】-Notebook、&pdb、ipdb 调试

【四】-强化学习入门简介

【五】-Sarsa&Qlearing详细讲解

【六】-DQN

【七】-Policy Gradient

【八】-DDPG

【九】-四轴飞行器仿真

代码链接:码云:https://gitee.com/dingding962285595/parl_work  ;github:https://github.com/PaddlePaddle/PARL


一、AI Studio 项目详解【VisualDL工具】

二、AI Studio 项目详解【环境使用说明、脚本任务】

三、AI Studio 项目详解【分布式训练-单机多机】

四、AI Studio 项目详解【图形化任务】

五、AI Studio 项目详解【在线部署及预测】

本节码源链接:https://github.com/PaddlePaddle/RLSchool

1.RLSchool:强化学习模拟环境集合

悬浮控制任务:

●保持四轴飞行器悬停不掉落

●环境:

●水平地板100 x 100

●飞行器自身

●初始状态:

●起始坐标(0, 0, 5)

●随机角度和随机速度

●停止条件:

●撞到地板

●超过1000个step

●Reward:

●悬停越久越好

●能量损耗越小越好

●离初始位置越近越好

2.安装

代码语言:javascript
复制

pip install rlschool

代码语言:txt
复制
## 测试环境可视化功能

```sh

避障任务, 'no_collision'

python -m rlschool.quadrotor.env

速度控制任务, 'velocity_control'

python -m rlschool.quadrotor.env velocity_control

悬浮控制任务, 'hovering_control'

python -m rlschool.quadrotor.env hovering_control

代码语言:txt
复制
环境正确安装后,飞行器飞行渲染效果如下:

* 避障任务

<img src="demo/demo_no_collision.gif" width="400"/>

* 速度控制任务

<img src="demo/demo_velocity_control.gif" width="400"/>

* 悬浮控制任务

<img src="demo/demo_hovering_control.gif" width="400"/>

3.环境创建及使用

四轴飞行器环境遵循标准的Gym APIs接口来创建、运行和渲染环境。目前,四轴飞行器支持3种任务:避障任务、速度控制任务、悬浮控制任务。任务由创建环境时的task参数设定。

创建避障任务的示例代码如下:

代码语言:javascript
复制
```python

from rlschool import make_env

env = make_env("Quadrotor", task="no_collision", map_file=None, simulator_conf=None)

env.reset()

代码语言:txt
复制

当参数map\_file为默认值None时,模拟器世界是将使用100x100的平底作为地图。一旦飞行器落下,即认为击中障碍物,环境终止。地图文件是如default_map.txt格式的文本文件,其中每个数字表示对应位置障碍墙的高度。-1标记了飞行器的初识位置。需要时,可以设定map\_file为用户自己生成的地图配置文件。

当参数simulator\_conf是默认值None时,将会使用默认的模拟器配置config.json。如果用户需要自主设定四轴飞行器的动力学参数,可以将simulator\_conf设定为新的_config.json_的路径。

创建速度控制任务的示例代码如下:

代码语言:javascript
复制
```python

from rlschool import make_env

env = make_env("Quadrotor", task="velocity_control", seed=0)

env.reset()

代码语言:txt
复制

其中,seed参数是用来采样目标速度序列的随机种子,不同随机种子将生成不同的速度控制任务。

创建悬浮控制任务的示例代码如下:

代码语言:javascript
复制
```python

from rlschool import make_env

env = make_env("Quadrotor", task="hovering_control")

env.reset()

代码语言:txt
复制

 3.1 动作空间

四轴飞行器的动作代表施加在4个螺旋桨发动机的4个电压值,安装默认配置文件config.json,电压的范围在[0.10, 15.0]

用户可以通过一些特殊的动作理解控制电压如何操作四轴飞行器,例如,动作[1.0, 1.0, 1.0, 1.0]将使无初速度的飞行器垂直向上或向下运动。

代码语言:javascript
复制
```python

from rlschool import make_env

env = make_env("Quadrotor", task="no_collision")

env.reset()

env.render()

reset = False

while not reset:

    state, reward, reset, info = env.step(1.0, 1.0, 1.0, 1.0)

    env.render()

代码语言:txt
复制

3.2 状态

四轴飞行器的状态可以分为3类:传感器测量数据,飞行状态和任务相关状态。可以通过env.step返回的第四个量info获得Python字典形式的状态值。

 3.2.1 传感器测量数据

* acc\_x: 加速度计在x轴方向上的测量值。

* acc\_y: 加速度计在y轴方向上的测量值。

* acc\_z: 加速度计在z轴方向上的测量值。

* gyro\_x: 陀螺仪在x轴方向上的测量值。

* gyro\_y: 陀螺仪在y轴方向上的测量值。

* gyro\_z: 陀螺仪在z轴方向上的测量值。

* z: 气压计测量的高度数据,为方便认为是到地面的距离。

* pitch: 飞行器绕y轴的逆时针转动角度。

* roll: 飞行器绕x轴的逆时针转动角度。

* yaw: 飞行器绕z轴的逆时针转动角度。

这里需要说明的是,pitch, roll, yaw是陀螺仪测量的角速度随时间的积累值,因此算作传感器测量。

 3.2.2 飞行器状态

* b\_v\_x: 在飞行器坐标系下的x轴方向速度。

* b\_v\_y: 在飞行器坐标系下的y轴方向速度。

* b\_v\_z: 在飞行器坐标系下的z轴方向速度。

 3.2.3 任务相关状态

对速度控制任务,有如下额外状态:

* next\_target\_g\_v\_x: 任务设定的下一时刻目标速度在x轴的分量。

* next\_target\_g\_v\_y: 任务设定的下一时刻目标速度在y轴的分量。

* next\_target\_g\_v\_z: 任务设定的下一时刻目标速度在z轴的分量。

4.算法的收敛问题分析

4.1. 常规方法:

首先对于四轴飞行器的控制我们应利用 4 个维度的动作输出,以控制 4 个电机

但是这样的方法收敛速度非常慢

原因:这种情况下 4 个输出不但要学习环境的反馈变化,还要学习 4 者之间的搭配,所以导致学习时间非常长,达到收敛通常需要 100w steps 左右。

4.2. PARL思路探讨:

在本文中,让 DDPG 算法直接进行 5 个维度的输出,其中一个维度作为主控,同时控制 4 个电机的主电压,另外 4 个维度作为微调,对 4 个电机的电压做修正,其中设定输出电压为:主控电压+0.1*修正电压

最后效果非常好,5w-6w steps 左右就可以达到很好的收敛。

4.2.1 主程序train.py

代码语言:javascript
复制
import os
import numpy as np

import parl
from parl import layers
from paddle import fluid
from parl.utils import logger
from parl.utils import ReplayMemory  # 经验回放
#from parl.utils import action_mapping
from parl.env.continuous_wrappers import ActionMappingWrapper  # 将神经网络输出映射到对应的实际动作取值范围内

from rlschool import make_env  # 使用 RLSchool 创建飞行器环境
from quadrotor_model import QuadrotorModel
from quadrotor_agent import QuadrotorAgent
from parl.algorithms import DDPG

GAMMA = 0.99  # reward 的衰减因子,一般取 0.9 到 0.999 不等
TAU = 0.001  # target_model 跟 model 同步参数 的 软更新参数
ACTOR_LR = 0.0002  # Actor网络更新的 learning rate
CRITIC_LR = 0.001  # Critic网络更新的 learning rate
MEMORY_SIZE = 1e6  # replay memory的大小,越大越占用内存
MEMORY_WARMUP_SIZE = 1e4  # replay_memory 里需要预存一些经验数据,再从里面sample一个batch的经验让agent去learn
REWARD_SCALE = 0.01  # reward 的缩放因子
BATCH_SIZE = 256  # 每次给agent learn的数据数量,从replay memory随机里sample一批数据出来
TRAIN_TOTAL_STEPS = 1e6  # 总训练步数
TEST_EVERY_STEPS = 1e4  # 每个N步评估一下算法效果,每次评估5个episode求平均reward


def run_episode(env, agent, rpm):
    obs = env.reset()
    total_reward, steps = 0, 0
    while True:
        steps += 1
        batch_obs = np.expand_dims(obs, axis=0)
        action = agent.predict(batch_obs.astype('float32'))
        action = np.squeeze(action)

        # Add exploration noise, and clip to [-1.0, 1.0]
        action = np.clip(np.random.normal(action, 1.0), -1.0, 1.0)

        next_obs, reward, done, info = env.step(action)
        rpm.append(obs, action, REWARD_SCALE * reward, next_obs, done)

        if rpm.size() > MEMORY_WARMUP_SIZE:
            batch_obs, batch_action, batch_reward, batch_next_obs, \
                    batch_terminal = rpm.sample_batch(BATCH_SIZE)
            critic_cost = agent.learn(batch_obs, batch_action, batch_reward,
                                    batch_next_obs, batch_terminal)

        obs = next_obs
        total_reward += reward

        if done:
            break
    return total_reward, steps


# 评估 agent, 跑 5 个episode,总reward求平均
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        total_reward, steps = 0, 0
        while True:
            batch_obs = np.expand_dims(obs, axis=0)
            action = agent.predict(batch_obs.astype('float32'))
            action = np.squeeze(action)
            action = np.clip(action, -1.0, 1.0)  ## special

            next_obs, reward, done, info = env.step(action)

            obs = next_obs
            total_reward += reward
            steps += 1

            if render:
                env.render()

            if done:
                break
        eval_reward.append(total_reward)
    return np.mean(eval_reward)


# 创建飞行器环境
env = make_env("Quadrotor", task="hovering_control")
# 将神经网络输出(取值范围为[-1, 1])映射到对应的实际动作取值范围内
#env = ActionMappingWrapper(env)          #TODO:!!!!这一步会报错

env.reset()
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]

# 使用parl框架搭建Agent:QuadrotorModel, DDPG, QuadrotorAgent三者嵌套
model = QuadrotorModel(act_dim)
algorithm = DDPG(
    model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = QuadrotorAgent(algorithm, obs_dim, act_dim)

# parl库也为DDPG算法内置了ReplayMemory,可直接从 parl.utils 引入使用
rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)

test_flag = 0
total_steps = 0
'''
testing = 1    #添加的
if(not testing):    '''
while total_steps < TRAIN_TOTAL_STEPS:
    train_reward, steps = run_episode(env, agent, rpm)
    total_steps += steps
#logger.info('Steps: {} Reward: {}'.format(total_steps, train_reward))

    if total_steps // TEST_EVERY_STEPS >= test_flag:
        while total_steps // TEST_EVERY_STEPS >= test_flag:
            test_flag += 1

        evaluate_reward = evaluate(env, agent)
        logger.info('Steps {}, Test reward: {}'.format(total_steps,
                                                evaluate_reward))

        # 保存模型
        ckpt = 'model_dir/steps_{}.ckpt'.format(total_steps)
        agent.save(ckpt)
'''###添加的  TODO:参考添加的加载模型
else:
    # 加载模型
    save_path = 'model_dir/steps_10095.ckpt'
    agent.restore(save_path)

    evaluate_reward = evaluate(env, agent, render=False)
    logger.info('Test reward: {}'.format(evaluate_reward))
'''

4.2.2 加载模型,训练,测试------(包含测试)

该模块参考:https://blog.csdn.net/qq_42067550/article/details/106984355

代码语言:javascript
复制
# 加载模型
# save_path = 'model_dir_3/steps_1000000.ckpt'
# agent.restore(save_path)

# parl库也为DDPG算法内置了ReplayMemory,可直接从 parl.utils 引入使用
rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)

test_flag = 0
total_steps = 0


testing = 1

if (not testing):
    while total_steps < TRAIN_TOTAL_STEPS:
        train_reward, steps = run_episode(env, agent, rpm)
        total_steps += steps
        # logger.info('Steps: {} Reward: {}'.format(total_steps, train_reward))

        if total_steps // TEST_EVERY_STEPS >= test_flag:
            while total_steps // TEST_EVERY_STEPS >= test_flag:
                test_flag += 1

            evaluate_reward = evaluate(env, agent)
            logger.info('Steps {}, Test reward: {}'.format(total_steps,
                                                           evaluate_reward))

            # 保存模型
            ckpt = 'model_dir_1/steps_{}.ckpt'.format(total_steps)
            agent.save(ckpt)

else:
    # 加载模型
    save_path = 'steps_50000.ckpt'
    agent.restore(save_path)

    evaluate_reward = evaluate(env, agent, render=False)
    logger.info('Test reward: {}'.format(evaluate_reward))

4.2.3神经网络model

代码语言:javascript
复制
import paddle.fluid as fluid
import parl
from parl import layers


class ActorModel(parl.Model):
    def __init__(self, act_dim):
        hidden_dim_1, hidden_dim_2 = 64, 64
        self.fc1 = layers.fc(size=hidden_dim_1, act='tanh')
        self.fc2 = layers.fc(size=hidden_dim_2, act='tanh')
        self.fc3 = layers.fc(size=act_dim, act='tanh')

    def policy(self, obs):
        x = self.fc1(obs)
        x = self.fc2(x)
        return self.fc3(x)


class CriticModel(parl.Model):
    def __init__(self):
        hidden_dim_1, hidden_dim_2 = 64, 64
        self.fc1 = layers.fc(size=hidden_dim_1, act='tanh')
        self.fc2 = layers.fc(size=hidden_dim_2, act='tanh')
        self.fc3 = layers.fc(size=1, act=None)

    def value(self, obs, act):
        x = self.fc1(obs)
        concat = layers.concat([x, act], axis=1)
        x = self.fc2(concat)
        Q = self.fc3(x)
        Q = layers.squeeze(Q, axes=[1])
        return Q


class QuadrotorModel(parl.Model):
    def __init__(self, act_dim):
        self.actor_model = ActorModel(act_dim)
        self.critic_model = CriticModel()

    def policy(self, obs):
        return self.actor_model.policy(obs)

    def value(self, obs, act):
        return self.critic_model.value(obs, act)

    def get_actor_params(self):
        return self.actor_model.parameters()

4.2.4 agent模块

代码语言:javascript
复制
import numpy as np
import parl
from parl import layers
from paddle import fluid


class QuadrotorAgent(parl.Agent):
    def __init__(self, algorithm, obs_dim, act_dim=4):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(QuadrotorAgent, self).__init__(algorithm)

        # Attention: In the beginning, sync target model totally.
        self.alg.sync_target(decay=0)

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.pred_act = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            act = layers.data(
                name='act', shape=[self.act_dim], dtype='float32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            _, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,
                                                terminal)

    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)
        act = self.fluid_executor.run(
            self.pred_program, feed={'obs': obs},
            fetch_list=[self.pred_act])[0]
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        feed = {
            'obs': obs,
            'act': act,
            'reward': reward,
            'next_obs': next_obs,
            'terminal': terminal
        }
        critic_cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]
        self.alg.sync_target()
        return critic_cost

5. 仿真结果:

代码详细解析和结果见-链接:https://aistudio.baidu.com/aistudio/projectdetail/1705633

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-03-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.RLSchool:强化学习模拟环境集合
  • 2.安装
  • 避障任务, 'no_collision'
  • 速度控制任务, 'velocity_control'
  • 悬浮控制任务, 'hovering_control'
  • 3.环境创建及使用
    •  3.1 动作空间
      • 3.2 状态
        •  3.2.1 传感器测量数据
        •  3.2.2 飞行器状态
        •  3.2.3 任务相关状态
    • 4.算法的收敛问题分析
      • 4.1. 常规方法:
        • 4.2. PARL思路探讨:
          • 4.2.1 主程序train.py
          • 4.2.2 加载模型,训练,测试------(包含测试)
          • 4.2.3神经网络model
          • 4.2.4 agent模块
      • 5. 仿真结果:
      相关产品与服务
      GPU 云服务器
      GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于生成式AI,自动驾驶,深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档