首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Tello无人机SDK使用Python封装+简单示例

Tello无人机SDK使用Python封装+简单示例

作者头像
云深无际
发布于 2021-03-12 03:44:45
发布于 2021-03-12 03:44:45
1.1K00
代码可运行
举报
文章被收录于专栏:云深之无迹云深之无迹
运行总次数:0
代码可运行
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import socket
import threading
import time
import traceback


class Tello:
    """Wrapper to simply interactions with the Ryze Tello drone."""

    def __init__(self, local_ip, local_port, imperial=True, command_timeout=.3, tello_ip='192.168.10.1', tello_port=8889):
        """Binds to the local IP/port and puts the Tello into command mode.
        Args:
            local_ip (str): 要绑定的本地 IP 地址.
            local_port (int): Local port to bind,本地端口.
            imperial (bool): 如果为 True, 速度是 Mph, 距离是英尺
                             如果 False,速度为 KPH,距离为米
            command_timeout (int|float):等待响应命令的秒数.
            tello_ip (str): Tello IP.
            tello_port (int): Tello port.
        Raises:
            RuntimeError: If the Tello rejects the attempt to enter command mode.
        """

        self.abort_flag = False
        self.command_timeout = command_timeout
        self.imperial = imperial
        self.response = None

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # 创建一个UDP的连接
        # socket这个是插座的意思,其实有点像
        self.tello_address = (tello_ip, tello_port)
        # 元组传递ip+端口

        self.socket.bind((local_ip, local_port))

        self.receive_thread = threading.Thread(target=self._receive_thread)
        self.receive_thread.daemon = True
        # 守护线程

        self.receive_thread.start()
        开始一个线程

        if self.send_command('command') != 'OK':
            raise RuntimeError('Tello 被拒绝尝试进入命令模式')

    def __del__(self):
        """关闭Tello端口."""
        self.socket.close()

    def _receive_thread(self):
        """Listens for responses from the Tello.
        Runs as a thread, sets self.response to whatever the Tello last returned.
        """
        while True:
            try:
                self.response, ip = self.socket.recvfrom(256)
            except Exception:
                break

    def flip(self, direction):
        """Flips.
        Args:
            direction (str): Direction to flip, 'l', 'r', 'f', 'b', 'lb', 'lf', 'rb' or 'rf'.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('flip %s' % direction)

    def get_battery(self):
        """Returns percent battery life remaining.
        Returns:
            int: Percent battery life remaining.
        """

        battery = self.send_command('battery?')

        try:
            battery = int(battery)
        except:
            pass

        return battery

    def get_flight_time(self):
        """Returns the number of seconds elapsed during flight.
        Returns:
            int: Seconds elapsed during flight.
        """

        flight_time = self.send_command('time?')

        try:
            flight_time = int(flight_time)
        except:
            pass

        return flight_time

    def get_speed(self):
        """Returns the current speed.
        Returns:
            int: Current speed in KPH or MPH.
        """

        speed = self.send_command('speed?')

        try:
            speed = float(speed)

            if self.imperial is True:
                speed = round((speed / 44.704), 1)
            else:
                speed = round((speed / 27.7778), 1)
        except:
            pass

        return speed

    def land(self):
        """Initiates landing.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('land')

    def move(self, direction, distance):
        """Moves in a direction for a distance.
        This method expects meters or feet. The Tello API expects distances
        from 20 to 500 centimeters.
        Metric: .1 to 5 meters
        Imperial: .7 to 16.4 feet
        Args:
            direction (str): Direction to move, 'forward', 'back', 'right' or 'left'.
            distance (int|float): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        distance = float(distance)

        if self.imperial is True:
            distance = int(round(distance * 30.48))
        else:
            distance = int(round(distance * 100))

        return self.send_command('%s %s' % (direction, distance))

    def move_backward(self, distance):
        """Moves backward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.move('back', distance)

    def move_down(self, distance):
        """Moves down for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.move('down', distance)

    def move_forward(self, distance):
        """Moves forward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('forward', distance)

    def move_left(self, distance):
        """Moves left for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('left', distance)

    def move_right(self, distance):
        """Moves right for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        """
        return self.move('right', distance)

    def move_up(self, distance):
        """Moves up for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.move('up', distance)

    def send_command(self, command):
        """Sends a command to the Tello and waits for a response.
        If self.command_timeout is exceeded before a response is received,
        a RuntimeError exception is raised.
        Args:
            command (str): Command to send.
        Returns:
            str: Response from Tello.
        Raises:
            RuntimeError: If no response is received within self.timeout seconds.
        """

        self.abort_flag = False
        timer = threading.Timer(self.command_timeout, self.set_abort_flag)

        self.socket.sendto(command.encode('utf-8'), self.tello_address)

        timer.start()

        while self.response is None:
            if self.abort_flag is True:
                raise RuntimeError('No response to command')

        timer.cancel()

        response = self.response.decode('utf-8')
        self.response = None

        return response

    def set_abort_flag(self):
        """Sets self.abort_flag to True.
        Used by the timer in Tello.send_command() to indicate to that a response
        timeout has occurred.
        """

        self.abort_flag = True

    def set_speed(self, speed):
        """Sets speed.
        This method expects KPH or MPH. The Tello API expects speeds from
        1 to 100 centimeters/second.
        Metric: .1 to 3.6 KPH
        Imperial: .1 to 2.2 MPH
        Args:
            speed (int|float): Speed.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        speed = float(speed)

        if self.imperial is True:
            speed = int(round(speed * 44.704))
        else:
            speed = int(round(speed * 27.7778))

        return self.send_command('speed %s' % speed)

    def takeoff(self):
        """Initiates take-off.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('takeoff')

    def rotate_cw(self, degrees):
        """Rotates clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('cw %s' % degrees)

    def rotate_ccw(self, degrees):
        """Rotates counter-clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('ccw %s' % degrees)

