Module aerpawlib.vehicle

Core logic surrounding the various Vehicles available to aerpawlib user scripts

Expand source code
"""
Core logic surrounding the various `Vehicle`s available to aerpawlib user
scripts
"""
import asyncio
import dronekit
from pymavlink import mavutil
import time
from typing import Callable

from . import util

# time to wait when polling for dronekit vehicle state changes
_POLLING_DELAY = 0.01 # s

class DummyVehicle:
    """
    vehicle for things that don't need vehicles :)

    hacky lol
    """
    def __init__(self, connection_string: str):
        pass

    def close(self):
        pass

    def _initialize_prearm(self, should_postarm_init):
        pass

    async def _initialize_postarm(self):
        pass

class Vehicle:
    """
    Overarching "generic vehicle" type. Implements all functionality, excluding
    movement commands (which are *always* vehicle specific).
    """
    _vehicle: dronekit.Vehicle
    _has_heartbeat: bool
    
    # function used by "verb" functions to check and see if the vehicle can be
    # commanded to move. should be set to a new closure by verb functions to
    # redefine functionality
    _ready_to_move: Callable[[object], bool]=lambda _: True

    # temp hack to allow for dynamically making the drone abortable or not
    # aborting is triggered by mode changes, so we need to ignore the initial
    # takeoff and final landing changes
    _abortable: bool=False
    _aborted: bool=False

    _home_location: util.Coordinate

    def __init__(self, connection_string: str):
        self._vehicle = dronekit.connect(connection_string, wait_ready=True)
        
        self._vehicle.commands.download()
        self._vehicle.commands.wait_ready() # we need to do this to capture
                                            # things such as the home location
        
        self._has_heartbeat = False

        self._should_postarm_init = True
        
        # register required listeners after connecting
        def _heartbeat_listener(_, __, value):
            if value > 1 and self._has_heartbeat:
                self._has_heartbeat = False
            elif value < 1 and not self._has_heartbeat:
                self._has_heartbeat = True
        self._vehicle.add_attribute_listener("last_heartbeat", _heartbeat_listener)

        def _abort_listener(_, __, value):
            # TODO abort logic is more complicated :P
            # if value != "GUIDED":
            #     self._abort()
            return
        self._vehicle.add_attribute_listener("mode", _abort_listener)

        # wait for connection
        while not self._has_heartbeat:
            time.sleep(_POLLING_DELAY)

    # nouns
    @property
    def connected(self) -> bool:
        """
        True if receiving heartbeats, False otherwise
        """
        return self._has_heartbeat

    @property
    def position(self) -> util.Coordinate:
        """
        Get the current position of the Vehicle as a `util.Coordinate`
        """
        loc = self._vehicle.location.global_relative_frame
        return util.Coordinate(loc.lat, loc.lon, loc.alt)

    @property
    def battery(self) -> dronekit.Battery:
        """
        Get the status of the battery. Wraps `dronekit.Battery`, which makes
        the `voltage`, `current`, and `level` available
        """
        return self._vehicle.battery

    @property
    def gps(self) -> dronekit.GPSInfo:
        """
        Get the current GPS status (for gps_0 -- can be changed in the future).
        Wraps `dronekit.GPSInfo`, which exposes the `fix_type` (0-1: no fix,
        2: 2d fix, 3: 3d fix), and number of `satellites_visible`, among other
        things.
        """
        return self._vehicle.gps_0

    @property
    def armed(self) -> bool:
        return self._vehicle.armed

    @property
    def home_coords(self) -> util.Coordinate:
        return self._home_location
    
    @property
    def heading(self) -> float:
        return self._vehicle.heading
    
    # special things
    def done_moving(self) -> bool:
        """
        See if the vehicle is ready to move (i.e. if the last movement command
        has been completed). Also makes sure that the vehicle is connected and
        that we haven't aborted.
        
        This is more accurately a function that describes the vehicle's
        willingness to take a new command.
        """
        if not self.connected or self._aborted:
            return False

        # syntax hack. functions and methods are different and need to be called
        # differently to prevent them from being bound to self
        if hasattr(self._ready_to_move, "__func__"):        # method
            return self._ready_to_move.__func__(self)
        return self._ready_to_move(self)                    # function
    
    async def await_ready_to_move(self):
        """
        Helper function that blocks execution and waits for the vehicle to
        finish the current action/movement that it was instructed to do.

        Makes use of `Vehicle.done_moving`
        """
        while not self.done_moving(): await asyncio.sleep(_POLLING_DELAY)

    def _abort(self):
        # TODO this should be something different in the future.
        # the intent of it in the past has been blocking further execution of
        # more vehicle control logic.
        if self._abortable:
            print("Aborted.")
            self._abortable = False
            self._aborted = True

    # verbs
    def close(self):
        """
        Clean up the `Vehicle` object/any state
        """
        self._vehicle.close()

    async def set_armed(self, value: bool):
        """
        Arm or disarm this vehicle, and wait for it to be armed (if possible)

        Dronekit doesn't guarentee that the vehicle arms immediately (or at
        all!), so this will block execution until the vehicle has been armed.

        If the vehicle can't be armed, an Exception is raised.
        """
        # dronekit doesn't guarentee that the vehicle arms immediately (or at all!)
        # this pattern keeps the funky logic out of the experimenter's script
        # to make sure that things are safer overall
        if not self._vehicle.is_armable:
            raise Exception("Not ready to arm") # in this case, the script dies completely
                                                # obviously not optimal *unless* we are
                                                # certain that a scipt always arms once
        self._vehicle.armed = value
        while not self._vehicle.armed: await asyncio.sleep(_POLLING_DELAY)
    
    def _initialize_prearm(self, should_postarm_init):
        while not self._vehicle.system_status in ["STANDBY", "ACTIVE"]: time.sleep(_POLLING_DELAY)
        self._should_postarm_init = should_postarm_init

    async def _initialize_postarm(self):
        """
        Generic pre-mission manipulation of the vehicle into a state that is
        acceptable. MUST be called before anything else. Though this is done by
        the runner.
        """
        if not self._should_postarm_init:
            return

        while not self._vehicle.is_armable: await asyncio.sleep(_POLLING_DELAY)
        while not self.armed: await asyncio.sleep(_POLLING_DELAY)

        self._vehicle.mode = dronekit.VehicleMode("GUIDED")
        self._abortable = True
        self._home_location = self.position
    
    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        """
        Make the vehicle go to provided coordinates.

        `tolerance` is the min distance away from the coordinates, in meters,
        that is acceptable.

        `target_heading`, when set, will make the drone point in the specified
        direction (absolute). Otherwise the drone will remain pointing in the
        current direction. This is only available on drones.

        This method is only available for vehicles built off the `Vehicle` type
        (ex: `Drone` or `Rover`)
        """
        raise Exception("Generic vehicles can't go to coordinates!")

    async def set_velocity(self,
            velocity_vector: util.VectorNED,
            global_relative: bool=True,
            duration: float=None):
        """
        Set a drone's velocity that it will use for `duration` seconds.

        The velocity vector provided will be inerpreted as being global
        relative *in direction* unless the `global_relative` parameter is set
        to False.

        The vehicle will maintain this velocity for `duration` seconds, if
        `duration` is provided, otherwise it will automatically maintain the
        specified velocity until another command is sent.
        """
        raise Exception("set_velocity not implemented")

    async def set_groundspeed(self, velocity: float):
        """
        Set a vehicle's cruise velocity as used by the autopilot when performing
        guided movement operations (ex: goto_coordinates). In m/s.

        NOTE:
            This is not always respected by the autopilot and will not succeed
            on rover type vehicles in simulation.
        """
        self._vehicle.groundspeed = velocity
    
    async def _stop(self):
        """
        Internal utility to stop any movement being run in the background. This
        ignores await_ready_to_move(). Requires set_velocity to be implemented
        for the given vehicle.

        TODO needs testing. setting velocity to 0\bar may not be best way to
        stop
        """
        self._ready_to_move = lambda _: True

