"""Behavior handler. Abstract class to handle behaviors."""
# Copyright 2022 Universidad Politécnica de Madrid
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Module metadata: authors, copyright notice and license identifier.
__authors__ = 'Miguel Fernández Cortizas, Pedro Arias Pérez, David Pérez Saura, Rafael Pérez Seguí'
__copyright__ = 'Copyright (c) 2022 Universidad Politécnica de Madrid'
__license__ = 'BSD-3-Clause'
import abc
from time import sleep
from action_msgs.msg import GoalStatus
from as2_msgs.msg import BehaviorStatus
from rclpy.action import ActionClient
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_srvs.srv import Trigger
class BehaviorHandler(abc.ABC):
    """
    Behavior handler.

    Client-side wrapper around a single behavior. It owns an action client
    used to send goals, three ``Trigger`` service clients (pause, resume and
    stop) and a subscription to the behavior status topic.
    """

    TIMEOUT = 1  # seconds, applied to each availability check in __init__

    class BehaviorNotAvailable(Exception):
        """Behavior not available exception."""

    class GoalRejected(Exception):
        """Goal rejected exception."""

    class ResultUnknown(Exception):
        """Result unknown exception."""

    def __init__(self, node: 'Node', action_msg, behavior_name) -> None:
        """
        Create the behavior clients and wait for the behavior servers.

        :param node: node used to create the clients and the subscription
        :type node: Node
        :param action_msg: action type passed to the action client
        :param behavior_name: behavior name, used as action name and as
            prefix of the ``/_behavior/...`` services and status topic
        :raises self.BehaviorNotAvailable: if the action server or any of the
            pause/resume/stop services is not available within ``TIMEOUT``
        """
        self._node = node

        self.__status = BehaviorStatus.IDLE
        self.__feedback = None
        self.__result = None
        # Set by start(); stays None until a goal has been accepted
        self.__goal_handle = None

        self.__action_client = ActionClient(node, action_msg, behavior_name)

        self.__pause_client = self._node.create_client(
            Trigger, behavior_name + '/_behavior/pause')
        self.__resume_client = self._node.create_client(
            Trigger, behavior_name + '/_behavior/resume')
        self.__stop_client = self._node.create_client(
            Trigger, behavior_name + '/_behavior/stop')

        self.__status_sub = self._node.create_subscription(
            BehaviorStatus, behavior_name + '/_behavior/behavior_status',
            self.__status_callback, QoSProfile(depth=1))

        # Wait for Action and Servers availability
        if not self.__action_client.wait_for_server(timeout_sec=self.TIMEOUT) or \
                not self.__pause_client.wait_for_service(timeout_sec=self.TIMEOUT) or \
                not self.__resume_client.wait_for_service(timeout_sec=self.TIMEOUT) or \
                not self.__stop_client.wait_for_service(timeout_sec=self.TIMEOUT):
            # Release everything created above before giving up. Called on the
            # base class explicitly so a subclass override of destroy() is not
            # run on a partially constructed object.
            BehaviorHandler.destroy(self)
            raise self.BehaviorNotAvailable(f'{behavior_name} Not Available')

    def destroy(self) -> None:
        """Clean exit: destroy the subscription, the service clients and the action client."""
        self._node.destroy_subscription(self.__status_sub)
        self._node.destroy_client(self.__resume_client)
        self._node.destroy_client(self.__pause_client)
        self._node.destroy_client(self.__stop_client)
        self.__action_client.destroy()

    @property
    def status(self) -> int:
        """
        Behavior internal status.

        :return: IDLE, PAUSED, RUNNING
        :rtype: int
        """
        return self.__status

    @property
    def feedback(self):
        """
        Behavior feedback.

        :return: last feedback received, None if no feedback was received yet
        """
        return self.__feedback

    @property
    def result_status(self):
        """
        Behavior result status.

        :return: GoalStatus code of the last result received,
            GoalStatus.STATUS_UNKNOWN if no result was received yet
        """
        if self.__result is None:
            return GoalStatus.STATUS_UNKNOWN
        return self.__result.status

    @property
    def result(self):
        """
        Behavior result.

        :raises self.ResultUnknown: on result not ready
        :return: result of the last goal that ended as succeeded or canceled
        """
        if self.__result is None or self.result_status not in (
                GoalStatus.STATUS_SUCCEEDED, GoalStatus.STATUS_CANCELED):
            raise self.ResultUnknown('Result not received yet')
        return self.__result.result

    def is_running(self) -> bool:
        """
        Check if behavior is running.

        :return: running or not
        :rtype: bool
        """
        return self.__status == BehaviorStatus.RUNNING

    def start(self, goal_msg, wait_result: bool = True) -> bool:
        """
        Start behavior.

        :param goal_msg: behavior goal
        :type goal_msg: Goal
        :param wait_result: wait to behavior end, defaults to True
        :type wait_result: bool, optional
        :raises self.GoalRejected: on goal rejection
        :return: succeeded or not; always True when not waiting for the result
        :rtype: bool
        """
        # Sending goal
        send_goal_future = self.__action_client.send_goal_async(
            goal_msg, feedback_callback=self.__feedback_callback)

        # Waiting to sending goal result. This only polls the future;
        # presumably the node is spun elsewhere (e.g. another thread) so the
        # future can complete -- verify against callers.
        while not send_goal_future.done():
            sleep(0.1)

        # Check if goal is accepted
        self.__goal_handle = send_goal_future.result()
        if not self.__goal_handle.accepted:
            raise self.GoalRejected('Goal Rejected')

        # Modify status
        self.__status = BehaviorStatus.RUNNING

        if wait_result:
            return self.wait_to_result()
        return True

    def modify(self, goal_msg):
        """
        Modify current behavior goal.

        :param goal_msg: new behavior goal
        :raises NotImplementedError: always, not implemented yet
        """
        # TODO
        raise NotImplementedError

    def pause(self) -> bool:
        """
        Pause current behavior.

        :return: pause succeed or not; True without calling the service when
            the behavior is not running
        :rtype: bool
        """
        # TODO: extend to all behavior status
        if self.status != BehaviorStatus.RUNNING:
            return True
        response = self.__pause_client.call(Trigger.Request())
        if response.success:
            self.__status = BehaviorStatus.PAUSED
        return response.success

    def resume(self, wait_result: bool = True) -> bool:
        """
        Continue with current behavior.

        :param wait_result: wait to behavior end, defaults to True
        :type wait_result: bool, optional
        :raises self.ResultUnknown: when waiting for the result and no goal
            was sent through this handler
        :return: resume succeed or not; when waiting, whether the behavior
            succeeded. True without calling the service when the behavior is
            not paused
        :rtype: bool
        """
        # TODO: extend to all behavior status
        if self.status != BehaviorStatus.PAUSED:
            return True
        response = self.__resume_client.call(Trigger.Request())
        if response.success:
            self.__status = BehaviorStatus.RUNNING
            # Only wait when the behavior was actually resumed
            if wait_result:
                return self.wait_to_result()
        return response.success

    def stop(self) -> bool:
        """
        Stop current behavior.

        :return: stop succeed or not; True without calling the service when
            the behavior is already idle
        :rtype: bool
        """
        if self.status == BehaviorStatus.IDLE:
            return True
        response = self.__stop_client.call(Trigger.Request())
        if response.success:
            self.__status = BehaviorStatus.IDLE
        return response.success

    def wait_to_result(self) -> bool:
        """
        Wait to inner action to finish.

        :raises self.ResultUnknown: when no goal was sent through this handler
        :return: succeeded or not
        :rtype: bool
        """
        if self.__goal_handle is None:
            raise self.ResultUnknown('No goal sent, there is no result to wait for')

        # Getting result (polling, see note in start())
        result_future = self.__goal_handle.get_result_async()
        while not result_future.done():
            sleep(0.1)

        # Check action result
        self.__result = result_future.result()
        if self.result_status != GoalStatus.STATUS_SUCCEEDED:
            self._node.get_logger().debug(
                f'Goal failed with status code: {self.result_status}')
            return False

        self._node.get_logger().debug(f'Result: {self.result}')
        return True

    def __feedback_callback(self, feedback_msg) -> None:
        """Feedback callback: store the last feedback received."""
        self.__feedback = feedback_msg.feedback
        self._node.get_logger().debug(
            f'Received feedback: {feedback_msg.feedback}')

    def __status_callback(self, status_msg: 'BehaviorStatus') -> None:
        """Behavior status callback: store the status published by the behavior."""
        self.__status = status_msg.status