先直接放主要的代码一段

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import Tello
import time

drone = tello.Tello('192.168.10.2', 8888)
drone.takeoff()
time.sleep(5)

drone.set_speed(2)
time.sleep(1)

drone.move_up(3)
time.sleep(5)

drone.move_forward(10)
time.sleep(10)

drone.rotate_cw(180)
time.sleep(5)

drone.move_forward(10)
time.sleep(10)

drone.land()

print('Flight time: %s' % drone.get_flight_time())

再放一段简单的demo

demo因为封装的好,所以就是函数传参加逻辑控制的模式了~

这样放置使用

对于代码来说,照着SDK写的,按说是没有什么大问题.但是我也没有单元测试过,毕竟也没有人给我工资~而且里面的网络编程,多线程,都是我过年现学的,我还会一点GUI,但是写出来好丑呀~就不放了.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://docs.python.org/3/search.html?q=traceback&check_keywords=yes&area=default

为了方便debug引入这个库

该模块提供了一个标准接口来提取、格式化和打印 Python 程序的堆栈跟踪结果。它完全模仿Python 解释器在打印堆栈跟踪结果时的行为。当您想要在程序控制下打印堆栈跟踪结果时,例如在“封装”解释器时,这是非常有用的。

这个模块使用 traceback 对象 —— 这是存储在 sys.last_traceback 中的对象类型变量,并作为 sys.exc_info() 的第三项被返回。

可以方便的看这个模块的api

最终就是用了这么多的库

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def __init__(self, local_ip, local_port, imperial=True, command_timeout=.3, tello_ip='192.168.10.1', tello_port=8889):
        """Binds to the local IP/port and puts the Tello into command mode.
        Args:
            local_ip (str): 要绑定的本地 IP 地址.
            local_port (int): Local port to bind,本地端口.
            imperial (bool): 如果为 True, 速度是 Mph, 距离是英尺
                             如果 False,速度为 KPH,距离为米
            command_timeout (int|float):等待响应命令的秒数.
            tello_ip (str): Tello IP.
            tello_port (int): Tello port.
        Raises:
            RuntimeError: If the Tello rejects the attempt to enter command mode.
        """

        self.abort_flag = False
        self.command_timeout = command_timeout
        self.imperial = imperial
        self.response = None

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # 创建一个UDP的连接
        # socket这个是插座的意思,其实有点像
        self.tello_address = (tello_ip, tello_port)
        # 元组传递ip+端口

        self.socket.bind((local_ip, local_port))

        self.receive_thread = threading.Thread(target=self._receive_thread)
        self.receive_thread.daemon = True
        # 守护线程

        self.receive_thread.start()
        开始一个线程

        if self.send_command('command') != 'OK':
            raise RuntimeError('Tello 被拒绝尝试进入命令模式')

这个是一个构造函数,对于与飞机的连接,通信,信息处理都是至关重要的

这段是建立一个IPv4的UDP套接字

注意是元组传参

多线程的知识建议自己去看,我有空写教程

接收的话,也是用一个线程来写,接收256字节的数据,因为就是个控制而已

翻滚的函数,没有难度,直接传参数,发送到飞机

电量

再封装这个函数的时候,考虑到了单位

这个函数可以说是很重要啦,是一个发送函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
self.socket.sendto(command.encode('utf-8'), self.tello_address)

sendto函数是把数据按照接收时候的地址,原路发回去

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 timer.start()
        while self.response is None:
            if self.abort_flag is True:
                raise RuntimeError('No response to command')
        timer.cancel()

为了提高代码的健壮性,这个地方做了大量的判断,确保发送成功

剩下就没有上面难点了,感兴趣的可以看注释~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云深之无迹 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档