class Drone(Vehicle):
    """
    Drone vehicle type. Implements all functionality that AERPAW's drones
    expose to user scripts, which includes basic movement control (going to
    coords, turning, landing).
    """
    async def set_heading(self, heading: float):
        """
        Set the heading of the vehicle (in absolute deg).
        
        To turn a relative # of degrees, you can do something like
        `drone.set_heading(drone.heading + x)`

        NOTE that this function still needs to be tested kind of extensively.
        Ardupilot has a few internal states that control the heading, and when
        it's manually set via a CMD_CONDITION_YAW command over mavlink, it's
        possible for it to either be ignored, be accepted and then ignored, or
        be accepted, switch the drone's internal turning state to be manually
        controlled, and then be stuck that way (i.e. the drone won't auto-fly
        in a "straight" direction). Basically, be warned.
        """
        await self.await_ready_to_move()

        heading %= 360
        
        # NOTE that the system and component below are derived from commands
        # observed in SITL. could be wrong, and it's kind of magic undocumnted stuff.
        # doing more research.
        msg = self._vehicle.message_factory.command_long_encode(
            0, 0,                                       # target system, component
            mavutil.mavlink.MAV_CMD_CONDITION_YAW,      # command
            0,                                          # confirmation
            heading,                                    # yaw angle in deg
            0,                                          # yaw speed in deg/s
            0,                                          # direction to turn in (-1: ccw, 1: cw)
            0,                                          # never turn relative to our current heading
            0, 0, 0                                     # unused
            )
        self._vehicle.send_mavlink(msg)
        
        def _pointed_at_heading(self) -> bool:
            _TURN_TOLERANCE_DEG = 5
            turn_diff = min([abs(i) for i in [heading - self.heading, self.heading - (heading + 360)]])
            return turn_diff <= _TURN_TOLERANCE_DEG
        self._ready_to_move = _pointed_at_heading

        while not _pointed_at_heading(self): await asyncio.sleep(_POLLING_DELAY)

    async def takeoff(self, target_alt: float, min_alt_tolerance: float=0.95):
        """
        Make the drone take off to a specific altitude, and blocks until the
        drone has reached that altitude.

        Additionally waits to make sure that channel 4 of RCIN (used for yaw)
        is centered to avoid yaw during takeoff if the drone was *just* armed.
        """
        await self.await_ready_to_move()        

        # TODO the below logic needs to be tested at the field (and likely made less brittle)
        # wait for sticks to return to center by taking rolling avg (30 frames)
        rcin_4 = [-999] * 30 # use something obviously out of range
        def _rcin_4_listener(_, __, message):
            rcin_4.pop(0)
            rcin_4.append(message.chan4_raw)
        self._vehicle.add_message_listener("RC_CHANNELS", _rcin_4_listener)
        while not 1450 <= (sum(rcin_4) / len(rcin_4)) <= 1550: await asyncio.sleep(_POLLING_DELAY)
        self._vehicle.remove_message_listener("RC_CHANNELS", _rcin_4_listener)
        
        self._vehicle.simple_takeoff(target_alt)
        
        taken_off = lambda self: self.position.alt >= target_alt * min_alt_tolerance
        self._ready_to_move = taken_off

        while not taken_off(self): await asyncio.sleep(_POLLING_DELAY)
        await asyncio.sleep(5)

    async def land(self):
        """
        Land the drone at its current position and block while waiting for it
        to be disarmed. No further movement is allowed after the drone has been
        landed (for now, may be changed later).
        """
        await self.await_ready_to_move()

        self._abortable = False
        self._vehicle.mode = dronekit.VehicleMode("LAND")

        self._ready_to_move = lambda _: False
        while self.armed: await asyncio.sleep(_POLLING_DELAY)

    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        if target_heading != None:
            await self.set_heading(target_heading)
        
        await self.await_ready_to_move()
        await self._stop()
        self._vehicle.simple_goto(coordinates.location())
        
        # TODO in the future we likely want to split alt into a different tolerance
        at_coords = lambda self: \
            coordinates.distance(self.position) <= tolerance
        self._ready_to_move = at_coords

        while not at_coords(self): await asyncio.sleep(_POLLING_DELAY)

    _velocity_loop_active: bool=False

    async def set_velocity(self,
            velocity_vector: util.VectorNED,
            global_relative: bool=True,
            duration: float=None):
        await self.await_ready_to_move()
        
        self._velocity_loop_active = False # TODO race condition bleh
        await asyncio.sleep(_POLLING_DELAY)
        
        if not global_relative:
            velocity_vector = velocity_vector.rotate_by_angle(-self.heading)

        msg = self._vehicle.message_factory.set_position_target_global_int_encode(
                0, 0, 0,                # unused, target sys, component
                mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
                0b0000111111000111,     # bitmask to only set speed
                0, 0, 0,                # unused
                velocity_vector.north,
                velocity_vector.east,
                velocity_vector.down,
                0, 0, 0,                # (unsupported) accel
                0, 0                    # yaw/rate
                )
        
        self._ready_to_move = lambda _: True
        target_end = time.time() + duration if duration is not None else None
        
        async def _velocity_helper():
            while self._velocity_loop_active:
                if target_end is not None and time.time() > target_end:
                    self._velocity_loop_active = False
                self._vehicle.send_mavlink(msg)
                await asyncio.sleep(0.1)        # TODO tune for better perf
        self._velocity_loop_active = True
        asyncio.ensure_future(_velocity_helper())

    async def _stop(self):
        await super()._stop()
        await self.set_velocity(util.VectorNED(0, 0, 0))
        self._velocity_loop_active = False

