From f45a195e9b02069051c773970821968463099d38 Mon Sep 17 00:00:00 2001 From: Vivian Zhou Date: Mon, 24 Jun 2024 21:54:12 -0700 Subject: [PATCH 1/2] add microservice document --- en/services/parachute.md | 167 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 en/services/parachute.md diff --git a/en/services/parachute.md b/en/services/parachute.md new file mode 100644 index 000000000..5a3bae9fe --- /dev/null +++ b/en/services/parachute.md @@ -0,0 +1,167 @@ +# Parachute Protocol + +## Introduction + +The parachute protocol allows MAVLink control over the behavior of a parachute that is integrated or attached to the drone. Along with this, the parachute protocol also publishes status information for developers or users. + + +## MAVLink Parachute Implementations + +These parachutes have built-in MAVLink support: + +- AVSS Parachute for Skydio X10 + +## Message/Enum Summary + +| Message | Description | +| ------------------------------------------------------------------------------------------------------------------ | ------------------------- | +| [PARACHUTE_STATUS](../messages/common.md#PARACHUTE_STATUS) | Current status of the parachute. Recommended to publish this at a regular rate (nominally at 1Hz). +| [MAV_CMD_SET_PARACHUTE_ARM](../messages/common.md#MAV_CMD_SET_PARACHUTE_ARM) | Command to arm/disarm parachute module trigger sources.| + +| Enum Values | Description | +| ------------------------------------------------------------------------------------------------------------------ | ------------------------- | +| [MAV_TYPE_PARACHUTE](../messages/minimal.md#MAV_TYPE_PARACHUTE) | Type of the component (parachute). | +| [MAV_COMP_ID_PARACHUTE](../messages/minimal.md#MAV_COMP_ID_PARACHUTE) | ID of the component (parachute). + + +| Enum | Description | +| ------------------------------------------------------------------------------------------------------------------ | ------------------------- | +| [PARACHUTE_TRIGGER_FLAGS](../messages/common.md#PARACHUTE_TRIGGER_FLAGS) | Parachute trigger sources. | +| [PARACHUTE_DEPLOYMENT_TRIGGER](../messages/common.md#PARACHUTE_DEPLOYMENT_TRIGGER) | Parachute deployment trigger source. | +| [PARACHUTE_SAFETY_FLAGS](../messages/common.md#PARACHUTE_SAFETY_FLAGS) | Parachute module safety-related flags. | +| [PARACHUTE_ERROR_FLAGS](../messages/common.md#PARACHUTE_ERROR_FLAGS) | Fault/health indications. | + +## Implementation and Messages + +### Parachute Connection + +Parachutes are expected to follow the [Heartbeat/Connection Protocol](https://github.com/mavlink/mavlink-devguide/blob/master/en/services/heartbeat.md) and send a constant flow of heartbeats (nominally at 1Hz). Parachutes are identified via their type [MAV_TYPE_PARACHUTE](#MAV_TYPE_PARACHUTE). +Individual parachutes are distinguished via their unique component ID, which by default should be [MAV_COMP_ID_PARACHUTE](#MAV_COMP_ID_PARACHUTE) (though this is not mandated and any ID may be used). + +Once a heartbeat is received, the drone can then send a [MAV_CMD_REQUEST_MESSAGE](https://mavlink.io/en/messages/common.html#MAV_CMD_REQUEST_MESSAGE) command to the parachute to receive information about its status via [PARACHUTE_STATUS](#PARACHUTE_STATUS) or about its component information via [COMPONENT_INFORMATION_BASIC](.../messages/common.md#COMPONENT_INFORMATION_BASIC). + +[MAV_CMD_SET_PARACHUTE_ARM](#MAV_CMD_SET_PARACHUTE_ARM) can also then be sent to configure the arming status of the various possible trigger mechanisms of the parachute. + + +### STATUS +The [PARACHUTE_STATUS](#PARACHUTE_STATUS) message can be requested to receive information about the status of the parachute. This includes information such as uptime, errors, arm status, deployment status, safety status, arm altitiude, and parachute pack date. + +The parameter list can be found below: +| Parameter | Description | +|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------| +| `time_boot_ms` | Time since system boot up in milliseconds. | +| `error_status` | Bitmask detailing the [PARACHUTE_ERROR_FLAGS](#PARACHUTE_ERROR_FLAGS). | +| `arm_status` | Bitmask detailing the arming status via [PARACHUTE_TRIGGER_FLAGS](#PARACHUTE_TRIGGER_FLAGS). | +| `deployment_status` | Parachute deployment status. | +| `safety_status` | Parachute safety status. | +| `ats_arm_altitude` | Parachute Automatic Trigger System (ATS) auto-arming/disarming altitude in meters. | +| `parachute_packed_date` | Parachute packed date (YYYY-MM-DD) in ASCII characters, 0 terminated. All 0: field not provided. | + + +#### PARACHUTE_ERROR_FLAGS +[PARACHUTE_ERROR_FLAGS](#PARACHUTE_ERROR_FLAGS) is a bitmask that details the various critical errors that may occur within the parachute module or within the communication between the parachute and the drone. This is published in the [PARACHUTE_STATUS](#PARACHUTE_STATUS) message. + +A list of the various errors can be found below: +| Bit | Value | Parameter | Description | +|-----|----------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| 0 | 1 | `PARACHUTE_ERROR_FLAGS_BAROMETER_ERROR` | There is an error with the parachute barometer. | +| 1 | 2 | `PARACHUTE_ERROR_FLAGS_IMU_ERROR` | There is an error with the parachute IMU. | +| 2 | 4 | `PARACHUTE_ERROR_FLAGS_RF_CONNECTION_ERROR` | There is an error with the parachute's RF connection that is used for manual control. | +| 3 | 8 | `PARACHUTE_ERROR_FLAGS_LOW_POWER` | Parachute module has low power. | +| 4 | 16 | `PARACHUTE_ERROR_FLAGS_FC_CONNECTION_ERROR` | There is an error with the connection between parachute and flight controller (FC). | +| 5 | 32 | `PARACHUTE_ERROR_FLAGS_EFTS_CONNECTION_ERROR` | There is an error with the connection between parachute and Electrical Flight Termination System (EFTS). | +| 6 | 64 | `PARACHUTE_ERROR_FLAGS_POD_CONNECTION_ERROR` | There is an error with the parachute pod. | +| 7 | 128 | `PARACHUTE_ERROR_FLAGS_EFTS_DIAGNOSE` | Parachute Electrical Flight Termination System (EFTS) diagnosis failed. | +| 8 | 256 | `PARACHUTE_ERROR_FLAGS_CHARGING_FAILED` | Parachute module charging failed. | +| 9 | 512 | `PARACHUTE_ERROR_FLAGS_EXTERNAL_POWER_ERROR` | There is an error with the parachute external power source. | +| 10 | 1024 | `PARACHUTE_ERROR_FLAGS_GS_CONNECTION_ERROR` | There is an error with the connection between parachute and Ground Station (GS). | +| 11 | 2048 | `PARACHUTE_ERROR_FLAGS_GPS_ERROR` | There is an error with the parachute's GPS. | +| 12 | 4096 | `PARACHUTE_ERROR_FLAGS_SUBSYSTEM_CONNECTION_ERROR` | There is an error with the connection between parachute and subsystem (e.g. remote controller, expansion board, etc.). | +| 13 | 8192 | `PARACHUTE_ERROR_FLAGS_SUBSYSTEM_FW_ERROR` | There is an error with the parachute subsystem firmware (e.g. wrong firmware version). | +| 14 | 16384 | `PARACHUTE_ERROR_FLAGS_RESERVED_1` | Reserved for future use. | +| 15 | 32768 | `PARACHUTE_ERROR_FLAGS_RESERVED_2` | Reserved for future use. | +| 16 | 65536 | `PARACHUTE_ERROR_FLAGS_LOGGING_ERROR` | There is an error with the parachute's internal logging system. | +| 17 | 131072 | `PARACHUTE_ERROR_FLAGS_MODULE_RETIRED` | This parachute module is retired (i.e. too many deployments). | +| 18 | 262144 | `PARACHUTE_ERROR_FLAGS_GLOW_WIRE_ERROR` | There is an error with the parachute glow wire. | +| 19 | 524288 | `PARACHUTE_ERROR_FLAGS_OFFBOARD_CONNECTION_ERROR` | There is an error with the MAVLink connection between parachute and offboard computer. | +| 20 | 1048576 | `PARACHUTE_ERROR_FLAGS_IMU_CALIBRATION_ERROR` | Parachute's internal IMU calibration failed. | + + +[PARACHUTE_ERROR_FLAGS_IMU_ERROR](#PARACHUTE_ERROR_FLAGS_IMU_ERROR) is typically used to indicate a hardware failure of the onboard IMU, and [PARACHUTE_ERROR_FLAGS_IMU_CALIBRATION_ERROR](#PARACHUTE_ERROR_FLAGS_IMU_CALIBRATION_ERROR) may indicate that IMU has no calibration or needs re-calibration. + +Depending on the parachute product, not all errors may be applicable. For example, [PARACHUTE_ERROR_FLAGS_LOW_POWER](#PARACHUTE_ERROR_FLAGS_LOW_POWER) would be used if there was an additional power source within the parachute module itself that has its power level monitored. + +#### PARACHUTE_DEPLOYMENT_TRIGGER +[PARACHUTE_DEPLOYMENT_TRIGGER](#PARACHUTE_DEPLOYMENT_TRIGGER) is an enum that details out the deployment status of the parachute. This enum is set to "0" when the parachute has not been deployed. All other enum values indicate that the parachute has deployed and by which source. This is published in the [PARACHUTE_STATUS](#PARACHUTE_STATUS) message. + +A list of the various enum values can be found below: + +| Value | Parameter | Description | +|-------|----------------------------------------------------|-------------------------------------------------------------------------| +| 0 | `PARACHUTE_DEPLOYMENT_TRIGGER_NONE` | None (parachute has not deployed) | +| 1 | `PARACHUTE_DEPLOYMENT_TRIGGER_MANUAL` | Manual | +| 2 | `PARACHUTE_DEPLOYMENT_TRIGGER_ATS` | Automatic trigger system (ATS) | +| 3 | `PARACHUTE_DEPLOYMENT_TRIGGER_DRONE` | Drone | +| 4 | `PARACHUTE_DEPLOYMENT_TRIGGER_MAVLINK` | MAVLink, e.g. from an offboard computer | +| 5 | `PARACHUTE_DEPLOYMENT_TRIGGER_GEOFENCE` | Geofence | +| 6 | `PARACHUTE_DEPLOYMENT_TRIGGER_FTS_PRECHECKING` | Flight Termination System (FTS) pre-checking protocol | + + +#### PARACHUTE_SAFETY_FLAGS +[PARACHUTE_SAFETY_FLAGS](#PARACHUTE_SAFETY_FLAGS) is a bitmask that details the safety status of the parachute. This is published in the [PARACHUTE_STATUS](#PARACHUTE_STATUS) message. + +A breakdown of the bitmask can be found below: + +| Value | Parameter | Description | +|-------|-------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------| +| 1 | `PARACHUTE_SAFETY_FLAGS_GROUND_CLEARED` | The parachute module has cleared a safe distance from the ground for deployment. | +| 2 | `PARACHUTE_SAFETY_FLAGS_ON_GROUND` | The parachute's own sensor has confirmed it is stably on the ground. | +| 4 | `PARACHUTE_SAFETY_FLAGS_GEOFENCE_MISSION_SET` | The parachute module has downloaded geofence mission successfully and can be triggered by geofence source. | + +[PARACHUTE_SAFETY_FLAGS_GROUND_CLEARED](#PARACHUTE_SAFETY_FLAGS_GROUND_CLEARED) flag is raised depending on the parachute and the inputs defined by the developer/user. This is typically used to block deployment until a desired altitiude is reached. + +### MAV_CMD_SET_PARACHUTE_ARM +This command can be used to arm/disarm the various parachute trigger sources. By configuring the arm/disarm setting of the various trigger sources, a user is able to change the behavior of the parachute when it is being used. The operation follows the normal [Command Protocol](https://github.com/mavlink/mavlink-devguide/blob/master/en/services/command.md) rules for command/acknowledgment. + +There are two parameters for this command. The first parameter is used to indicate the arm/disarm setting. Setting a bit to "0" indicates disarm and setting a bit to "1" indicates arm. The second parameter is a bitmask that indicates which trigger source is being configured. The bits of both parameters are mapped to [PARACHUTE_TRIGGER_FLAGS](#PARACHUTE_TRIGGER_FLAGS). The parachute will ignore all bit positions in the first parameter that are set to zero in the second parameter, which allows the user to granularly arm/disarm specific sources individually. + +For example, the user wants to arm the automatic trigger system and flight controller trigger, and the user wants to disarm the manual and geofence trigger. +- The first parameter would be `0bxxx0x110`. + - Bit 0 for `PARACHUTE_TRIGGER_FLAGS_MANUAL` is set to "0" to disarm. + - Bit 1 for `PARACHUTE_TRIGGER_FLAGS_ATS` is set to "1" to arm. + - Bit 2 for `PARACHUTE_TRIGGER_FLAGS_FC` is set to "1" to arm. + - Bit 4 for `PARACHUTE_TRIGGER_FLAGS_GEOFENCE` is set to "0" to disarm. +- The second parameter would be `0b00010111`. + - Since bit 0, 1, 2, and 4 are being configured and no other trigger sources are being configured, all of those bits are set to "1" to indicate that those specific trigger sources need their arm statuses to be updated. + - All other bits, such as bit 3, are ignored. The parachute should not change the arm/disarm status of the trigger source mapped to bit 3. + + +#### PARACHUTE_TRIGGER_FLAGS + +[PARACHUTE_TRIGGER_FLAGS](#PARACHUTE_TRIGGER_FLAGS) is a bitmask that is used in the parameters of the [MAV_CMD_SET_PARACHUTE_ARM](#MAV_CMD_SET_PARACHUTE_ARM) command and is published in the [PARACHUTE_STATUS](#PARACHUTE_STATUS) message. When published in the [PARACHUTE_STATUS](#PARACHUTE_STATUS) message, the bitmask details the various arm/disarm states of all the trigger flags. Please note that the implementation of the auto-arm and auto-disarm conditions are developer-specific. + +A list of the various flags can be found below: + +| Bit| Value | Parameter | Description | +|----|--------|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------| +| 0 | 1 | `PARACHUTE_TRIGGER_FLAGS_MANUAL` | Manual trigger (ground based control via parachute-specific RF channel) | +| 1 | 2 | `PARACHUTE_TRIGGER_FLAGS_ATS` | Automatic trigger system (ATS) | +| 2 | 4 | `PARACHUTE_TRIGGER_FLAGS_FC` | Flight controller trigger (e.g. MAVLink from FC, PWM, DroneCan, RC Control) | +| 3 | 8 | `PARACHUTE_TRIGGER_FLAGS_OFFBOARD` | Offboard computer trigger (via MAVLink) | +| 4 | 16 | `PARACHUTE_TRIGGER_FLAGS_GEOFENCE` | Geofence trigger (by parachute). Parachute uses MAVLink mission protocol to fetch geofence. | +| 5 | 32 | `PARACHUTE_TRIGGER_FLAGS_FTS_PRECHECKING` | FTS (flight termination system) pre-checking protocol trigger | +| 6 | 64 | `PARACHUTE_TRIGGER_FLAGS_ATS_AUTO_ARM` | Auto-arming of parachute automatic trigger system (ATS). This allows a parachute to enable ATS after reaching a desired altitude. | +| 7 | 128 | `PARACHUTE_TRIGGER_FLAGS_ATS_AUTO_DISARM` | Auto-disarming of parachute automatic trigger system (ATS). This allows a parachute to disable ATS after detecting that it is below a desired altitude. | + + +## Test Script + +#### Description + +The test suite `test_parachute.py` runs a script to validate the emulator and can also be used to validate your own parachute module. The script `parachute.py` will emulate a standard parachute module that uses MAVLink v2 UDP connections. + +#### Instructions +1. Run parachute emulator `python3 parachute.py` +2. Run test `MAVLINK20=1 python3 test_parachute.py` (or `MAVLINK20=1 python3 -m unittest -v test_parachute.py` for more verbosity) + +To use the test suite on other parachute modules, set the relevant environment variables `MAVLINK_DIALECT`, `MAVLINK_UDPIN`, and `MAVLINK_UDPOUT`. The format is as follows: `MAVLINK20=1 MAVLINK_DIALECT=dialect MAVLINK_UDPIN=udpin MAVLINK_UDPOUT=udpout python3 test_parachute.py` \ No newline at end of file From 4ba3baf1f019678aba8a8bedbb857e2e150d1f6f Mon Sep 17 00:00:00 2001 From: Amy Chen Date: Mon, 24 Jun 2024 17:19:52 -0700 Subject: [PATCH 2/2] [parachute] Parachute testsuite The script `parachute.py` emulates a parachute module that has MAVLink v2 UDP connections. The script `test_parachute.py` runs a standard test suite and can be used to test the emulator with its default settings. Instructions: 1. Run simple parachute emulator `python3 parachute.py` 2. Run test `MAVLINK20=1 python3 test_parachute.py` (or `MAVLINK20=1 python3 -m unittest -v test_parachute.py` for more verbosity) To use the test suite on other parachute modules, set the relevant env variables `MAVLINK_DIALECT`, `MAVLINK_UDPIN`, and `MAVLINK_UDPOUT`. Example: `MAVLINK20=1 MAVLINK_DIALECT=dialect MAVLINK_UDPIN=udpin MAVLINK_UDPOUT=udpout python3 test_parachute.py` Topic: parachute_testsuite --- assets/services/parachute/parachute.py | 182 ++++++++++++ assets/services/parachute/test_parachute.py | 299 ++++++++++++++++++++ 2 files changed, 481 insertions(+) create mode 100644 assets/services/parachute/parachute.py create mode 100644 assets/services/parachute/test_parachute.py diff --git a/assets/services/parachute/parachute.py b/assets/services/parachute/parachute.py new file mode 100644 index 000000000..8404be33d --- /dev/null +++ b/assets/services/parachute/parachute.py @@ -0,0 +1,182 @@ +import datetime +import os +import threading +import time + +from pymavlink import mavutil +from pymavlink.dialects.v20 import common as mavlink + + +class ParachuteMavlinkUDPConnection: + """Class to handle parachute's MAVLink v2 UDP connections""" + + def __init__(self, udpin_conn: str, udpout_conn: str, dialect: str) -> None: + """Initialize UDP connections""" + # Setting MAVLink version and dialect + os.environ["MAVLINK20"] = "1" + os.environ["MAVLINK_DIALECT"] = dialect + # Setting up UDP connections + self.udpin = mavutil.mavlink_connection(udpin_conn, dialect) + self.udpout = mavutil.mavlink_connection(udpout_conn, dialect) + + def close(self): + """Close UDP connections""" + self.udpin.close() + self.udpout.close() + + +class Parachute: + """Parachute class that uses MAVLink v2 UDP to broadcast heartbeat/status and handle commands""" + + def __init__( + self, + dialect: str, + time_manufacture_s: int, + vendor_name: bytes, + model_name: bytes, + software_version: bytes, + hardware_version: bytes, + serial_number: bytes, + parachute_packed_date: bytes, + ) -> None: + # Initialize parachute info and status + self.info = mavlink.MAVLink_component_information_basic_message( + time_boot_ms=0, + capabilities=0, + time_manufacture_s=time_manufacture_s, + vendor_name=vendor_name, + model_name=model_name, + software_version=software_version, + hardware_version=hardware_version, + serial_number=serial_number, + ) + self.status = mavlink.MAVLink_parachute_status_message( + time_boot_ms=0, + error_status=0, + arm_status=0, + deployment_status=mavlink.PARACHUTE_DEPLOYMENT_TRIGGER_NONE, + safety_status=mavlink.PARACHUTE_SAFETY_FLAGS_GROUND_CLEARED, + ats_arm_altitude=50, + parachute_packed_date=parachute_packed_date, + ) + # Start parachute MAVLink UDP connections + self.mavlink_udp_connection = ParachuteMavlinkUDPConnection( + udpin_conn="udpin:localhost:14540", + udpout_conn="udpout:localhost:14541", + dialect=dialect, + ) + # Start publishing heartbeat at 1Hz + self.heartbeat_thread_event = threading.Event() + heartbeat_thread = threading.Thread( + target=self.stream_heartbeat, + daemon=True, + ) + heartbeat_thread.start() + self.heartbeat_thread_event.set() + # Start publish parachute status at 1Hz + self.parachute_status_thread_event = threading.Event() + parachute_status_thread = threading.Thread( + target=self.stream_parachute_status, + daemon=True, + ) + parachute_status_thread.start() + self.parachute_status_thread_event.set() + + def stream_heartbeat(self) -> None: + """Stream parachute heartbeat at 1Hz""" + while True: + self.heartbeat_thread_event.wait() + self.mavlink_udp_connection.udpout.mav.heartbeat_send( + mavlink.MAV_TYPE_PARACHUTE, mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0 + ) + time.sleep(1) + + def stream_parachute_status(self) -> None: + """Stream parachute status at 1Hz""" + while True: + self.parachute_status_thread_event.wait() + self.mavlink_udp_connection.udpout.mav.send(self.status) + time.sleep(1) + + def listen_for_commands(self) -> None: + """Listen for MAVLink commands""" + while True: + msg = self.mavlink_udp_connection.udpin.recv_match(type="COMMAND_LONG", blocking=True) + if msg: + self.handle_command_long(msg) + + def handle_command_long(self, msg: mavlink.MAVLink_command_long_message) -> None: + """Handle MAVLink command long messages""" + if msg.command == mavlink.MAV_CMD_REQUEST_MESSAGE: + self.handle_command_request_message(msg) + elif msg.command == mavlink.MAV_CMD_DO_PARACHUTE: + self.handle_command_do_parachute(msg) + elif msg.command == mavlink.MAV_CMD_SET_PARACHUTE_ARM: + self.handle_command_set_parachute_arm(msg) + else: + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_UNSUPPORTED + ) + + def handle_command_request_message(self, msg: mavlink.MAVLink_command_long_message) -> None: + """Handle MAVLink request message command""" + if msg.param1 == mavlink.MAVLINK_MSG_ID_PARACHUTE_STATUS: + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_ACCEPTED + ) + self.mavlink_udp_connection.udpout.mav.send(self.status) + elif msg.param1 == mavlink.MAVLINK_MSG_ID_COMPONENT_INFORMATION_BASIC: + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_ACCEPTED + ) + self.mavlink_udp_connection.udpout.mav.send(self.info) + else: + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_DENIED + ) + + def handle_command_do_parachute(self, msg: mavlink.MAVLink_command_long_message) -> None: + """Handle MAVLink do parachute command""" + result = mavlink.MAV_RESULT_DENIED + if msg.param1 == mavlink.PARACHUTE_RELEASE: + if self.status.arm_status & mavlink.PARACHUTE_TRIGGER_FLAGS_FC: + if self.status.safety_status & mavlink.PARACHUTE_SAFETY_FLAGS_GROUND_CLEARED: + result = mavlink.MAV_RESULT_ACCEPTED + self.status.deployment_status = mavlink.PARACHUTE_DEPLOYMENT_TRIGGER_DRONE + else: + result = mavlink.MAV_RESULT_TEMPORARILY_REJECTED + else: + result = mavlink.MAV_RESULT_FAILED + self.mavlink_udp_connection.udpout.mav.command_ack_send(msg.command, result=result) + + def handle_command_set_parachute_arm(self, msg: mavlink.MAVLink_command_long_message) -> None: + """Handle MAVLink set parachute arm command""" + arm_flags = int(msg.param1) + bitmask = int(msg.param2) + valid_parachute_trigger_flags = ((mavlink.PARACHUTE_TRIGGER_FLAGS_ENUM_END - 1) << 1) - 1 + if arm_flags & ~valid_parachute_trigger_flags or bitmask & ~valid_parachute_trigger_flags: + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_DENIED + ) + else: + new_arm_status = self.status.arm_status & ~bitmask + new_arm_status |= arm_flags & bitmask + self.status.arm_status = new_arm_status + self.mavlink_udp_connection.udpout.mav.command_ack_send( + msg.command, result=mavlink.MAV_RESULT_ACCEPTED + ) + + +if __name__ == "__main__": + # Create parachute object and start listening for commands + parachute = Parachute( + dialect="common", + time_manufacture_s=int(time.time()), + vendor_name=b"Vendor", + model_name=b"Model", + software_version=b"1.2.3", + hardware_version=b"4.5.6", + serial_number=b"123456", + parachute_packed_date=datetime.date.today().strftime("%Y-%m-%d").encode(), + ) + parachute.listen_for_commands() diff --git a/assets/services/parachute/test_parachute.py b/assets/services/parachute/test_parachute.py new file mode 100644 index 000000000..da2171215 --- /dev/null +++ b/assets/services/parachute/test_parachute.py @@ -0,0 +1,299 @@ +import os +import unittest + +from pymavlink import mavutil +from pymavlink.dialects.v20 import common as mavlink + + +class UnittestMavlinkUDPConnection: + """Class to handle MAVLink UDP connections""" + + def __init__(self): + """Initialize UDP connections for unittests based on environment variables""" + dialect = os.getenv("MAVLINK_DIALECT", "common") + udpin = os.getenv("MAVLINK_UDPIN", "udpin:localhost:14541") + udpout = os.getenv("MAVLINK_UDPOUT", "udpout:localhost:14540") + self.udpin = mavutil.mavlink_connection(udpin, dialect=dialect) + self.udpout = mavutil.mavlink_connection(udpout, dialect=dialect) + + def close(self): + """Close UDP connections""" + self.udpin.close() + self.udpout.close() + + +class ParachuteMavlinkTestCaseBase(unittest.TestCase): + """Base class for parachute MAVLink test cases""" + + MSG_TIMEOUT = 3 + + @classmethod + def setUpClass(cls): + cls.parachute_connection = UnittestMavlinkUDPConnection() + + @classmethod + def tearDownClass(cls): + cls.parachute_connection.close() + + def encode_and_send_mavlink_command_long( + self, command: int, param1: float, param2: float = 0 + ) -> None: + """Encode and send a MAVLink COMMAND_LONG message""" + msg = self.parachute_connection.udpout.mav.command_long_encode( + self.parachute_connection.udpout.target_system, # Target System + self.parachute_connection.udpout.target_component, # Target Component + command, # Command + 0, # Confirmation + param1, # param1 + param2, # param2 + 0, # param3 (unused) + 0, # param4 (unused) + 0, # param5 (unused) + 0, # param6 (unused) + 0, # param7: Response Target - flight-stack default + ) + self.parachute_connection.udpout.mav.send(msg) + + def assert_mavlink_command_ack(self, command: int, result: int): + """Assert that a COMMAND_ACK message is recieved""" + ack = self.parachute_connection.udpin.recv_match( + type="COMMAND_ACK", blocking=True, timeout=self.MSG_TIMEOUT + ) + self.assertIsNotNone(ack, "Did not recieve a COMMAND_ACK message") + self.assertEqual( + ack.command, command, f"Did not recieve a COMMAND_ACK message for {command}" + ) + self.assertEqual(ack.result, result, f"Did not recieve a {result} COMMAND_ACK message") + + def receive_parachute_status_with_assert(self) -> mavlink.MAVLink_parachute_status_message: + """Assert that a PARACHUTE_STATUS message is recieved""" + msg = self.parachute_connection.udpin.recv_match( + type="PARACHUTE_STATUS", blocking=True, timeout=self.MSG_TIMEOUT + ) + self.assertIsNotNone(msg, "Did not recieve a PARACHUTE_STATUS message") + return msg + + +class ParachuteBasicMavlinkMessagesTestCase(ParachuteMavlinkTestCaseBase): + """Test cases for recieving basic MAVLink messages""" + + def test_heartbeat(self): + """Test recieving a MAVLink heartbeat message from parachute""" + msg = self.parachute_connection.udpin.recv_match( + type="HEARTBEAT", blocking=True, timeout=self.MSG_TIMEOUT + ) + self.assertIsNotNone(msg, "Did not recieve a HEARTBEAT message") + self.assertEqual( + msg.type, mavlink.MAV_TYPE_PARACHUTE, "HEARTBEAT message type is not parachute" + ) + + def test_component_information_basic_request(self): + """Test requesting and receiving a parachute information basic message""" + # Request component information basic message + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_REQUEST_MESSAGE, + mavlink.MAVLINK_MSG_ID_COMPONENT_INFORMATION_BASIC, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_REQUEST_MESSAGE, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.parachute_connection.udpin.recv_match( + type="COMPONENT_INFORMATION_BASIC", blocking=True, timeout=self.MSG_TIMEOUT + ) + self.assertIsNotNone(msg, "Did not recieve a COMPONENT_INFORMATION_BASIC message") + + def test_parachute_status(self): + """Test recieving a parachute status message""" + self.receive_parachute_status_with_assert() + + +class MavlinkSetParachuteArmTestCase(ParachuteMavlinkTestCaseBase): + """Test cases for arming parachute""" + + def setUp(self): + self.receive_parachute_status_with_assert() + + def tearDown(self): + # Disarm all triggers + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + 0, + ((mavlink.PARACHUTE_TRIGGER_FLAGS_ENUM_END - 1) << 1) - 1, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertFalse(msg.arm_status, f"Parachute arm status not disarmed: {msg}") + + def test_arm_ats(self): + """Test arming parachute with just ATS trigger""" + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status == mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + "Parachute is not armed with just ATS trigger", + ) + + def test_arm_fc(self): + """Test arming parachute with just FC trigger""" + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status == mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + "Parachute is not armed with just FC trigger", + ) + + def test_arm_ats_and_fc(self): + """Test arming parachute with both ATS and FC triggers at the same time""" + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS | mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS | mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status + == (mavlink.PARACHUTE_TRIGGER_FLAGS_ATS | mavlink.PARACHUTE_TRIGGER_FLAGS_FC), + "Parachute is not armed with both ATS and FC triggers", + ) + + def test_arm_ats_then_fc(self): + """Test arming parachute with ATS trigger and then FC trigger""" + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status == mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + "Parachute is not armed with just ATS trigger", + ) + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status + == (mavlink.PARACHUTE_TRIGGER_FLAGS_ATS | mavlink.PARACHUTE_TRIGGER_FLAGS_FC), + "Parachute is not armed with both ATS and FC triggers", + ) + + def test_arm_invalid_upper_bound(self): + """Test arming parachute with invalid upper bound trigger flag""" + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_ENUM_END << 1, + mavlink.PARACHUTE_TRIGGER_FLAGS_ENUM_END << 1, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_DENIED + ) + msg = self.receive_parachute_status_with_assert() + self.assertFalse(msg.arm_status, f"Upper bound trigger flag set in arm status: {msg}") + + +class MavlinkDeployParachuteTestCase(ParachuteMavlinkTestCaseBase): + """Test cases for deploying parachute""" + + def setUp(self): + msg = self.receive_parachute_status_with_assert() + self.assertFalse(msg.arm_status, f"Parachute arm status not disarmed: {msg}") + self.assertFalse(msg.deployment_status, f"Parachute already deployed: {msg}") + + def tearDown(self): + # Disarm all triggers + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + 0, + ((mavlink.PARACHUTE_TRIGGER_FLAGS_ENUM_END - 1) << 1) - 1, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertFalse(msg.arm_status, f"Parachute arm status not disarmed: {msg}") + + def test_deploy_parachute_with_ats(self): + """Test DO_PARACHUTE should not deploy parachute with ATS trigger""" + # Arm just the ATS trigger + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status == mavlink.PARACHUTE_TRIGGER_FLAGS_ATS, + "Parachute is not armed with just ATS trigger", + ) + # Deploy parachute + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_DO_PARACHUTE, + mavlink.PARACHUTE_RELEASE, + ) + self.assert_mavlink_command_ack(mavlink.MAV_CMD_DO_PARACHUTE, mavlink.MAV_RESULT_FAILED) + msg = self.receive_parachute_status_with_assert() + self.assertFalse(msg.deployment_status, f"Parachute deployed: {msg}") + + def test_deploy_parachute_with_fc(self): + """Test DO_PARACHUTE should deploy parachute with FC trigger armed""" + # Arm FC trigger + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + ) + self.assert_mavlink_command_ack( + mavlink.MAV_CMD_SET_PARACHUTE_ARM, mavlink.MAV_RESULT_ACCEPTED + ) + msg = self.receive_parachute_status_with_assert() + self.assertTrue( + msg.arm_status == mavlink.PARACHUTE_TRIGGER_FLAGS_FC, + "Parachute is not armed with just FC trigger", + ) + # Deploy parachute + self.encode_and_send_mavlink_command_long( + mavlink.MAV_CMD_DO_PARACHUTE, + mavlink.PARACHUTE_RELEASE, + ) + self.assert_mavlink_command_ack(mavlink.MAV_CMD_DO_PARACHUTE, mavlink.MAV_RESULT_ACCEPTED) + msg = self.receive_parachute_status_with_assert() + self.assertEqual( + msg.deployment_status, + mavlink.PARACHUTE_DEPLOYMENT_TRIGGER_DRONE, + f"Parachute did not deployed: {msg}", + ) + + +if __name__ == "__main__": + unittest.main()