"""Mission Interpreter and Executer."""
# Copyright 2025 Universidad Politécnica de Madrid
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__authors__ = 'Pedro Arias Pérez'
__copyright__ = 'Copyright (c) 2025 Universidad Politécnica de Madrid'
__license__ = 'BSD-3-Clause'
import logging
from threading import Thread
import time
from as2_msgs.msg import BehaviorStatus
from as2_python_api.behavior_actions.behavior_handler import BehaviorHandler
from as2_python_api.drone_interface import DroneInterfaceBase
from as2_python_api.mission_interpreter.mission import InterpreterStatus, Mission
from as2_python_api.mission_interpreter.mission_stack import MissionStack
from rclpy.executors import Executor, SingleThreadedExecutor
logging.basicConfig(level=logging.INFO,
format='[%(levelname)s] [%(asctime)s] [%(name)s]: %(message)s',
datefmt='%s')
[docs]
class MissionInterpreter:
"""Mission Interpreter and Executer."""
def __init__(self, use_sim_time: bool = False, verbose: bool = False,
executor: Executor = SingleThreadedExecutor) -> None:
self._verbose = verbose
self._logger = logging.getLogger('MissionInterpreter')
self._logger.setLevel(logging.DEBUG if verbose else logging.INFO)
self._missions: dict[int, Mission] = {}
self._current_mid: int = None
self._mission_stack: MissionStack = None
self._use_sim_time: bool = use_sim_time
self._executor: Executor = executor
self._drone: DroneInterfaceBase = None
self.exec_thread: Thread = None
self.current_behavior: BehaviorHandler = None
self.stopped: bool = False
self._logger.debug('Mission interpreter ready')
def __del__(self) -> None:
self.shutdown()
[docs]
def shutdown(self) -> None:
"""Shutdown properly."""
self.drone.shutdown()
self.stopped = True
if self.exec_thread:
self.exec_thread.join()
self._logger.info('Shutdown')
@property
def mission(self) -> Mission:
"""Mission."""
return self._missions.get(self._current_mid, None)
@mission.setter
def mission(self, mid: int) -> None:
"""Set mission to be executed by ID."""
mission = self._missions.get(mid, None)
if mission is None:
self._logger.error(f'Mission {mid} not found, please load it first')
return
if self._current_mid != mid:
self._current_mid = mid
self._mission_stack = None
@property
def drone(self) -> DroneInterfaceBase:
"""Build a DroneInterface based on the mission requirements."""
if self.mission is None:
return None
if not self._drone or self._drone.drone_id != self.mission.target:
drone = DroneInterfaceBase(
drone_id=self.mission.target,
verbose=self._verbose,
use_sim_time=self._use_sim_time,
executor=self._executor
)
self._drone = drone
self.load_modules(self.mission)
return self._drone
[docs]
def load_modules(self, mission: Mission):
needed_modules = {item.behavior for item in mission.plan}
for module_name in needed_modules.difference(set(self.drone.modules)):
print(f'module {module_name} loaded')
self.drone.load_module(f'{module_name}_module')
@property
def mission_stack(self) -> MissionStack:
"""Mission stack."""
if self.mission is None:
return None
if self._mission_stack is None:
self._mission_stack = self.mission.stack
return self._mission_stack
@property
def status(self) -> InterpreterStatus:
"""Mission status."""
state = BehaviorStatus.IDLE
if self.mission is None:
return InterpreterStatus()
if self.current_behavior is not None:
state = self.current_behavior.status
return InterpreterStatus(state=state, pending_items=len(self.mission_stack.pending),
done_items=len(self.mission_stack.done),
current_item=self.mission_stack.current,
feedback_current=self.feedback_dict)
@property
def feedback(self):
"""Get current behavior feedback."""
return None if self.current_behavior is None else self.current_behavior.feedback
@property
def feedback_dict(self):
"""Get current behavior feedback dictionary."""
feedback = None if self.current_behavior is None else self.current_behavior.feedback
if feedback is None:
return None
fb_dict = {}
if isinstance(feedback, dict):
fb_dict = feedback
else:
for k, _ in feedback.get_fields_and_field_types().items():
fb_dict[k] = getattr(feedback, k)
return fb_dict
[docs]
def load_mission(self, mid: int, mission: Mission) -> None:
"""Reset Mission Interpreter with other mission."""
self._missions[mid] = mission
if self._current_mid is None:
self.mission = mid
self.drone # Load drone
self.load_modules(mission)
self._logger.info(f'Mission {mid} loaded')
self._logger.info(self._missions)
[docs]
def start_mission(self, mid: int) -> bool:
"""Start mission in different thread."""
if self.exec_thread:
self._logger.warning('Mission being performed, start not allowed')
return False
self.mission = mid
self.exec_thread = Thread(target=self.__perform_mission)
self.exec_thread.start()
return True
[docs]
def stop_mission(self, mid: int) -> bool:
"""Stop mission."""
if not self.exec_thread:
self._logger.debug('No mission being executed, already stopped')
return True
if mid != self._current_mid:
self._logger.error('Stop requested for another mission, not the one executing')
return False
self.stopped = True
return self.current_behavior.stop()
[docs]
def next_item(self, mid: int) -> bool:
"""Advance to next item in mission."""
if not self.exec_thread:
self._logger.warning('No mission being executed, next item not allowed')
return False
if mid != self._current_mid:
self._logger.error('Next item requested for another mission, not the one executing')
return False
return self.current_behavior.stop()
[docs]
def pause_mission(self, mid: int) -> bool:
"""Pause mission."""
if not self.exec_thread:
self._logger.warning('No mission being executed, pause not allowed')
return False
if mid != self._current_mid:
self._logger.error('Pause requested for another mission, not the one executing')
return False
return self.current_behavior.pause()
[docs]
def resume_mission(self, mid: int) -> bool:
"""Resume mission."""
if not self.exec_thread:
self._logger.warning('No mission being executed, resume not allowed')
return False
if mid != self._current_mid:
self._logger.error('Resume requested for another mission, not the one executing')
return False
return self.current_behavior.resume(wait_result=False)
[docs]
def modify_current(self) -> bool:
"""Modify current item in mission."""
raise NotImplementedError
[docs]
def append_mission(self, mid: int, mission: Mission) -> None:
"""Insert mission at the end of the stack."""
if mid != self._current_mid:
self._logger.error('Append requested for another mission, not the one executing')
return
self._mission_stack.add(mission.stack)
[docs]
def insert_mission(self, mid: int, mission: Mission) -> None:
"""Insert mission in front of the stack."""
if mid != self._current_mid:
self._logger.error('Insert requested for another mission, not the one executing')
return
self._mission_stack.insert(mission.stack)
def __perform_mission(self) -> None:
"""Perform a mission."""
while self.mission_stack.pending and not self.stopped:
mission_item = self.mission_stack.next_item()
behavior = mission_item.behavior
method = mission_item.method
args = mission_item.args
self.current_behavior = getattr(self.drone, behavior)
current_method = getattr(self.current_behavior, method)
try:
# TODO(pariaspe): check current_method result
res = current_method(**args)
self._logger.debug(f'Behavior {behavior} method {method} result: {res}')
except BehaviorHandler.GoalRejected:
self._logger.error(f'Goal rejected by behavior {behavior}')
break
except BehaviorHandler.BehaviorNotAvailable:
self._logger.error(f'Behavior {behavior} not available')
break
self.mission_stack.next_item() # current done or stopped
self.exec_thread = False
self.current_behavior = None
[docs]
def reset(self, mid: int, mission: Mission) -> None:
"""Reset Mission Interpreter with other mission."""
self.stop_mission(self._current_mid)
if self.exec_thread:
self.exec_thread.join()
self._mission_stack = None
self.exec_thread = None
self.current_behavior = None
self.stopped = False
self._current_mid = None
self.load_mission(mid, mission)
# def abort_mission(self):
# """Abort current mission, and start safety mission."""
# if self._abort_mission is None:
# self._logger.critical(
# 'Abort command received but not abort mission available. ' +
# 'Change to manual control!')
# return
# self.reset(self._abort_mission)
# try:
# self.start_mission()
# except AttributeError:
# self._logger.error('Trying to start mission but no mission is loaded.')
[docs]
def test():
"""
A doctest in a docstring.
>>> test()
test called with height=1.0, speed=2.0 and wait=True
test called with height=98.0, speed=99.0 and wait=True
"""
dummy_mission = """
{
"target": "drone_0",
"plan": [
{
"behavior": "dummy",
"args": {
"arg1": 1.0,
"arg2": 2.0,
"wait": "True"
}
},
{
"behavior": "dummy",
"args": {
"arg2": 98.0,
"arg1": 99.0,
"wait": "False"
}
}
]
}"""
mission = Mission.parse_raw(dummy_mission)
import rclpy
rclpy.init()
interpreter = MissionInterpreter(verbose=True)
interpreter.load_mission(0, mission)
interpreter.start_mission(0)
time.sleep(3)
interpreter.next_item()
interpreter.shutdown()
rclpy.shutdown()
if __name__ == '__main__':
# import doctest
# doctest.testmod()
test()