class Rover(Vehicle):
    """
    Rover vehicle type. Implements all functionality that AERPAW's rovers
    expose to user scripts, which includes basic movement control (going to
    coords).

    `target_heading` is ignored for rovers, as they can't strafe.
    """
    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        await self.await_ready_to_move()
        self._vehicle.simple_goto(util.Coordinate(coordinates.lat, coordinates.lon, 0).location())
        
        at_coords = lambda self: \
            coordinates.ground_distance(self.position) <= tolerance
        self._ready_to_move = at_coords

        while not at_coords(self): await asyncio.sleep(_POLLING_DELAY)

# TODO break this down further?:
# class LAM(Drone)
# class SAM(Drone)

Classes

class Drone (connection_string: str)

Drone vehicle type. Implements all functionality that AERPAW's drones expose to user scripts, which includes basic movement control (going to coords, turning, landing).

Expand source code
class Drone(Vehicle):
    """
    Drone vehicle type. Implements all functionality that AERPAW's drones
    expose to user scripts, which includes basic movement control (going to
    coords, turning, landing).
    """
    async def set_heading(self, heading: float):
        """
        Set the heading of the vehicle (in absolute deg).
        
        To turn a relative # of degrees, you can do something like
        `drone.set_heading(drone.heading + x)`

        NOTE that this function still needs to be tested kind of extensively.
        Ardupilot has a few internal states that control the heading, and when
        it's manually set via a CMD_CONDITION_YAW command over mavlink, it's
        possible for it to either be ignored, be accepted and then ignored, or
        be accepted, switch the drone's internal turning state to be manually
        controlled, and then be stuck that way (i.e. the drone won't auto-fly
        in a "straight" direction). Basically, be warned.
        """
        await self.await_ready_to_move()

        heading %= 360
        
        # NOTE that the system and component below are derived from commands
        # observed in SITL. could be wrong, and it's kind of magic undocumnted stuff.
        # doing more research.
        msg = self._vehicle.message_factory.command_long_encode(
            0, 0,                                       # target system, component
            mavutil.mavlink.MAV_CMD_CONDITION_YAW,      # command
            0,                                          # confirmation
            heading,                                    # yaw angle in deg
            0,                                          # yaw speed in deg/s
            0,                                          # direction to turn in (-1: ccw, 1: cw)
            0,                                          # never turn relative to our current heading
            0, 0, 0                                     # unused
            )
        self._vehicle.send_mavlink(msg)
        
        def _pointed_at_heading(self) -> bool:
            _TURN_TOLERANCE_DEG = 5
            turn_diff = min([abs(i) for i in [heading - self.heading, self.heading - (heading + 360)]])
            return turn_diff <= _TURN_TOLERANCE_DEG
        self._ready_to_move = _pointed_at_heading

        while not _pointed_at_heading(self): await asyncio.sleep(_POLLING_DELAY)

    async def takeoff(self, target_alt: float, min_alt_tolerance: float=0.95):
        """
        Make the drone take off to a specific altitude, and blocks until the
        drone has reached that altitude.

        Additionally waits to make sure that channel 4 of RCIN (used for yaw)
        is centered to avoid yaw during takeoff if the drone was *just* armed.
        """
        await self.await_ready_to_move()        

        # TODO the below logic needs to be tested at the field (and likely made less brittle)
        # wait for sticks to return to center by taking rolling avg (30 frames)
        rcin_4 = [-999] * 30 # use something obviously out of range
        def _rcin_4_listener(_, __, message):
            rcin_4.pop(0)
            rcin_4.append(message.chan4_raw)
        self._vehicle.add_message_listener("RC_CHANNELS", _rcin_4_listener)
        while not 1450 <= (sum(rcin_4) / len(rcin_4)) <= 1550: await asyncio.sleep(_POLLING_DELAY)
        self._vehicle.remove_message_listener("RC_CHANNELS", _rcin_4_listener)
        
        self._vehicle.simple_takeoff(target_alt)
        
        taken_off = lambda self: self.position.alt >= target_alt * min_alt_tolerance
        self._ready_to_move = taken_off

        while not taken_off(self): await asyncio.sleep(_POLLING_DELAY)
        await asyncio.sleep(5)

    async def land(self):
        """
        Land the drone at its current position and block while waiting for it
        to be disarmed. No further movement is allowed after the drone has been
        landed (for now, may be changed later).
        """
        await self.await_ready_to_move()

        self._abortable = False
        self._vehicle.mode = dronekit.VehicleMode("LAND")

        self._ready_to_move = lambda _: False
        while self.armed: await asyncio.sleep(_POLLING_DELAY)

    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        if target_heading != None:
            await self.set_heading(target_heading)
        
        await self.await_ready_to_move()
        await self._stop()
        self._vehicle.simple_goto(coordinates.location())
        
        # TODO in the future we likely want to split alt into a different tolerance
        at_coords = lambda self: \
            coordinates.distance(self.position) <= tolerance
        self._ready_to_move = at_coords

        while not at_coords(self): await asyncio.sleep(_POLLING_DELAY)

    _velocity_loop_active: bool=False

    async def set_velocity(self,
            velocity_vector: util.VectorNED,
            global_relative: bool=True,
            duration: float=None):
        await self.await_ready_to_move()
        
        self._velocity_loop_active = False # TODO race condition bleh
        await asyncio.sleep(_POLLING_DELAY)
        
        if not global_relative:
            velocity_vector = velocity_vector.rotate_by_angle(-self.heading)

        msg = self._vehicle.message_factory.set_position_target_global_int_encode(
                0, 0, 0,                # unused, target sys, component
                mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
                0b0000111111000111,     # bitmask to only set speed
                0, 0, 0,                # unused
                velocity_vector.north,
                velocity_vector.east,
                velocity_vector.down,
                0, 0, 0,                # (unsupported) accel
                0, 0                    # yaw/rate
                )
        
        self._ready_to_move = lambda _: True
        target_end = time.time() + duration if duration is not None else None
        
        async def _velocity_helper():
            while self._velocity_loop_active:
                if target_end is not None and time.time() > target_end:
                    self._velocity_loop_active = False
                self._vehicle.send_mavlink(msg)
                await asyncio.sleep(0.1)        # TODO tune for better perf
        self._velocity_loop_active = True
        asyncio.ensure_future(_velocity_helper())

    async def _stop(self):
        await super()._stop()
        await self.set_velocity(util.VectorNED(0, 0, 0))
        self._velocity_loop_active = False

