Source code for as2_python_api.mission_interpreter.mission

"""Mission message definitions."""

from __future__ import annotations

# Copyright 2025 Universidad Politécnica de Madrid
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#    * Neither the name of the the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from
#      this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


__authors__ = 'Pedro Arias Pérez'
__copyright__ = 'Copyright (c) 2025 Universidad Politécnica de Madrid'
__license__ = 'BSD-3-Clause'

from enum import IntEnum
import inspect

from typing import Any, List
try:
    from pydantic.v1 import BaseModel
except ModuleNotFoundError:
    from pydantic import BaseModel

from as2_msgs.msg import BehaviorStatus
from as2_python_api.mission_interpreter.mission_stack import MissionStack
from as2_python_api.tools.utils import get_module_call_signature


class InterpreterState(IntEnum):
    """
    Interpreter state.

    Every member takes its integer value from the constant of the same name
    declared in ``BehaviorStatus``.
    """

    # Values are taken verbatim from the BehaviorStatus message constants
    IDLE = BehaviorStatus.IDLE
    RUNNING = BehaviorStatus.RUNNING
    PAUSED = BehaviorStatus.PAUSED
class MissionItem(BaseModel):
    """
    Mission Item data model. It represents a behavior call.

    Fields:
      * ``behavior``: name used to look up the call signature.
      * ``method``: method to invoke, ``'__call__'`` unless stated otherwise.
      * ``args``: mapping of argument name to value given in the mission.
    """

    behavior: str
    method: str = '__call__'
    args: dict

    def __str__(self):
        return f'{self.behavior}: {self.method}: {self.args}'

    @property
    def args_extended(self) -> List:
        """
        Return the full list of arguments, following the call signature order.

        The signature is obtained through
        ``get_module_call_signature(self.behavior)``. For every parameter, the
        value given in ``args`` is used; when it is missing, the default
        declared in the signature is used instead.

        :raises KeyError: if a parameter (other than ``self``) is neither in
            ``args`` nor has a default value in the signature
        :rtype: List
        """
        signature = get_module_call_signature(self.behavior)

        args = []
        for name, parameter in signature.parameters.items():
            try:
                # if param found in mission, append it
                args.append(self.args[name])
            except KeyError:
                # if param not found in mission
                if name == 'self':
                    # bound instance, never part of the mission arguments
                    continue
                # Identity check: ``empty`` is a sentinel, and ``!=`` would
                # call the default's own comparison, which may not return a
                # plain bool (or may raise) for some objects.
                if parameter.default is not inspect.Parameter.empty:
                    # append default
                    args.append(parameter.default)
                else:
                    # required argument missing, propagate original KeyError
                    raise
        return args
class Mission(BaseModel):
    """
    Mission data model.

    Holds the ``target`` string and the ordered ``plan`` of mission items
    (empty by default).
    """

    target: str
    plan: List[MissionItem] = []

    def __str__(self):
        # String form is the JSON serialization of the model
        return self.json()

    @property
    def stack(self) -> MissionStack:
        """
        Return mission stack.

        A new ``MissionStack`` is built from ``plan`` on every access.

        :raises exc: if behavior arg doesn't exist (presumably raised while
            the stack is built; not visible from this class)
        :rtype: MissionStack
        """
        mission_stack = MissionStack(mission_stack=self.plan)
        return mission_stack
class InterpreterStatus(BaseModel):
    """
    Mission status.

    Fields:
      * ``state``: interpreter state.
      * ``pending_items``: number of items still to be executed.
      * ``done_items``: number of items already executed.
      * ``current_item``: item in progress, ``None`` when there is none.
      * ``feedback_current``: feedback attached to the current item; it is
        ignored when comparing two statuses.
    """

    state: InterpreterState = BehaviorStatus.IDLE
    pending_items: int = 0
    done_items: int = 0
    current_item: MissionItem = None
    feedback_current: Any = None

    @property
    def total_items(self) -> int:
        """Total amount of items in mission, done + current + pending."""
        # The current item only counts when there is one
        count_current = 0 if self.current_item is None else 1
        return self.done_items + count_current + self.pending_items

    def __str__(self):
        # Progress is shown as "<done + current>/<total>"
        count_current = 0 if self.current_item is None else 1
        progress = f'{self.done_items + count_current}/{self.total_items}'
        return f'[{self.state}] [{progress}] {self.current_item}'

    def __eq__(self, other):
        """
        Override the default implementation, check all attributes except feedback.

        Returns ``NotImplemented`` for objects of other types so that Python
        can try the reflected comparison before falling back to ``False``.
        """
        if not isinstance(other, InterpreterStatus):
            return NotImplemented
        return (
            self.state == other.state
            and self.pending_items == other.pending_items
            and self.done_items == other.done_items
            and self.current_item == other.current_item
        )
if __name__ == '__main__':
    # Self-test entry point: the test cases below only exist when the module
    # is executed as a script.
    import unittest

    class TestMission(unittest.TestCase):
        """Mission testing."""

        def test_mission_model(self):
            """Two test dummy mission."""
            # Same mission expressed as raw JSON text...
            dummy_mission = """ { "target": "drone_0", "plan": [ { "behavior": "dummy", "args": { "arg1": 1.0, "arg2": 2.0, "wait": "False" } }, { "behavior": "dummy", "args": { "arg2": 98.0, "arg1": 99.0, "wait": "False" } } ] }"""

            # ...and built item by item through the data models
            first_args = {'arg1': 1.0, 'arg2': 2.0, 'wait': 'False'}
            second_args = {'arg1': 99.0, 'arg2': 98.0, 'wait': 'False'}
            item0 = MissionItem(behavior='dummy', args=first_args)
            item1 = MissionItem(behavior='dummy', args=second_args)
            other_mission = Mission(target='drone_0', plan=[item0, item1])

            parsed_mission = Mission.parse_raw(dummy_mission)
            self.assertEqual(parsed_mission, other_mission)

    class TestInterpreterStatus(unittest.TestCase):
        """Interpreter Status testing."""

        # TODO: WIP test
        # The leading underscore keeps unittest from collecting this method.
        def _test_status(self):
            """Test dummy status."""
            feedback = {
                'actual_speed': 2.983,
                'actual_distance_to_goal': 4.563}
            status = InterpreterStatus(
                state='RUNNING',
                current_item='go_to',
                feedback_current=feedback,
                done_items=1,
                pending_items=1)
            print(status)
            print(status.json())

    unittest.main()