Module aerpawlib.runner
Collection of execution frameworks that can be extended to make scripts
runnable using aerpawlib. The most basic framework is Runner
– any custom
frameworks must extend it to be executable.
Expand source code
"""
Collection of execution frameworks that can be extended to make scripts
runnable using aerpawlib. The most basic framework is `Runner` -- any custom
frameworks *must* extend it to be executable.
"""
import asyncio
from enum import Enum, auto
import inspect
from typing import Callable, Dict, List
import zmq
import zmq.asyncio
from .vehicle import Vehicle
from .zmqutil import *
class Runner:
"""
Base execution framework -- generally, this should only be used to build
new runners. Only exposes the base functions needed to interface with
aerpawlib's runner.
"""
async def run(self, _: Vehicle):
"""
Run the script that's been loaded in -- this is what implements the
core logic of whatever's being run. The `aerpawlib.vehicle.Vehicle`
passed in will have been initialized externally by the launch script.
For other runners provided with aerpawlib (ex: `StateMachine`,
`BasicRunner`), you should avoid overriding this.
"""
pass
def initialize_args(self, _: List[str]):
"""
Can be overridden to parse and handle extra command line arguments
passed to the runner. The easiest way to interact with them is through
`argparse` -- check out `examples/squareoff_logging.py` for an example
of this.
This is expected to be overridden by the user script if it is used.
"""
pass
def cleanup(self):
"""
Any additional user-provided functionality that is called when the
script exits should be implemented here.
This is expected to be overridden by the user script if it is used.
"""
pass
_Runnable = Callable[[Runner, Vehicle], str]
def entrypoint(func):
"""
Decorator used to identify the entry point used by `BasicRunner` driven
scripts.
The function decorated by this is expected to be `async`
"""
func._entrypoint = True
return func
class BasicRunner(Runner):
"""
BasicRunners have a single entry point (specified by `entrypoint`) that is
executed when the script is run. The function provided can be anything, as
it will be run in parallel to background services used by aerpawlib.
For an example of a minimum viable `BasicRunner`, check out
`examples/basic_runner.py`
"""
def _build(self):
for _, method in inspect.getmembers(self):
if not inspect.ismethod(method):
continue
if hasattr(method, "_entrypoint"):
self._entry = method
async def run(self, vehicle: Vehicle):
self._build()
if hasattr(self, "_entry"):
await self._entry.__func__(self, vehicle)
else:
raise Exception("No @entrypoint declared")
class _StateType(Enum):
STANDARD = auto()
TIMED = auto()
class _State:
_name: str
_func: _Runnable
def __init__(self, func: _Runnable, name: str):
self._name = name
self._func = func
async def run(self, runner: Runner, vehicle: Vehicle) -> str:
if self._func._state_type == _StateType.STANDARD:
return await self._func.__func__(runner, vehicle)
elif self._func._state_type == _StateType.TIMED:
running = True
async def _bg():
nonlocal running
last_state = ""
while running:
last_state = await self._func.__func__(runner, vehicle)
if not self._func._state_loop:
running = False
await asyncio.sleep(_STATE_DELAY)
return last_state
r = asyncio.ensure_future(_bg())
# order here is important and stops a race condition
await asyncio.sleep(self._func._state_duration)
running = False
next_state = await r
return next_state
return ""
def state(name: str, first: bool=False):
"""
Decorator used to specify a state used by a `StateMachine`. Functions
decorated using this are expected to be given a name and return a string
that is the name of the next state to transition to/run. If no next state
is returned, then the `StateMachine` will (gracefully) stop.
The function decorated by this is expected to be `async`
"""
if name == "":
raise Exception("state name can't be \"\"")
def decorator(func):
func._is_state = True
func._state_name = name
func._state_first = first
func._state_type = _StateType.STANDARD
return func
return decorator
def timed_state(name: str, duration: float, loop=False, first: bool=False):
"""
Timed state. Will wait `duration` seconds before transitioning to the next
state. If `loop` is true, the decorated function will be continuously
called until the duration has expired.
This guarentees that the state will persist for *at least* `duration`
seconds, so timed states will continue running for longer than that if the
decorated function lasts longer than the timer.
The function decorated by this is expected to be `async`
"""
def decorator(func):
func._is_state = True
func._state_name = name
func._state_first = first
func._state_type = _StateType.TIMED
func._state_duration = duration
func._state_loop = loop
return func
return decorator
# TODO do something w/ this? why not just export all states perhaps
def expose_zmq(name: str):
if name == "":
raise Exception("state must be exported with some binding")
def decorator(func):
func._is_exposed_zmq = True
func._zmq_name = name
return func
return decorator
def expose_field_zmq(name: str):
"""
Make a field requestable by querying it through zmq. The function decorated
by this will be called and the return value will be sent.
"""
# expose a function as a thing that can be called to get a field w/ zmq
if name == "":
raise Exception("field must be exposed with some name")
def decorator(func):
func._is_exposed_field_zmq = True
func._zmq_name = name
return func
return decorator
_BackgroundTask = Callable[[Runner, Vehicle], None]
def background(func):
"""
Designate a function to be run in parallel to a given StateMachine.
By `asyncio`'s design, background function's don't have to take
thread-safety into consideration.
The function decorated by this is expected to be `async`
When using @background, make sure to incorporate some kind of delay.
Without any delay, an @background thread can eat all the python
instance's CPU
"""
func._is_background = True
return func
_InitializationTask = Callable[[Runner, Vehicle], None]
def at_init(func):
"""
Designate a function to be run at vehicle initialization and before vehicle
arming (or at least the prompt to arm). The vehicle will not be armable
until after all `at_init` functions finish executing.
The function designated should have a signature that accepts a Vehicle.
The function decorated by this is expected to be `async`. If there are
multiple functions marked with `at_init`, they will all be run at the same
time.
"""
func._run_at_init = True
return func
_STATE_DELAY = 0.01 # s between each time the state update is called
class StateMachine(Runner):
"""
A `StateMachine` is a type of runner that consists of various states
declared using the `state` decorator. Each state, when run, should return a
string that is the name of the next state to execute. If no next state is
provided, it's assumed that the state machine is done running.
`StateMachine` also supports `background` tasks, which are run in parallel
to any state execution.
For an example of a `StateMachine` in action, check out
`examples/squareoff_logging.py`
"""
_states: Dict[str, _State]
_background_tasks: List[_BackgroundTask]
_initialization_tasks: List[_InitializationTask]
_entrypoint: str
_current_state: str
_override_next_state_transition: bool
_running: bool
def _build(self):
self._states = {}
self._background_tasks = []
self._initialization_tasks = []
for _, method in inspect.getmembers(self):
if not inspect.ismethod(method):
continue
if hasattr(method, "_is_state"):
self._states[method._state_name] = _State(method, method._state_name)
if method._state_first and not hasattr(self, "_entrypoint"):
self._entrypoint = method._state_name
elif method._state_first and hasattr(self, "_entrypoint"):
raise Exception("There may only be one initial state")
if hasattr(method, "_is_background"):
self._background_tasks.append(method)
if hasattr(method, "_run_at_init"):
self._initialization_tasks.append(method)
if not self._entrypoint:
raise Exception("There is no initial state")
async def _start_background_tasks(self, vehicle: Vehicle):
for task in self._background_tasks:
async def _task_runner(t=task):
while self._running:
await t.__func__(self, vehicle)
asyncio.ensure_future(_task_runner())
async def run(self, vehicle: Vehicle, build_before_running=True):
if build_before_running:
self._build()
assert self._entrypoint
self._current_state = self._entrypoint
self._override_next_state_transition = False
self._next_state_overr = ""
self._running = True
if len(self._initialization_tasks) != 0:
await asyncio.wait({f(vehicle) for f in self._initialization_tasks})
print("[aerpawlib] Initialization tasks complete. Waiting for safety pilot to arm")
await vehicle._initialize_postarm()
await self._start_background_tasks(vehicle)
await asyncio.sleep(1) # wait for background tasks to start :p
while self._running:
if self._current_state not in self._states:
print(self._current_state)
print(self._states)
raise Exception("Illegal state")
next_state = await self._states[self._current_state].run(self, vehicle)
if self._override_next_state_transition:
self._override_next_state_transition = False
self._current_state = self._next_state_overr
else:
self._current_state = next_state
if self._current_state is None:
self.stop()
await asyncio.sleep(_STATE_DELAY)
self.cleanup()
def stop(self):
"""
Call `stop` to stop the execution of the `StateMachine` after
completion of the current state. This is equivalent to returning `None`
at the end of a state's execution.
"""
self._running = False
class ZmqStateMachine(StateMachine):
"""
A `ZmqStateMachine` is a more complex state machine that interacts with zmq
in the background. Specifically, it is capable of having its state controlled
via zmq requets, as well as make zmq requests to other state machines through
the network. zmq will NOT work without using this type of runner.
For examples of how to structure programs using this, check out
`examples/zmq_preplanned_orbit` and `examples/zmq_runner`.
"""
_exported_states: Dict[str, _State]
def _build(self):
super()._build()
self._exported_states = {}
self._exported_fields = {}
for _, method in inspect.getmembers(self):
if not inspect.ismethod(method):
continue
if hasattr(method, "_is_exposed_zmq"):
self._exported_states[method._zmq_name] = _State(method, method._zmq_name)
elif hasattr(method, "_is_exposed_field_zmq"):
self._exported_fields[method._zmq_name] = method
_zmq_identifier: str
_zmq_proxy_server: str
def _initialize_zmq_bindings(self, vehicle_identifier: str, proxy_server_addr: str):
self._zmq_identifier = vehicle_identifier
self._zmq_proxy_server = proxy_server_addr
self._zmq_context = zmq.asyncio.Context()
@background
async def _zmg_bg_sub(self, vehicle: Vehicle):
socket = zmq.asyncio.Socket(context=self._zmq_context, io_loop=asyncio.get_event_loop(), socket_type=zmq.SUB)
socket.connect(f"tcp://{self._zmq_proxy_server}:{ZMQ_PROXY_OUT_PORT}")
socket.setsockopt_string(zmq.SUBSCRIBE, "")
self._zmq_messages_handling = asyncio.Queue()
self._zmq_received_fields = {} # indexed by [identifier][field]
print("starting sub")
while self._running:
message = await socket.recv_pyobj()
# await self._zmq_messages_handling.put(message)
if message["identifier"] != self._zmq_identifier:
continue
print(f"recv {message}")
asyncio.ensure_future(self._zmq_handle_request(vehicle, message))
async def _zmq_handle_request(self, vehicle: Vehicle, message):
if message["msg_type"] == ZMQ_TYPE_TRANSITION:
next_state = message["next_state"]
self._next_state_overr = next_state
self._override_next_state_transition = True
elif message["msg_type"] == ZMQ_TYPE_FIELD_REQUEST:
field = message["field"]
return_val = None
if field in self._exported_fields:
return_val = await self._exported_fields[field](vehicle)
await self._reply_queried_field(message["from"], field, return_val)
elif message["msg_type"] == ZMQ_TYPE_FIELD_CALLBACK:
field = message["field"]
value = message["value"]
msg_from = message["from"]
self._zmq_received_fields[msg_from][field] = value
@background
async def _zmq_bg_pub(self, _: Vehicle):
# pub side of things is just sending from a queue
self._zmq_messages_sending = asyncio.Queue()
socket = zmq.asyncio.Socket(context=self._zmq_context, io_loop=asyncio.get_event_loop(), socket_type=zmq.PUB)
socket.connect(f"tcp://{self._zmq_proxy_server}:{ZMQ_PROXY_IN_PORT}")
while self._running:
msg_sending = await self._zmq_messages_sending.get()
await socket.send_pyobj(msg_sending)
async def run(self, vehicle: Vehicle, zmq_proxy=False):
# must build in advance to get zmq stuff up
# avoid late rebuild by parent
self._build()
# if we're the base station, enable zmq_proxying
# TODO
# if zmq_proxy:
# self._background_tasks.append(self._zmq_bg_proxy)
# self._background_tasks.extend([self._zmq_bg_pub, self._zmq_bg_sub])
if None in [self._zmq_identifier, self._zmq_proxy_server]:
raise Exception("initialize_zmq_bindings must be used w/ a zmq runner")
await super().run(vehicle, build_before_running=False)
async def transition_runner(self, identifier: str, state: str):
"""
Use zmq to transition a runner within the network specified by `identifier`.
The state to be transitioned to is specified with `state`
"""
# transition a runner via zmq that is not this one
# exposed to scripts via runner
transition_obj = {
"msg_type": ZMQ_TYPE_TRANSITION,
"from": self._zmq_identifier,
"identifier": identifier,
"next_state": state,
}
await self._zmq_messages_sending.put(transition_obj)
async def query_field(self, identifier: str, field: str):
"""
Query a field from a zmq runner within the network specified by `identifier`.
The field to be queried is given by `field`. The value returned will be returned
by this function, and can be any python object.
"""
# make a runner publish some registered field to the zmq system
# send a request and register a callback var
# when the callback is received, continue this functioen
# TODO None vs nil??
if identifier not in self._zmq_received_fields:
self._zmq_received_fields[identifier] = {}
self._zmq_received_fields[identifier][field] = None
query_obj = {
"msg_type": ZMQ_TYPE_FIELD_REQUEST,
"from": self._zmq_identifier,
"identifier": identifier,
"field": field,
}
await self._zmq_messages_sending.put(query_obj)
while self._zmq_received_fields[identifier][field] == None:
await asyncio.sleep(0.01)
return self._zmq_received_fields[identifier][field]
async def _reply_queried_field(self, identifier: str, field: str, value):
reply_obj = {
"msg_type": ZMQ_TYPE_FIELD_CALLBACK,
"from": self._zmq_identifier,
"identifier": identifier,
"field": field,
"value": value,
}
await self._zmq_messages_sending.put(reply_obj)
# helper functions for working with asyncio code
# this is cheap but lets us write code in a more "researcher friendly" way
in_background = asyncio.ensure_future
sleep = asyncio.sleep
Functions
def at_init(func)
-
Designate a function to be run at vehicle initialization and before vehicle arming (or at least the prompt to arm). The vehicle will not be armable until after all
at_init()
functions finish executing.The function designated should have a signature that accepts a Vehicle.
The function decorated by this is expected to be
async
. If there are multiple functions marked withat_init()
, they will all be run at the same time.Expand source code
def at_init(func): """ Designate a function to be run at vehicle initialization and before vehicle arming (or at least the prompt to arm). The vehicle will not be armable until after all `at_init` functions finish executing. The function designated should have a signature that accepts a Vehicle. The function decorated by this is expected to be `async`. If there are multiple functions marked with `at_init`, they will all be run at the same time. """ func._run_at_init = True return func
def background(func)
-
Designate a function to be run in parallel to a given StateMachine. By
asyncio
's design, background function's don't have to take thread-safety into consideration.The function decorated by this is expected to be
async
When using @background, make sure to incorporate some kind of delay. Without any delay, an @background thread can eat all the python instance's CPU
Expand source code
def background(func): """ Designate a function to be run in parallel to a given StateMachine. By `asyncio`'s design, background function's don't have to take thread-safety into consideration. The function decorated by this is expected to be `async` When using @background, make sure to incorporate some kind of delay. Without any delay, an @background thread can eat all the python instance's CPU """ func._is_background = True return func
def entrypoint(func)
-
Decorator used to identify the entry point used by
BasicRunner
driven scripts.The function decorated by this is expected to be
async
Expand source code
def entrypoint(func): """ Decorator used to identify the entry point used by `BasicRunner` driven scripts. The function decorated by this is expected to be `async` """ func._entrypoint = True return func
def expose_field_zmq(name: str)
-
Make a field requestable by querying it through zmq. The function decorated by this will be called and the return value will be sent.
Expand source code
def expose_field_zmq(name: str): """ Make a field requestable by querying it through zmq. The function decorated by this will be called and the return value will be sent. """ # expose a function as a thing that can be called to get a field w/ zmq if name == "": raise Exception("field must be exposed with some name") def decorator(func): func._is_exposed_field_zmq = True func._zmq_name = name return func return decorator
def expose_zmq(name: str)
-
Expand source code
def expose_zmq(name: str): if name == "": raise Exception("state must be exported with some binding") def decorator(func): func._is_exposed_zmq = True func._zmq_name = name return func return decorator
def state(name: str, first: bool = False)
-
Decorator used to specify a state used by a
StateMachine
. Functions decorated using this are expected to be given a name and return a string that is the name of the next state to transition to/run. If no next state is returned, then theStateMachine
will (gracefully) stop.The function decorated by this is expected to be
async
Expand source code
def state(name: str, first: bool=False): """ Decorator used to specify a state used by a `StateMachine`. Functions decorated using this are expected to be given a name and return a string that is the name of the next state to transition to/run. If no next state is returned, then the `StateMachine` will (gracefully) stop. The function decorated by this is expected to be `async` """ if name == "": raise Exception("state name can't be \"\"") def decorator(func): func._is_state = True func._state_name = name func._state_first = first func._state_type = _StateType.STANDARD return func return decorator
def timed_state(name: str, duration: float, loop=False, first: bool = False)
-
Timed state. Will wait
duration
seconds before transitioning to the next state. Ifloop
is true, the decorated function will be continuously called until the duration has expired.This guarentees that the state will persist for at least
duration
seconds, so timed states will continue running for longer than that if the decorated function lasts longer than the timer.The function decorated by this is expected to be
async
Expand source code
def timed_state(name: str, duration: float, loop=False, first: bool=False): """ Timed state. Will wait `duration` seconds before transitioning to the next state. If `loop` is true, the decorated function will be continuously called until the duration has expired. This guarentees that the state will persist for *at least* `duration` seconds, so timed states will continue running for longer than that if the decorated function lasts longer than the timer. The function decorated by this is expected to be `async` """ def decorator(func): func._is_state = True func._state_name = name func._state_first = first func._state_type = _StateType.TIMED func._state_duration = duration func._state_loop = loop return func return decorator
Classes
class BasicRunner
-
BasicRunners have a single entry point (specified by
entrypoint()
) that is executed when the script is run. The function provided can be anything, as it will be run in parallel to background services used by aerpawlib.For an example of a minimum viable
BasicRunner
, check outexamples/basic_runner.py
Expand source code
class BasicRunner(Runner): """ BasicRunners have a single entry point (specified by `entrypoint`) that is executed when the script is run. The function provided can be anything, as it will be run in parallel to background services used by aerpawlib. For an example of a minimum viable `BasicRunner`, check out `examples/basic_runner.py` """ def _build(self): for _, method in inspect.getmembers(self): if not inspect.ismethod(method): continue if hasattr(method, "_entrypoint"): self._entry = method async def run(self, vehicle: Vehicle): self._build() if hasattr(self, "_entry"): await self._entry.__func__(self, vehicle) else: raise Exception("No @entrypoint declared")
Ancestors
Inherited members
class Runner
-
Base execution framework – generally, this should only be used to build new runners. Only exposes the base functions needed to interface with aerpawlib's runner.
Expand source code
class Runner: """ Base execution framework -- generally, this should only be used to build new runners. Only exposes the base functions needed to interface with aerpawlib's runner. """ async def run(self, _: Vehicle): """ Run the script that's been loaded in -- this is what implements the core logic of whatever's being run. The `aerpawlib.vehicle.Vehicle` passed in will have been initialized externally by the launch script. For other runners provided with aerpawlib (ex: `StateMachine`, `BasicRunner`), you should avoid overriding this. """ pass def initialize_args(self, _: List[str]): """ Can be overridden to parse and handle extra command line arguments passed to the runner. The easiest way to interact with them is through `argparse` -- check out `examples/squareoff_logging.py` for an example of this. This is expected to be overridden by the user script if it is used. """ pass def cleanup(self): """ Any additional user-provided functionality that is called when the script exits should be implemented here. This is expected to be overridden by the user script if it is used. """ pass
Subclasses
Methods
def cleanup(self)
-
Any additional user-provided functionality that is called when the script exits should be implemented here.
This is expected to be overridden by the user script if it is used.
Expand source code
def cleanup(self): """ Any additional user-provided functionality that is called when the script exits should be implemented here. This is expected to be overridden by the user script if it is used. """ pass
def initialize_args(self, _: List[str])
-
Can be overridden to parse and handle extra command line arguments passed to the runner. The easiest way to interact with them is through
argparse
– check outexamples/squareoff_logging.py
for an example of this.This is expected to be overridden by the user script if it is used.
Expand source code
def initialize_args(self, _: List[str]): """ Can be overridden to parse and handle extra command line arguments passed to the runner. The easiest way to interact with them is through `argparse` -- check out `examples/squareoff_logging.py` for an example of this. This is expected to be overridden by the user script if it is used. """ pass
async def run(self, _: Vehicle)
-
Run the script that's been loaded in – this is what implements the core logic of whatever's being run. The
Vehicle
passed in will have been initialized externally by the launch script.For other runners provided with aerpawlib (ex:
StateMachine
,BasicRunner
), you should avoid overriding this.Expand source code
async def run(self, _: Vehicle): """ Run the script that's been loaded in -- this is what implements the core logic of whatever's being run. The `aerpawlib.vehicle.Vehicle` passed in will have been initialized externally by the launch script. For other runners provided with aerpawlib (ex: `StateMachine`, `BasicRunner`), you should avoid overriding this. """ pass
class StateMachine
-
A
StateMachine
is a type of runner that consists of various states declared using thestate()
decorator. Each state, when run, should return a string that is the name of the next state to execute. If no next state is provided, it's assumed that the state machine is done running.StateMachine
also supportsbackground()
tasks, which are run in parallel to any state execution.For an example of a
StateMachine
in action, check outexamples/squareoff_logging.py
Expand source code
class StateMachine(Runner): """ A `StateMachine` is a type of runner that consists of various states declared using the `state` decorator. Each state, when run, should return a string that is the name of the next state to execute. If no next state is provided, it's assumed that the state machine is done running. `StateMachine` also supports `background` tasks, which are run in parallel to any state execution. For an example of a `StateMachine` in action, check out `examples/squareoff_logging.py` """ _states: Dict[str, _State] _background_tasks: List[_BackgroundTask] _initialization_tasks: List[_InitializationTask] _entrypoint: str _current_state: str _override_next_state_transition: bool _running: bool def _build(self): self._states = {} self._background_tasks = [] self._initialization_tasks = [] for _, method in inspect.getmembers(self): if not inspect.ismethod(method): continue if hasattr(method, "_is_state"): self._states[method._state_name] = _State(method, method._state_name) if method._state_first and not hasattr(self, "_entrypoint"): self._entrypoint = method._state_name elif method._state_first and hasattr(self, "_entrypoint"): raise Exception("There may only be one initial state") if hasattr(method, "_is_background"): self._background_tasks.append(method) if hasattr(method, "_run_at_init"): self._initialization_tasks.append(method) if not self._entrypoint: raise Exception("There is no initial state") async def _start_background_tasks(self, vehicle: Vehicle): for task in self._background_tasks: async def _task_runner(t=task): while self._running: await t.__func__(self, vehicle) asyncio.ensure_future(_task_runner()) async def run(self, vehicle: Vehicle, build_before_running=True): if build_before_running: self._build() assert self._entrypoint self._current_state = self._entrypoint self._override_next_state_transition = False self._next_state_overr = "" self._running = True if len(self._initialization_tasks) != 0: await asyncio.wait({f(vehicle) for f in self._initialization_tasks}) print("[aerpawlib] Initialization tasks complete. Waiting for safety pilot to arm") await vehicle._initialize_postarm() await self._start_background_tasks(vehicle) await asyncio.sleep(1) # wait for background tasks to start :p while self._running: if self._current_state not in self._states: print(self._current_state) print(self._states) raise Exception("Illegal state") next_state = await self._states[self._current_state].run(self, vehicle) if self._override_next_state_transition: self._override_next_state_transition = False self._current_state = self._next_state_overr else: self._current_state = next_state if self._current_state is None: self.stop() await asyncio.sleep(_STATE_DELAY) self.cleanup() def stop(self): """ Call `stop` to stop the execution of the `StateMachine` after completion of the current state. This is equivalent to returning `None` at the end of a state's execution. """ self._running = False
Ancestors
Subclasses
Methods
def stop(self)
-
Call
stop
to stop the execution of theStateMachine
after completion of the current state. This is equivalent to returningNone
at the end of a state's execution.Expand source code
def stop(self): """ Call `stop` to stop the execution of the `StateMachine` after completion of the current state. This is equivalent to returning `None` at the end of a state's execution. """ self._running = False
Inherited members
class ZmqStateMachine
-
A
ZmqStateMachine
is a more complex state machine that interacts with zmq in the background. Specifically, it is capable of having its state controlled via zmq requets, as well as make zmq requests to other state machines through the network. zmq will NOT work without using this type of runner.For examples of how to structure programs using this, check out
examples/zmq_preplanned_orbit
andexamples/zmq_runner
.Expand source code
class ZmqStateMachine(StateMachine): """ A `ZmqStateMachine` is a more complex state machine that interacts with zmq in the background. Specifically, it is capable of having its state controlled via zmq requets, as well as make zmq requests to other state machines through the network. zmq will NOT work without using this type of runner. For examples of how to structure programs using this, check out `examples/zmq_preplanned_orbit` and `examples/zmq_runner`. """ _exported_states: Dict[str, _State] def _build(self): super()._build() self._exported_states = {} self._exported_fields = {} for _, method in inspect.getmembers(self): if not inspect.ismethod(method): continue if hasattr(method, "_is_exposed_zmq"): self._exported_states[method._zmq_name] = _State(method, method._zmq_name) elif hasattr(method, "_is_exposed_field_zmq"): self._exported_fields[method._zmq_name] = method _zmq_identifier: str _zmq_proxy_server: str def _initialize_zmq_bindings(self, vehicle_identifier: str, proxy_server_addr: str): self._zmq_identifier = vehicle_identifier self._zmq_proxy_server = proxy_server_addr self._zmq_context = zmq.asyncio.Context() @background async def _zmg_bg_sub(self, vehicle: Vehicle): socket = zmq.asyncio.Socket(context=self._zmq_context, io_loop=asyncio.get_event_loop(), socket_type=zmq.SUB) socket.connect(f"tcp://{self._zmq_proxy_server}:{ZMQ_PROXY_OUT_PORT}") socket.setsockopt_string(zmq.SUBSCRIBE, "") self._zmq_messages_handling = asyncio.Queue() self._zmq_received_fields = {} # indexed by [identifier][field] print("starting sub") while self._running: message = await socket.recv_pyobj() # await self._zmq_messages_handling.put(message) if message["identifier"] != self._zmq_identifier: continue print(f"recv {message}") asyncio.ensure_future(self._zmq_handle_request(vehicle, message)) async def _zmq_handle_request(self, vehicle: Vehicle, message): if message["msg_type"] == ZMQ_TYPE_TRANSITION: next_state = message["next_state"] self._next_state_overr = next_state self._override_next_state_transition = True elif message["msg_type"] == ZMQ_TYPE_FIELD_REQUEST: field = message["field"] return_val = None if field in self._exported_fields: return_val = await self._exported_fields[field](vehicle) await self._reply_queried_field(message["from"], field, return_val) elif message["msg_type"] == ZMQ_TYPE_FIELD_CALLBACK: field = message["field"] value = message["value"] msg_from = message["from"] self._zmq_received_fields[msg_from][field] = value @background async def _zmq_bg_pub(self, _: Vehicle): # pub side of things is just sending from a queue self._zmq_messages_sending = asyncio.Queue() socket = zmq.asyncio.Socket(context=self._zmq_context, io_loop=asyncio.get_event_loop(), socket_type=zmq.PUB) socket.connect(f"tcp://{self._zmq_proxy_server}:{ZMQ_PROXY_IN_PORT}") while self._running: msg_sending = await self._zmq_messages_sending.get() await socket.send_pyobj(msg_sending) async def run(self, vehicle: Vehicle, zmq_proxy=False): # must build in advance to get zmq stuff up # avoid late rebuild by parent self._build() # if we're the base station, enable zmq_proxying # TODO # if zmq_proxy: # self._background_tasks.append(self._zmq_bg_proxy) # self._background_tasks.extend([self._zmq_bg_pub, self._zmq_bg_sub]) if None in [self._zmq_identifier, self._zmq_proxy_server]: raise Exception("initialize_zmq_bindings must be used w/ a zmq runner") await super().run(vehicle, build_before_running=False) async def transition_runner(self, identifier: str, state: str): """ Use zmq to transition a runner within the network specified by `identifier`. The state to be transitioned to is specified with `state` """ # transition a runner via zmq that is not this one # exposed to scripts via runner transition_obj = { "msg_type": ZMQ_TYPE_TRANSITION, "from": self._zmq_identifier, "identifier": identifier, "next_state": state, } await self._zmq_messages_sending.put(transition_obj) async def query_field(self, identifier: str, field: str): """ Query a field from a zmq runner within the network specified by `identifier`. The field to be queried is given by `field`. The value returned will be returned by this function, and can be any python object. """ # make a runner publish some registered field to the zmq system # send a request and register a callback var # when the callback is received, continue this functioen # TODO None vs nil?? if identifier not in self._zmq_received_fields: self._zmq_received_fields[identifier] = {} self._zmq_received_fields[identifier][field] = None query_obj = { "msg_type": ZMQ_TYPE_FIELD_REQUEST, "from": self._zmq_identifier, "identifier": identifier, "field": field, } await self._zmq_messages_sending.put(query_obj) while self._zmq_received_fields[identifier][field] == None: await asyncio.sleep(0.01) return self._zmq_received_fields[identifier][field] async def _reply_queried_field(self, identifier: str, field: str, value): reply_obj = { "msg_type": ZMQ_TYPE_FIELD_CALLBACK, "from": self._zmq_identifier, "identifier": identifier, "field": field, "value": value, } await self._zmq_messages_sending.put(reply_obj)
Ancestors
Methods
async def query_field(self, identifier: str, field: str)
-
Query a field from a zmq runner within the network specified by
identifier
. The field to be queried is given byfield
. The value returned will be returned by this function, and can be any python object.Expand source code
async def query_field(self, identifier: str, field: str): """ Query a field from a zmq runner within the network specified by `identifier`. The field to be queried is given by `field`. The value returned will be returned by this function, and can be any python object. """ # make a runner publish some registered field to the zmq system # send a request and register a callback var # when the callback is received, continue this functioen # TODO None vs nil?? if identifier not in self._zmq_received_fields: self._zmq_received_fields[identifier] = {} self._zmq_received_fields[identifier][field] = None query_obj = { "msg_type": ZMQ_TYPE_FIELD_REQUEST, "from": self._zmq_identifier, "identifier": identifier, "field": field, } await self._zmq_messages_sending.put(query_obj) while self._zmq_received_fields[identifier][field] == None: await asyncio.sleep(0.01) return self._zmq_received_fields[identifier][field]
async def transition_runner(self, identifier: str, state: str)
-
Use zmq to transition a runner within the network specified by
identifier
. The state to be transitioned to is specified withstate()
Expand source code
async def transition_runner(self, identifier: str, state: str): """ Use zmq to transition a runner within the network specified by `identifier`. The state to be transitioned to is specified with `state` """ # transition a runner via zmq that is not this one # exposed to scripts via runner transition_obj = { "msg_type": ZMQ_TYPE_TRANSITION, "from": self._zmq_identifier, "identifier": identifier, "next_state": state, } await self._zmq_messages_sending.put(transition_obj)
Inherited members