Ancestors

Methods

async def land(self)

Land the drone at its current position and block while waiting for it to be disarmed. No further movement is allowed after the drone has been landed (for now, may be changed later).

Expand source code
async def land(self):
    """
    Land the drone at its current position and block while waiting for it
    to be disarmed. No further movement is allowed after the drone has been
    landed (for now, may be changed later).
    """
    await self.await_ready_to_move()

    self._abortable = False
    self._vehicle.mode = dronekit.VehicleMode("LAND")

    self._ready_to_move = lambda _: False
    while self.armed: await asyncio.sleep(_POLLING_DELAY)
async def set_heading(self, heading: float)

Set the heading of the vehicle (in absolute deg).

To turn a relative # of degrees, you can do something like drone.set_heading(drone.heading + x)

NOTE that this function still needs to be tested kind of extensively. Ardupilot has a few internal states that control the heading, and when it's manually set via a CMD_CONDITION_YAW command over mavlink, it's possible for it to either be ignored, be accepted and then ignored, or be accepted, switch the drone's internal turning state to be manually controlled, and then be stuck that way (i.e. the drone won't auto-fly in a "straight" direction). Basically, be warned.

Expand source code
async def set_heading(self, heading: float):
    """
    Set the heading of the vehicle (in absolute deg).
    
    To turn a relative # of degrees, you can do something like
    `drone.set_heading(drone.heading + x)`

    NOTE that this function still needs to be tested kind of extensively.
    Ardupilot has a few internal states that control the heading, and when
    it's manually set via a CMD_CONDITION_YAW command over mavlink, it's
    possible for it to either be ignored, be accepted and then ignored, or
    be accepted, switch the drone's internal turning state to be manually
    controlled, and then be stuck that way (i.e. the drone won't auto-fly
    in a "straight" direction). Basically, be warned.
    """
    await self.await_ready_to_move()

    heading %= 360
    
    # NOTE that the system and component below are derived from commands
    # observed in SITL. could be wrong, and it's kind of magic undocumnted stuff.
    # doing more research.
    msg = self._vehicle.message_factory.command_long_encode(
        0, 0,                                       # target system, component
        mavutil.mavlink.MAV_CMD_CONDITION_YAW,      # command
        0,                                          # confirmation
        heading,                                    # yaw angle in deg
        0,                                          # yaw speed in deg/s
        0,                                          # direction to turn in (-1: ccw, 1: cw)
        0,                                          # never turn relative to our current heading
        0, 0, 0                                     # unused
        )
    self._vehicle.send_mavlink(msg)
    
    def _pointed_at_heading(self) -> bool:
        _TURN_TOLERANCE_DEG = 5
        turn_diff = min([abs(i) for i in [heading - self.heading, self.heading - (heading + 360)]])
        return turn_diff <= _TURN_TOLERANCE_DEG
    self._ready_to_move = _pointed_at_heading

    while not _pointed_at_heading(self): await asyncio.sleep(_POLLING_DELAY)
