Source code for as2_python_api.mission_interpreter.ros2_adapter

"""Mission interpreter ROS 2 adapter."""

# Copyright 2024 Universidad Politécnica de Madrid
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#    * Neither the name of the the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from
#      this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


__authors__ = 'Pedro Arias Pérez'
__copyright__ = 'Copyright (c) 2024 Universidad Politécnica de Madrid'
__license__ = 'BSD-3-Clause'

import argparse

from as2_msgs.msg import MissionUpdate
from as2_python_api.mission_interpreter.mission import Mission
from as2_python_api.mission_interpreter.mission_interpreter import MissionInterpreter
import rclpy
from rclpy.executors import Executor, MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import qos_profile_system_default, QoSHistoryPolicy, QoSProfile, \
    QoSReliabilityPolicy
from std_msgs.msg import String


[docs] class Adapter(Node): """ROS 2 Adapter to mission interpreter.""" def __init__(self, drone_id: str, timer_freq: float, use_sim_time: bool = False, add_namespace: bool = False, executor: Executor = SingleThreadedExecutor): super().__init__('adapter', namespace=drone_id) self.param_use_sim_time = Parameter('use_sim_time', Parameter.Type.BOOL, use_sim_time) self.set_parameters([self.param_use_sim_time]) self.namespace = drone_id self.interpreter = MissionInterpreter(use_sim_time=use_sim_time, executor=executor) self.last_mid: int = None qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) topic_prefix = '' if add_namespace else '/' self.mission_update_sub = self.create_subscription( MissionUpdate, topic_prefix + 'mission_update', self.mission_update_callback, qos_profile_system_default) self.mission_status_pub = self.create_publisher( String, topic_prefix + 'mission_status', qos_profile) self.mission_state_timer = self.create_timer( 1 / timer_freq, self.status_timer_callback) self.get_logger().info('Mission Interpreter Adapter ready')
[docs] def status_timer_callback(self): """Publish new mission status.""" msg = String() try: msg.data = self.interpreter.status.json() except TypeError as e: self.get_logger().warn(f'Failed to deserialize status: {e}') else: self.mission_status_pub.publish(msg)
[docs] def mission_update_callback(self, msg: MissionUpdate): """Mission update callback.""" if msg.drone_id != self.namespace: self.get_logger().info( f'Received mission update for {msg.drone_id} but I am {self.namespace}') return if msg.action == MissionUpdate.EXECUTE: self.execute_callback(msg.mission_id, Mission.parse_raw(msg.mission)) elif msg.action == MissionUpdate.LOAD: mission = Mission.parse_raw(msg.mission) self.interpreter.load_mission(msg.mission_id, mission) # Send updated status self.status_timer_callback() self.get_logger().info(f'Mission: {msg.mission_id} loaded.') self.get_logger().info(f'Mission: {mission}') self.last_mid = msg.mission_id elif msg.action == MissionUpdate.START: self.start_callback(msg.mission_id) elif msg.action == MissionUpdate.PAUSE: self.interpreter.pause_mission(msg.mission_id) elif msg.action == MissionUpdate.RESUME: self.interpreter.resume_mission(msg.mission_id) elif msg.action == MissionUpdate.STOP: self.interpreter.stop_mission(msg.mission_id) self.interpreter.reset(msg.mission_id, self.interpreter._missions[msg.mission_id]) elif msg.action == MissionUpdate.NEXT_ITEM: self.interpreter.next_item(msg.mission_id) else: self.get_logger().error(f'Unimplemented action: {msg.action}')
[docs] def execute_callback(self, mid: int, mission: Mission): """Load and start mission.""" self.interpreter.reset(mid, mission) self.start_callback(mid)
[docs] def start_callback(self, mid: int): """Start mission on interpreter.""" try: # TODO: where to arm and offboard? Avoid calling interpreter.drone property directly if not self.interpreter._drone.info['armed']: self.interpreter._drone.arm() if not self.interpreter._drone.info['offboard']: self.interpreter._drone.offboard() self.get_logger().info(f'Starting mission: {mid}') self.interpreter.start_mission(mid) except AttributeError: self.get_logger().error('Trying to start mission but no mission is loaded.')
[docs] def main(): """Run node.""" parser = argparse.ArgumentParser() parser.add_argument('--n', type=str, default='drone0', help='Namespace') parser.add_argument('--timer_freq', type=float, default=0.5, help='Status timer frequency') parser.add_argument('--use_sim_time', action='store_true', help='Use sim time') parser.add_argument( '--add_namespace', action='store_true', help='Add namespace to topics') parser.add_argument( '--use_multi_threaded_executor', action='store_true', help='Use MultiThreadedExecutor in' + ' Drone Interface') argument_parser = parser.parse_args() rclpy.init() if argument_parser.use_multi_threaded_executor: executor_class = MultiThreadedExecutor else: executor_class = SingleThreadedExecutor adapter = Adapter( drone_id=argument_parser.n, timer_freq=argument_parser.timer_freq, use_sim_time=argument_parser.use_sim_time, add_namespace=argument_parser.add_namespace, executor=executor_class) rclpy.spin(adapter) adapter.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()