From 6284a28aaad8aacdf6c916a3861128606fa315f9 Mon Sep 17 00:00:00 2001 From: Xiyu Oh Date: Tue, 24 Dec 2024 14:03:07 +0800 Subject: [PATCH] Enable multiple signal config of each type Signed-off-by: Xiyu Oh --- configs/mir_config.yaml | 22 +- .../rmf_wait_until.py | 407 +++++++++--------- .../dispatch_multistop.py | 30 +- 3 files changed, 246 insertions(+), 213 deletions(-) diff --git a/configs/mir_config.yaml b/configs/mir_config.yaml index dd2a274..d841b3a 100644 --- a/configs/mir_config.yaml +++ b/configs/mir_config.yaml @@ -116,14 +116,26 @@ plugins: actions: ["wait_until"] timeout: 60 # Optional, in seconds. Timeout if there is no move-off signal received, after which robot will move off regardless. Default to 60 seconds. update_gap: 30 # Optional, in seconds. The fleet adapter will log an update using this interval. Default to 30 seconds. - signal_type: # Defines the default configs for various move off signal types. If this field is not provided, the robot will wait the full duration of the default timeout. Signal configs can be overridden by task description. + signals: # Configure various move off signal types. If this field is not provided in the fleet config or task description, the robot will wait the full duration of the default timeout. # Currently three signal types are supported: [mission, plc, custom]. - mission: + sample_mission_1: # Only a unique name + signal_type: "mission" # Specifies the actual type mission_name: "some_mission_name" # If this mission is found on the robot, it will be queued when the action starts, and the robot will move off when the mission is completed. retry_count: 10 # Optional field for signal type "mission". The fleet adapter will re-attempt queueing the mission for this number of tries. resubmit_on_abort: False # Optional field for signal type "mission". Set to True to resubmit mission if the mission gets aborted by the robot. Default to False. - plc: + sample_mission_2: + signal_type: "mission" # Specifies the actual type + mission_name: "some_other_mission_name" + sample_plc_1: + signal_type: "plc" # Specifies the actual type register: 20 # Fill in with PLC register number. Robot moves off when this PLC register returns True - custom: + sample_plc_2: + signal_type: "plc" # Specifies the actual type + register: 21 + custom_1: + signal_type: "custom" # Specifies the actual type module: "fleet_adapter_mir_actions.rmf_move_off_on_alert" # Fill in with path to custom module written. Robot moves off when module provides the user-defined move off signal. - default: "custom" # Specifies a default signal type out of those listed above. If the task description does not provide any signal config, the fleet adapter will use this default signal type. + custom_2: + signal_type: "custom" # Specifies the actual type + module: "fleet_adapter_mir_actions.rmf_move_off_on_something_else" + default_signal: "custom_1" # Specifies a default signal type out of those listed above. If the task description does not provide any signal config, the fleet adapter will use this default signal type. diff --git a/fleet_adapter_mir_actions/fleet_adapter_mir_actions/rmf_wait_until.py b/fleet_adapter_mir_actions/fleet_adapter_mir_actions/rmf_wait_until.py index 63c0b99..deb9c49 100644 --- a/fleet_adapter_mir_actions/fleet_adapter_mir_actions/rmf_wait_until.py +++ b/fleet_adapter_mir_actions/fleet_adapter_mir_actions/rmf_wait_until.py @@ -24,88 +24,77 @@ class ActionFactory(MirActionFactory): def __init__(self, context: ActionContext): MirActionFactory.__init__(self, context) - self.move_off = None + self.custom_modules = {} # Raise error if config file is invalid - if 'signal_type' in context.action_config: - signal_type = context.action_config['signal_type'] - if 'mission' in signal_type: - if 'mission_name' not in signal_type['mission']: + supported_signal_types = {'mission', 'plc', 'custom'} + signals = context.action_config.get('signals') + if signals is not None: + for signal_name, signal_config in signals.items(): + signal_type = signal_config.get('signal_type') + if signal_type is None: raise KeyError( - f'WaitUntil MirAction requires a default mission name ' - f'for signal type [mission], but mission name is not ' - f'provided in the action config! Unable to ' - f'instantiate an ActionFactory.') - if 'retry_count' in signal_type['mission']: - if signal_type['mission']['retry_count'] < 0: - raise ValueError( - f'WaitUntil MirAction takes in a retry count for ' - f'signal type [mission], but retry count provided ' - f'is invalid! Unable to instantiate an ' - f'ActionFactory.') - if 'plc' in signal_type: - plc_register = signal_type['plc'].get('register') - if plc_register is None: - raise KeyError( - f'WaitUntil MirAction requires a default PLC register ' - f'for signal type [plc], but no PLC register was ' - f'provided in the action config! Unable to ' - f'instantiate an ActionFactory.') - elif not isinstance(plc_register, int): - raise TypeError( - f'WaitUntil MirAction requires a default PLC register ' - f'number for signal type [plc], but the value ' - f'provided [{plc_register}] is not an integer! ' - f'Unable to instantiate an ActionFactory.') - # If user provides a custom MoveOff module, import it - if 'custom' in signal_type: - if 'module' not in signal_type['custom']: - raise KeyError( - f'WaitUntil MirAction requires a custom module for ' - f'signal type [custom], but path to module is not ' - f'provided in the action config! Unable to ' - f'instantiate an ActionFactory.') - try: - move_off_module = context.action_config['module'] - move_off_plugin = importlib.import_module(move_off_module) - self.move_off = move_off_plugin.MoveOff(self.context) - except ImportError: - self.context.node.get_logger().warn( - f'Unable to import module {move_off_module}! The ' - f'WaitUntil ActionFactory will be instantiated ' - f'without supporting any user-defined signal module.' + f'WaitUntil MirAction signal config requires a ' + f'defined signal_type, but signal_type is not ' + f'provided for {signal_name}! Unable to instantiate ' + f'WaitUntil MirActionFactory.' ) - supported_signal_types = {'mission', 'plc', 'custom'} - default_signal = signal_type.get('default') - if default_signal is None: - self.context.node.get_logger().warn( - f'WaitUntil ActionFactory instantiated for robot ' - f'[{self.context.name}], but no default signal type has ' - f'been provided in the action config! If no signal type ' - f'is populated in the task description, there will be no ' - f'move off signal available and the robot will wait for ' - f'the full duration of the timeout when this action is ' - f'triggered.' - ) - elif default_signal not in supported_signal_types: + elif signal_type not in supported_signal_types: + raise ValueError( + f'WaitUntil MirAction signal config requires a ' + f'defined signal_type, but signal_type provided ' + f'{signal_name} is not supported! We currently ' + f'support the following signal types: ' + f'{supported_signal_types}. Unable to instantiate ' + f'WaitUntil MirActionFactory.' + ) + + # Pass signal config into verify functions to validate types + # and catch missing values + match signal_type: + case 'mission': + self.verify_mission( + signal_config.get('mission_name'), + signal_config.get('retry_count'), + signal_config.get('resubmit_on_abort') + ) + case 'plc': + self.verify_plc(signal_config.get('register')) + case 'custom': + self.verify_custom_module(signal_config.get('module')) + else: + # If the user did not provide any valid signal config, log a + # warning to remind users to provide signal type config in any task + # description they submit + self.context.node.get_logger().warn( + f'WaitUntil ActionFactory is instantiated for robot ' + f'[{self.context.name}], but no valid signals config has been ' + f'provided in the action config! Any move off signal config ' + f'will have to be provided in the task description submitted ' + f'to RMF.' + ) + return + + # Register default signal type + default_signal = context.action_config.get('default_signal') + if default_signal is not None: + if default_signal not in supported_signal_types: raise ValueError( f'User provided a default signal type {default_signal} ' f'in the action config for WaitUntil MirAction, but the ' f'default signal type is not supported!') - elif default_signal not in signal_type: + elif default_signal not in signals.keys(): raise ValueError( f'User provided a default signal type {default_signal} ' f'in the action config for WaitUntil MirAction, but the ' f'default signal type is not configured!') else: - # If the user did not provide any default signal config, log a - # warning to remind users to provide signal type config in any task - # description they submit self.context.node.get_logger().warn( - f'WaitUntil ActionFactory is instantiated for robot ' - f'[{self.context.name}], but no signal_type config has been ' - f'provided in the action config! Any move off signal config ' - f'will have to be provided in the task description submitted ' - f'to RMF.' + f'WaitUntil ActionFactory instantiated for robot ' + f'[{self.context.name}], but no default signal type has been ' + f'provided in the action config! If no signal type is ' + f'populated in the task description, there will be no move ' + f'off signal available and the robot will wait for the full ' + f'duration of the timeout when this action is triggered.' ) def supports_action(self, category: str) -> bool: @@ -124,7 +113,59 @@ def perform_action( match category: case 'wait_until': return WaitUntil( - description, execution, self.context, self.move_off) + description, execution, self.context, + self.custom_modules) + + def verify_mission( + self, + signal_name: str, + mission_name: str | None, + retry_count: int | None, + resubmit_on_abort: bool | None): + if mission_name is None: + raise KeyError( + f'WaitUntil MirAction requires a mission name for the ' + f'[mission] signal type, but mission name is not provided in ' + f'the action config for {signal_name}! Unable to instantiate ' + f'WaitUntil MirActionFactory.' + ) + if retry_count is not None and retry_count < 0: + self.context.node.warn( + f'WaitUntil MirAction takes in a retry count for the ' + f'[mission] signal type, but the value provided ' + f'[{retry_count}] is invalid! Ignoring provided value and ' + f'using the default retry count of 10 for {signal_name}.' + ) + + def verify_plc(self, signal_name: str, plc_register: int | None): + if plc_register is None: + raise KeyError( + f'WaitUntil MirAction requires a default PLC register for ' + f'signal type [plc], but no PLC register was provided in the ' + f'action config for {signal_name}! Unable to instantiate ' + f'WaitUntil MirActionFactory.' + ) + + def verify_custom_module( + self, + signal_name: str, + module: str | None): + if module is None: + raise KeyError( + f'WaitUntil MirAction requires a custom module for the ' + f'[custom] signal type, but path to module is not provided in ' + f'the action config for {signal_name}! Unable to instantiate ' + f'WaitUntil MirActionFactory.' + ) + try: + move_off_plugin = importlib.import_module(module) + self.custom_modules[signal_name] = \ + move_off_plugin.MoveOff(self.context) + except ImportError: + self.context.node.get_logger().warn( + f'Unable to import module {module}! Unable to instantiate ' + f'WaitUntil MirActionFactory.' + ) class WaitUntil(MirAction): @@ -133,11 +174,11 @@ def __init__( description: dict, execution, context: ActionContext, - move_off + custom_modules: dict ): MirAction.__init__(self, context, execution) - self.move_off_cb = self.create_move_off_cb(description, move_off) + self.move_off_cb = self.create_move_off_cb(description, custom_modules) if self.move_off_cb is None: # Insufficient information provided to configure the check move off # callback, mark action as completed and continue task @@ -157,7 +198,8 @@ def __init__( context.action_config.get('update_gap', 60)) # seconds self.wait_timeout = description.get( 'timeout', context.action_config.get('timeout', 60)) # seconds - self.signal_config = context.action_config.get('signal_type') + self.signal_config = context.action_config.get('signals') + self.default_signal = context.action_config.get('default_signal') self.start_time = self.context.node.get_clock().now().nanoseconds / 1e9 self.context.node.get_logger().info( @@ -209,7 +251,7 @@ def update_action(self): return False - def create_move_off_cb(self, description: dict, move_off): + def create_move_off_cb(self, description: dict, custom_modules: dict): signal_cb = None signal_type = None @@ -218,9 +260,8 @@ def create_move_off_cb(self, description: dict, move_off): # action config. if 'signal_type' in description: signal_type = description['signal_type'] - elif self.signal_config is not None and \ - 'default' in self.signal_config: - signal_type = self.signal_config['default'] + elif self.default_signal is not None: + signal_type = self.default_signal else: # There is no move off signal provided, we will just wait for the # duration of the configured timeout @@ -233,130 +274,40 @@ def create_move_off_cb(self, description: dict, move_off): signal_cb = lambda: False return signal_cb - default_config = self.signal_config.get(signal_type) - task_config = description.get('signal_config') - # Configure the specified move off signal + + signal_config = None + if signal_type in self.signal_config: + # Signal is preconfigured in the fleet action config + signal_config = self.signal_config[signal_type] + signal_type = signal_config['signal_type'] + else: + signal_config = description.get('signal_config') + if signal_config is None: + self.context.node.get_logger().error( + f'The submitted signal type/config is invalid for ' + f'{signal_type}!' + ) + return None + match signal_type: - case "mission": - mission_name = None - resubmit_on_abort = False - retry_count = 10 - - # Check for default mission config values - if default_config is not None: - mission_name = default_config['mission_name'] - if 'resubmit_on_abort' in default_config: - resubmit_on_abort = default_config['resubmit_on_abort'] - if 'retry_count' in default_config: - retry_count = default_config['retry_count'] - - # Override default values with task config only if provided - if task_config is not None: - if 'mission_name' in task_config: - mission_name = task_config['mission_name'] - if 'resubmit_on_abort' in task_config: - resubmit_on_abort = task_config['resubmit_on_abort'] - if 'retry_count' in task_config: - retry_count = task_config['retry_count'] - - # Check if mission config values are valid - if mission_name is None: - self.context.node.get_logger().info( - f'MoveOff signal type [mission] was selected for ' - f'robot [{self.context.name}], but no mission name ' - f'was provided! Please ensure that the required ' - f'fields are provided in the fleet config.' - ) - return None - if mission_name not in self.context.api.known_missions: - self.context.node.get_logger().info( - f'Mission {mission_name} not found on robot ' - f'{self.context.name}!' - ) - return None - mission_actions = \ - self.context.api.missions_mission_id_actions_get( - self.context.api.known_missions[mission_name]['guid'] - ) - if not mission_actions: - self.context.node.get_logger().info( - f'Mission {mission_name} actions not found on robot ' - f'{self.context.name}!' - ) - return None - # Queue the waiting mission for this robot - count = 0 # we should attempt (retry_count + 1) times in total - mission_queue_id = None - while count <= retry_count and not mission_queue_id: - count += 1 - self.context.node.get_logger().info( - f'Queueing mission {mission_name} for robot ' - f'[{self.context.name}]...' - ) - try: - mission_queue_id = \ - self.context.api.queue_mission_by_name( - mission_name) - if mission_queue_id is not None: - break - except Exception as err: - self.context.node.get_logger().info( - f'Failed to queue mission {mission_name}: {err}. ' - f'Retrying...' - ) - time.sleep(1) - if not mission_queue_id: - self.context.node.get_logger().info( - f'Unable to queue mission {mission_name} for robot ' - f'[{self.context.name}]!' - ) - return None - self.context.node.get_logger().info( - f'Mission {mission_name} queued for [{self.context.name}] ' - f'with mission queue id {mission_queue_id}.' - ) - signal_cb = lambda: self.check_mission_complete( - mission_name, mission_queue_id, resubmit_on_abort) - self.context.node.get_logger().info( - f'Configuring robot [{self.context.name}] move off signal' - f': robot will wait until mission {mission_name} with ' - f'mission queue id {mission_queue_id} is completed.' - ) - case "plc": - register = None - # Check for default plc config values - if default_config is not None: - register = default_config['register'] - # Override default values with task config only if provided - if task_config is not None: - if 'register' in task_config: - register = task_config['register'] - - if register is None: - self.context.node.get_logger().info( - f'MoveOff signal type [plc] was selected for ' - f'robot [{self.context.name}], but no PLC register ' - f'was provided! Please ensure that the required ' - f'fields are provided in the fleet config.' - ) - return None - elif not isinstance(register, int): - self.context.node.get_logger().info( - f'MoveOff signal type [plc] was selected for ' - f'robot [{self.context.name}], but a non-integer PLC ' - f'register number was provided! Please ensure that ' - f'the correct data type is provided in the fleet ' - f'config/task description.' - ) - return None - signal_cb = lambda: self.check_plc_register(register) + case 'mission': + mission_name = signal_config['mission_name'] + resubmit_on_abort = signal_config.get( + 'resubmit_on_abort', False) + retry_count = signal_config.get('retry_count', 10) + signal_cb = self.create_mission_move_off_cb( + mission_name, retry_count, resubmit_on_abort) + case 'plc': + register = signal_config['register'] self.context.node.get_logger().info( f'Configuring robot [{self.context.name}] move off ' f'signal: robot will wait until the PLC register ' f'{register} returns True.' ) - case "custom": - if move_off is None: + signal_cb = lambda: self.check_plc_register(register) + case 'custom': + module = custom_modules.get(signal_type) + if module is None: self.context.node.get_logger().info( f'MoveOff signal type [custom] was selected for ' f'robot [{self.context.name}], but no valid move off ' @@ -364,15 +315,13 @@ def create_move_off_cb(self, description: dict, move_off): f'required fields are provided in the fleet config.' ) return None - # Trigger the begin waiting callback - move_off.begin_waiting(description) - # Define the move off callback to be called on every update - signal_cb = lambda: move_off.is_move_off_ready() + module.begin_waiting(description) self.context.node.get_logger().info( f'Configuring robot [{self.context.name}] move off ' f'behavior: robot will wait until the custom move off ' f'behavior signals that the robot is ready to move off.' ) + signal_cb = lambda: module.is_move_off_ready() case _: self.context.node.get_logger().info( f'Invalid move off signal type [{signal_type}] provided, ' @@ -381,6 +330,66 @@ def create_move_off_cb(self, description: dict, move_off): ) return signal_cb + def create_mission_move_off_cb( + self, + mission_name: str, + retry_count: int, + resubmit_on_abort: bool): + if mission_name not in self.context.api.known_missions: + self.context.node.get_logger().info( + f'Mission {mission_name} not found on robot ' + f'{self.context.name}!' + ) + return None + mission_actions = \ + self.context.api.missions_mission_id_actions_get( + self.context.api.known_missions[mission_name]['guid'] + ) + if not mission_actions: + self.context.node.get_logger().info( + f'Mission {mission_name} actions not found on robot ' + f'{self.context.name}!' + ) + return None + # Queue the waiting mission for this robot + count = 0 # we should attempt (retry_count + 1) times in total + mission_queue_id = None + while count <= retry_count and not mission_queue_id: + count += 1 + self.context.node.get_logger().info( + f'Queueing mission {mission_name} for robot ' + f'[{self.context.name}]...' + ) + try: + mission_queue_id = \ + self.context.api.queue_mission_by_name( + mission_name) + if mission_queue_id is not None: + break + except Exception as err: + self.context.node.get_logger().info( + f'Failed to queue mission {mission_name}: {err}. ' + f'Retrying...' + ) + time.sleep(1) + if not mission_queue_id: + self.context.node.get_logger().info( + f'Unable to queue mission {mission_name} for robot ' + f'[{self.context.name}]!' + ) + return None + self.context.node.get_logger().info( + f'Mission {mission_name} queued for [{self.context.name}] ' + f'with mission queue id {mission_queue_id}.' + ) + self.context.node.get_logger().info( + f'Configuring robot [{self.context.name}] move off signal' + f': robot will wait until mission {mission_name} with ' + f'mission queue id {mission_queue_id} is completed.' + ) + return lambda: self.check_mission_complete( + mission_name, mission_queue_id, resubmit_on_abort) + def check_mission_complete( self, mission_name, diff --git a/fleet_adapter_mir_tasks/fleet_adapter_mir_tasks/dispatch_multistop.py b/fleet_adapter_mir_tasks/fleet_adapter_mir_tasks/dispatch_multistop.py index a0d1593..4c260df 100644 --- a/fleet_adapter_mir_tasks/fleet_adapter_mir_tasks/dispatch_multistop.py +++ b/fleet_adapter_mir_tasks/fleet_adapter_mir_tasks/dispatch_multistop.py @@ -52,8 +52,8 @@ def __init__(self, argv=sys.argv): help='Number of seconds between logging updates') parser.add_argument('-s', '--signal_type', required=True, type=str, help='Move off signal type') - parser.add_argument('-m', '--mission_name', required=False, default='', - type=str, help='Mission name') + parser.add_argument('-m', '--mission_name', type=str, + help='Mission name') parser.add_argument('-r', '--resubmit_on_abort', type=bool, help='Resubmit mission if aborted by robot') parser.add_argument('-rc', '--retry_count', required=False, default=-1, @@ -132,22 +132,34 @@ def __init__(self, argv=sys.argv): signal_config = {} match signal_type: case "mission": - if self.args.mission != '': - signal_config['mission_name'] = self.args.mission_name + if self.args.mission_name is None: + raise ValueError( + f'No mission name provided for [mission] signal ' + f'type!' + ) + signal_config['mission_name'] = self.args.mission_name if self.args.resubmit_on_abort is not None: signal_config['resubmit_on_abort'] = \ self.args.resubmit_on_abort if self.args.retry_count > -1: signal_config['retry_count'] = self.args.retry_count case "plc": - if self.args.plc_register is not None: - signal_config['register'] = self.args.plc_register + if self.args.plc_register is None: + raise ValueError( + f'No PLC register provided for [plc] signal type!' + ) + signal_config['register'] = self.args.plc_register case "custom": - pass - case _: raise ValueError( - f'Invalid move off signal type provided!' + f'[custom] signal type is not supported via task ' + f'description! Please provide the path to module in ' + f'the fleet action config.' ) + case _: + # The signal type provided, if valid, points to a + # configured signal type. We pass it to the action for + # validation. + pass # Add wait activity wait_activity = [{ "category": "perform_action",