async def takeoff(self, target_alt: float, min_alt_tolerance: float = 0.95)

Make the drone take off to a specific altitude, and blocks until the drone has reached that altitude.

Additionally waits to make sure that channel 4 of RCIN (used for yaw) is centered to avoid yaw during takeoff if the drone was just armed.

Expand source code
async def takeoff(self, target_alt: float, min_alt_tolerance: float=0.95):
    """
    Make the drone take off to a specific altitude, and blocks until the
    drone has reached that altitude.

    Additionally waits to make sure that channel 4 of RCIN (used for yaw)
    is centered to avoid yaw during takeoff if the drone was *just* armed.
    """
    await self.await_ready_to_move()        

    # TODO the below logic needs to be tested at the field (and likely made less brittle)
    # wait for sticks to return to center by taking rolling avg (30 frames)
    rcin_4 = [-999] * 30 # use something obviously out of range
    def _rcin_4_listener(_, __, message):
        rcin_4.pop(0)
        rcin_4.append(message.chan4_raw)
    self._vehicle.add_message_listener("RC_CHANNELS", _rcin_4_listener)
    while not 1450 <= (sum(rcin_4) / len(rcin_4)) <= 1550: await asyncio.sleep(_POLLING_DELAY)
    self._vehicle.remove_message_listener("RC_CHANNELS", _rcin_4_listener)
    
    self._vehicle.simple_takeoff(target_alt)
    
    taken_off = lambda self: self.position.alt >= target_alt * min_alt_tolerance
    self._ready_to_move = taken_off

    while not taken_off(self): await asyncio.sleep(_POLLING_DELAY)
    await asyncio.sleep(5)

Inherited members

class DummyVehicle (connection_string: str)

vehicle for things that don't need vehicles :)

hacky lol

Expand source code
class DummyVehicle:
    """
    vehicle for things that don't need vehicles :)

    hacky lol
    """
    def __init__(self, connection_string: str):
        pass

    def close(self):
        pass

    def _initialize_prearm(self, should_postarm_init):
        pass

    async def _initialize_postarm(self):
        pass

Methods

def close(self)
Expand source code
def close(self):
    pass
class Rover (connection_string: str)

Rover vehicle type. Implements all functionality that AERPAW's rovers expose to user scripts, which includes basic movement control (going to coords).

target_heading is ignored for rovers, as they can't strafe.

Expand source code
class Rover(Vehicle):
    """
    Rover vehicle type. Implements all functionality that AERPAW's rovers
    expose to user scripts, which includes basic movement control (going to
    coords).

    `target_heading` is ignored for rovers, as they can't strafe.
    """
    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        await self.await_ready_to_move()
        self._vehicle.simple_goto(util.Coordinate(coordinates.lat, coordinates.lon, 0).location())
        
        at_coords = lambda self: \
            coordinates.ground_distance(self.position) <= tolerance
        self._ready_to_move = at_coords

        while not at_coords(self): await asyncio.sleep(_POLLING_DELAY)

Ancestors

Inherited members

class Vehicle (connection_string: str)

Overarching "generic vehicle" type. Implements all functionality, excluding movement commands (which are always vehicle specific).

Expand source code
class Vehicle:
    """
    Overarching "generic vehicle" type. Implements all functionality, excluding
    movement commands (which are *always* vehicle specific).
    """
    _vehicle: dronekit.Vehicle
    _has_heartbeat: bool
    
    # function used by "verb" functions to check and see if the vehicle can be
    # commanded to move. should be set to a new closure by verb functions to
    # redefine functionality
    _ready_to_move: Callable[[object], bool]=lambda _: True

    # temp hack to allow for dynamically making the drone abortable or not
    # aborting is triggered by mode changes, so we need to ignore the initial
    # takeoff and final landing changes
    _abortable: bool=False
    _aborted: bool=False

    _home_location: util.Coordinate

    def __init__(self, connection_string: str):
        self._vehicle = dronekit.connect(connection_string, wait_ready=True)
        
        self._vehicle.commands.download()
        self._vehicle.commands.wait_ready() # we need to do this to capture
                                            # things such as the home location
        
        self._has_heartbeat = False

        self._should_postarm_init = True
        
        # register required listeners after connecting
        def _heartbeat_listener(_, __, value):
            if value > 1 and self._has_heartbeat:
                self._has_heartbeat = False
            elif value < 1 and not self._has_heartbeat:
                self._has_heartbeat = True
        self._vehicle.add_attribute_listener("last_heartbeat", _heartbeat_listener)

        def _abort_listener(_, __, value):
            # TODO abort logic is more complicated :P
            # if value != "GUIDED":
            #     self._abort()
            return
        self._vehicle.add_attribute_listener("mode", _abort_listener)

        # wait for connection
        while not self._has_heartbeat:
            time.sleep(_POLLING_DELAY)

    # nouns
    @property
    def connected(self) -> bool:
        """
        True if receiving heartbeats, False otherwise
        """
        return self._has_heartbeat

    @property
    def position(self) -> util.Coordinate:
        """
        Get the current position of the Vehicle as a `util.Coordinate`
        """
        loc = self._vehicle.location.global_relative_frame
        return util.Coordinate(loc.lat, loc.lon, loc.alt)

    @property
    def battery(self) -> dronekit.Battery:
        """
        Get the status of the battery. Wraps `dronekit.Battery`, which makes
        the `voltage`, `current`, and `level` available
        """
        return self._vehicle.battery

    @property
    def gps(self) -> dronekit.GPSInfo:
        """
        Get the current GPS status (for gps_0 -- can be changed in the future).
        Wraps `dronekit.GPSInfo`, which exposes the `fix_type` (0-1: no fix,
        2: 2d fix, 3: 3d fix), and number of `satellites_visible`, among other
        things.
        """
        return self._vehicle.gps_0

    @property
    def armed(self) -> bool:
        return self._vehicle.armed

    @property
    def home_coords(self) -> util.Coordinate:
        return self._home_location
    
    @property
    def heading(self) -> float:
        return self._vehicle.heading
    
    # special things
    def done_moving(self) -> bool:
        """
        See if the vehicle is ready to move (i.e. if the last movement command
        has been completed). Also makes sure that the vehicle is connected and
        that we haven't aborted.
        
        This is more accurately a function that describes the vehicle's
        willingness to take a new command.
        """
        if not self.connected or self._aborted:
            return False

        # syntax hack. functions and methods are different and need to be called
        # differently to prevent them from being bound to self
        if hasattr(self._ready_to_move, "__func__"):        # method
            return self._ready_to_move.__func__(self)
        return self._ready_to_move(self)                    # function
    
    async def await_ready_to_move(self):
        """
        Helper function that blocks execution and waits for the vehicle to
        finish the current action/movement that it was instructed to do.

        Makes use of `Vehicle.done_moving`
        """
        while not self.done_moving(): await asyncio.sleep(_POLLING_DELAY)

    def _abort(self):
        # TODO this should be something different in the future.
        # the intent of it in the past has been blocking further execution of
        # more vehicle control logic.
        if self._abortable:
            print("Aborted.")
            self._abortable = False
            self._aborted = True

    # verbs
    def close(self):
        """
        Clean up the `Vehicle` object/any state
        """
        self._vehicle.close()

    async def set_armed(self, value: bool):
        """
        Arm or disarm this vehicle, and wait for it to be armed (if possible)

        Dronekit doesn't guarentee that the vehicle arms immediately (or at
        all!), so this will block execution until the vehicle has been armed.

        If the vehicle can't be armed, an Exception is raised.
        """
        # dronekit doesn't guarentee that the vehicle arms immediately (or at all!)
        # this pattern keeps the funky logic out of the experimenter's script
        # to make sure that things are safer overall
        if not self._vehicle.is_armable:
            raise Exception("Not ready to arm") # in this case, the script dies completely
                                                # obviously not optimal *unless* we are
                                                # certain that a scipt always arms once
        self._vehicle.armed = value
        while not self._vehicle.armed: await asyncio.sleep(_POLLING_DELAY)
    
    def _initialize_prearm(self, should_postarm_init):
        while not self._vehicle.system_status in ["STANDBY", "ACTIVE"]: time.sleep(_POLLING_DELAY)
        self._should_postarm_init = should_postarm_init

    async def _initialize_postarm(self):
        """
        Generic pre-mission manipulation of the vehicle into a state that is
        acceptable. MUST be called before anything else. Though this is done by
        the runner.
        """
        if not self._should_postarm_init:
            return

        while not self._vehicle.is_armable: await asyncio.sleep(_POLLING_DELAY)
        while not self.armed: await asyncio.sleep(_POLLING_DELAY)

        self._vehicle.mode = dronekit.VehicleMode("GUIDED")
        self._abortable = True
        self._home_location = self.position
    
    async def goto_coordinates(self,
            coordinates: util.Coordinate,
            tolerance: float=2,
            target_heading: float=None):
        """
        Make the vehicle go to provided coordinates.

        `tolerance` is the min distance away from the coordinates, in meters,
        that is acceptable.

        `target_heading`, when set, will make the drone point in the specified
        direction (absolute). Otherwise the drone will remain pointing in the
        current direction. This is only available on drones.

        This method is only available for vehicles built off the `Vehicle` type
        (ex: `Drone` or `Rover`)
        """
        raise Exception("Generic vehicles can't go to coordinates!")

    async def set_velocity(self,
            velocity_vector: util.VectorNED,
            global_relative: bool=True,
            duration: float=None):
        """
        Set a drone's velocity that it will use for `duration` seconds.

        The velocity vector provided will be inerpreted as being global
        relative *in direction* unless the `global_relative` parameter is set
        to False.

        The vehicle will maintain this velocity for `duration` seconds, if
        `duration` is provided, otherwise it will automatically maintain the
        specified velocity until another command is sent.
        """
        raise Exception("set_velocity not implemented")

    async def set_groundspeed(self, velocity: float):
        """
        Set a vehicle's cruise velocity as used by the autopilot when performing
        guided movement operations (ex: goto_coordinates). In m/s.

        NOTE:
            This is not always respected by the autopilot and will not succeed
            on rover type vehicles in simulation.
        """
        self._vehicle.groundspeed = velocity
    
    async def _stop(self):
        """
        Internal utility to stop any movement being run in the background. This
        ignores await_ready_to_move(). Requires set_velocity to be implemented
        for the given vehicle.

        TODO needs testing. setting velocity to 0\bar may not be best way to
        stop
        """
        self._ready_to_move = lambda _: True

Subclasses

Instance variables

var armed : bool
Expand source code
@property
def armed(self) -> bool:
    return self._vehicle.armed
var battery : dronekit.Battery

Get the status of the battery. Wraps dronekit.Battery, which makes the voltage, current, and level available

Expand source code
@property
def battery(self) -> dronekit.Battery:
    """
    Get the status of the battery. Wraps `dronekit.Battery`, which makes
    the `voltage`, `current`, and `level` available
    """
    return self._vehicle.battery
var connected : bool

True if receiving heartbeats, False otherwise

Expand source code
@property
def connected(self) -> bool:
    """
    True if receiving heartbeats, False otherwise
    """
    return self._has_heartbeat
var gps : dronekit.GPSInfo

Get the current GPS status (for gps_0 – can be changed in the future). Wraps dronekit.GPSInfo, which exposes the fix_type (0-1: no fix, 2: 2d fix, 3: 3d fix), and number of satellites_visible, among other things.

Expand source code
@property
def gps(self) -> dronekit.GPSInfo:
    """
    Get the current GPS status (for gps_0 -- can be changed in the future).
    Wraps `dronekit.GPSInfo`, which exposes the `fix_type` (0-1: no fix,
    2: 2d fix, 3: 3d fix), and number of `satellites_visible`, among other
    things.
    """
    return self._vehicle.gps_0
var heading : float
Expand source code
@property
def heading(self) -> float:
    return self._vehicle.heading
var home_coordsCoordinate
Expand source code
@property
def home_coords(self) -> util.Coordinate:
    return self._home_location
var positionCoordinate

Get the current position of the Vehicle as a util.Coordinate

Expand source code
@property
def position(self) -> util.Coordinate:
    """
    Get the current position of the Vehicle as a `util.Coordinate`
    """
    loc = self._vehicle.location.global_relative_frame
    return util.Coordinate(loc.lat, loc.lon, loc.alt)

Methods

async def await_ready_to_move(self)

Helper function that blocks execution and waits for the vehicle to finish the current action/movement that it was instructed to do.

Makes use of Vehicle.done_moving()

Expand source code
async def await_ready_to_move(self):
    """
    Helper function that blocks execution and waits for the vehicle to
    finish the current action/movement that it was instructed to do.

    Makes use of `Vehicle.done_moving`
    """
    while not self.done_moving(): await asyncio.sleep(_POLLING_DELAY)
def close(self)

Clean up the Vehicle object/any state

Expand source code
def close(self):
    """
    Clean up the `Vehicle` object/any state
    """
    self._vehicle.close()
def done_moving(self) ‑> bool

See if the vehicle is ready to move (i.e. if the last movement command has been completed). Also makes sure that the vehicle is connected and that we haven't aborted.

This is more accurately a function that describes the vehicle's willingness to take a new command.

Expand source code
def done_moving(self) -> bool:
    """
    See if the vehicle is ready to move (i.e. if the last movement command
    has been completed). Also makes sure that the vehicle is connected and
    that we haven't aborted.
    
    This is more accurately a function that describes the vehicle's
    willingness to take a new command.
    """
    if not self.connected or self._aborted:
        return False

    # syntax hack. functions and methods are different and need to be called
    # differently to prevent them from being bound to self
    if hasattr(self._ready_to_move, "__func__"):        # method
        return self._ready_to_move.__func__(self)
    return self._ready_to_move(self)                    # function
async def goto_coordinates(self, coordinates: Coordinate, tolerance: float = 2, target_heading: float = None)

Make the vehicle go to provided coordinates.

tolerance is the min distance away from the coordinates, in meters, that is acceptable.

target_heading, when set, will make the drone point in the specified direction (absolute). Otherwise the drone will remain pointing in the current direction. This is only available on drones.

This method is only available for vehicles built off the Vehicle type (ex: Drone or Rover)

Expand source code
async def goto_coordinates(self,
        coordinates: util.Coordinate,
        tolerance: float=2,
        target_heading: float=None):
    """
    Make the vehicle go to provided coordinates.

    `tolerance` is the min distance away from the coordinates, in meters,
    that is acceptable.

    `target_heading`, when set, will make the drone point in the specified
    direction (absolute). Otherwise the drone will remain pointing in the
    current direction. This is only available on drones.

    This method is only available for vehicles built off the `Vehicle` type
    (ex: `Drone` or `Rover`)
    """
    raise Exception("Generic vehicles can't go to coordinates!")
async def set_armed(self, value: bool)

Arm or disarm this vehicle, and wait for it to be armed (if possible)

Dronekit doesn't guarentee that the vehicle arms immediately (or at all!), so this will block execution until the vehicle has been armed.

If the vehicle can't be armed, an Exception is raised.

Expand source code
async def set_armed(self, value: bool):
    """
    Arm or disarm this vehicle, and wait for it to be armed (if possible)

    Dronekit doesn't guarentee that the vehicle arms immediately (or at
    all!), so this will block execution until the vehicle has been armed.

    If the vehicle can't be armed, an Exception is raised.
    """
    # dronekit doesn't guarentee that the vehicle arms immediately (or at all!)
    # this pattern keeps the funky logic out of the experimenter's script
    # to make sure that things are safer overall
    if not self._vehicle.is_armable:
        raise Exception("Not ready to arm") # in this case, the script dies completely
                                            # obviously not optimal *unless* we are
                                            # certain that a scipt always arms once
    self._vehicle.armed = value
    while not self._vehicle.armed: await asyncio.sleep(_POLLING_DELAY)
async def set_groundspeed(self, velocity: float)

Set a vehicle's cruise velocity as used by the autopilot when performing guided movement operations (ex: goto_coordinates). In m/s.

Note

This is not always respected by the autopilot and will not succeed on rover type vehicles in simulation.

Expand source code
async def set_groundspeed(self, velocity: float):
    """
    Set a vehicle's cruise velocity as used by the autopilot when performing
    guided movement operations (ex: goto_coordinates). In m/s.

    NOTE:
        This is not always respected by the autopilot and will not succeed
        on rover type vehicles in simulation.
    """
    self._vehicle.groundspeed = velocity
async def set_velocity(self, velocity_vector: VectorNED, global_relative: bool = True, duration: float = None)

Set a drone's velocity that it will use for duration seconds.

The velocity vector provided will be inerpreted as being global relative in direction unless the global_relative parameter is set to False.

The vehicle will maintain this velocity for duration seconds, if duration is provided, otherwise it will automatically maintain the specified velocity until another command is sent.

Expand source code
async def set_velocity(self,
        velocity_vector: util.VectorNED,
        global_relative: bool=True,
        duration: float=None):
    """
    Set a drone's velocity that it will use for `duration` seconds.

    The velocity vector provided will be inerpreted as being global
    relative *in direction* unless the `global_relative` parameter is set
    to False.

    The vehicle will maintain this velocity for `duration` seconds, if
    `duration` is provided, otherwise it will automatically maintain the
    specified velocity until another command is sent.
    """
    raise Exception("set_velocity not implemented")