diff --git a/board/safety/safety_volkswagen_common.h b/board/safety/safety_volkswagen_common.h index ce2bf580fe..eb2938f1b1 100644 --- a/board/safety/safety_volkswagen_common.h +++ b/board/safety/safety_volkswagen_common.h @@ -2,6 +2,7 @@ #define SAFETY_VOLKSWAGEN_COMMON_H const uint16_t FLAG_VOLKSWAGEN_LONG_CONTROL = 1; +const uint16_t FLAG_VW_GAS_INTERCEPTOR = 16; bool volkswagen_longitudinal = false; bool volkswagen_set_button_prev = false; diff --git a/board/safety/safety_volkswagen_pq.h b/board/safety/safety_volkswagen_pq.h index 9e6c59f09f..2a29c2eed2 100644 --- a/board/safety/safety_volkswagen_pq.h +++ b/board/safety/safety_volkswagen_pq.h @@ -19,6 +19,8 @@ const LongitudinalLimits VOLKSWAGEN_PQ_LONG_LIMITS = { .min_accel = -3500, .inactive_accel = 3010, // VW sends one increment above the max range when inactive }; +const int VOLKSWAGEN_GAS_INTERCEPTOR_THRSLD = 475; +#define VOLKSWAGEN_GET_INTERCEPTOR(msg) (((GET_BYTE((msg), 0) << 8) + GET_BYTE((msg), 1) + (GET_BYTE((msg), 2) << 8) + GET_BYTE((msg), 3)) / 2U) // avg between 2 tracks #define MSG_LENKHILFE_3 0x0D0 // RX from EPS, for steering angle and driver steering torque #define MSG_HCA_1 0x0D2 // TX by OP, Heading Control Assist steering torque @@ -30,12 +32,14 @@ const LongitudinalLimits VOLKSWAGEN_PQ_LONG_LIMITS = { #define MSG_MOTOR_5 0x480 // RX from ECU, for ACC main switch state #define MSG_ACC_GRA_ANZEIGE 0x56A // TX by OP, ACC HUD #define MSG_LDW_1 0x5BE // TX by OP, Lane line recognition and text alerts +#define MSG_GAS_1 0x200 // TX by OP, Gas Control +#define MSG_GAS_SENSOR 0x201 // RX by OP, GAS Sensor // Transmit of GRA_Neu is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration const CanMsg VOLKSWAGEN_PQ_STOCK_TX_MSGS[] = {{MSG_HCA_1, 0, 5}, {MSG_LDW_1, 0, 8}, {MSG_GRA_NEU, 0, 4}, {MSG_GRA_NEU, 2, 4}}; const CanMsg VOLKSWAGEN_PQ_LONG_TX_MSGS[] = {{MSG_HCA_1, 0, 5}, {MSG_LDW_1, 0, 8}, - {MSG_ACC_SYSTEM, 0, 8}, {MSG_ACC_GRA_ANZEIGE, 0, 8}}; + {MSG_ACC_SYSTEM, 0, 8}, {MSG_ACC_GRA_ANZEIGE, 0, 8}, {MSG_GAS_1, 0, 6}}; RxCheck volkswagen_pq_rx_checks[] = { {.msg = {{MSG_LENKHILFE_3, 0, 6, .check_checksum = true, .max_counter = 15U, .frequency = 100U}, { 0 }, { 0 }}}, @@ -44,8 +48,10 @@ RxCheck volkswagen_pq_rx_checks[] = { {.msg = {{MSG_MOTOR_3, 0, 8, .check_checksum = false, .max_counter = 0U, .frequency = 100U}, { 0 }, { 0 }}}, {.msg = {{MSG_MOTOR_5, 0, 8, .check_checksum = true, .max_counter = 0U, .frequency = 50U}, { 0 }, { 0 }}}, {.msg = {{MSG_GRA_NEU, 0, 4, .check_checksum = true, .max_counter = 15U, .frequency = 30U}, { 0 }, { 0 }}}, + {.msg = {{MSG_GAS_SENSOR, 0, 6, .check_checksum = false, .max_counter = 15U, .frequency = 50U}, { 0 }, { 0 }}}, }; + static uint32_t volkswagen_pq_get_checksum(const CANPacket_t *to_push) { int addr = GET_ADDR(to_push); @@ -60,6 +66,8 @@ static uint8_t volkswagen_pq_get_counter(const CANPacket_t *to_push) { counter = (uint8_t)(GET_BYTE(to_push, 1) & 0xF0U) >> 4; } else if (addr == MSG_GRA_NEU) { counter = (uint8_t)(GET_BYTE(to_push, 2) & 0xF0U) >> 4; + } else if (addr == MSG_GAS_SENSOR) { + counter = GET_BYTE(to_push, 4) & 0x0FU; } else { } @@ -89,6 +97,7 @@ static safety_config volkswagen_pq_init(uint16_t param) { volkswagen_resume_button_prev = false; #ifdef ALLOW_DEBUG + enable_gas_interceptor = GET_FLAG(param, FLAG_VW_GAS_INTERCEPTOR); volkswagen_longitudinal = GET_FLAG(param, FLAG_VOLKSWAGEN_LONG_CONTROL); #endif return volkswagen_longitudinal ? BUILD_SAFETY_CFG(volkswagen_pq_rx_checks, VOLKSWAGEN_PQ_LONG_TX_MSGS) : \ @@ -153,9 +162,15 @@ static void volkswagen_pq_rx_hook(const CANPacket_t *to_push) { pcm_cruise_check(cruise_engaged); } } + // Signal: GAS_INTERCEPT_SENSOR + if (((addr == MSG_GAS_SENSOR) && enable_gas_interceptor)) { + int gas_interceptor = VOLKSWAGEN_GET_INTERCEPTOR(to_push); + gas_pressed = gas_interceptor > VOLKSWAGEN_GAS_INTERCEPTOR_THRSLD; + gas_interceptor_prev = gas_interceptor; + } // Signal: Motor_3.Fahrpedal_Rohsignal - if (addr == MSG_MOTOR_3) { + if ((addr == MSG_MOTOR_3)&& !enable_gas_interceptor) { gas_pressed = (GET_BYTE(to_push, 2)); } @@ -201,7 +216,12 @@ static bool volkswagen_pq_tx_hook(const CANPacket_t *to_send) { tx = false; } } - + // GAS: safety check (interceptor) + if (addr == MSG_GAS_1) { + if (longitudinal_interceptor_checks(to_send)) { + tx = false; + } + } // FORCE CANCEL: ensuring that only the cancel button press is sent when controls are off. // This avoids unintended engagements while still allowing resume spam if (addr == MSG_GRA_NEU) { diff --git a/python/__init__.py b/python/__init__.py index 2870f66be4..25d8410b47 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -222,6 +222,7 @@ class Panda: FLAG_TESLA_RAVEN = 4 FLAG_VOLKSWAGEN_LONG_CONTROL = 1 + FLAG_VW_GAS_INTERCEPTOR = 16 FLAG_CHRYSLER_RAM_DT = 1 FLAG_CHRYSLER_RAM_HD = 2 diff --git a/tests/safety/test_volkswagen_pq.py b/tests/safety/test_volkswagen_pq.py index f2bc317868..f683fd0a80 100755 --- a/tests/safety/test_volkswagen_pq.py +++ b/tests/safety/test_volkswagen_pq.py @@ -15,6 +15,8 @@ MSG_MOTOR_5 = 0x480 # RX from ECU, for ACC main switch state MSG_ACC_GRA_ANZEIGE = 0x56A # TX by OP, ACC HUD MSG_LDW_1 = 0x5BE # TX by OP, Lane line recognition and text alerts +MSG_GAS_1 = 0x200 # TX by OP, Gas Control +MSG_GAS_SENSOR = 0x201 # RX by OP, GAS Sensor class TestVolkswagenPqSafety(common.PandaCarSafetyTest, common.DriverTorqueSteeringSafetyTest): @@ -115,7 +117,30 @@ def test_torque_measurements(self): self.assertEqual(0, self.safety.get_torque_driver_max()) self.assertEqual(0, self.safety.get_torque_driver_min()) +class TestVolkswagenPqGasInterceptorBase(common.GasInterceptorSafetyTest, TestVolkswagenPqSafety): + TX_MSGS = [[MSG_GAS_1, 0], [MSG_GAS_SENSOR, 0]] + INTERCEPTOR_THRESHOLD = 475 + + def setUp(self): + super().setUp() + self.safety.set_safety_hooks(Panda.SAFETY_VOLKSWAGEN_PQ, self.safety.get_current_safety_param() | + Panda.FLAG_VW_GAS_INTERCEPTOR) + self.safety.init_tests() + + def test_stock_longitudinal(self): + # If stock longitudinal is set, the gas interceptor safety param should not be respected + self.safety.set_safety_hooks(Panda.SAFETY_VOLKSWAGEN_PQ, self.safety.get_current_safety_param() | + Panda.FLAG_VOLKSWAGEN_LONG_CONTROL) + self.safety.init_tests() + + # Spot check a few gas interceptor tests: (1) reading interceptor, + # (2) behavior around interceptor, and (3) txing interceptor msgs + for test in (self.test_prev_gas_interceptor, self.test_disengage_on_gas_interceptor, + self.test_gas_interceptor_safety_check): + with self.subTest(test=test.__name__): + with self.assertRaises(AssertionError): + test() class TestVolkswagenPqStockSafety(TestVolkswagenPqSafety): # Transmit of GRA_Neu is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration TX_MSGS = [[MSG_HCA_1, 0], [MSG_GRA_NEU, 0], [MSG_GRA_NEU, 2], [MSG_LDW_1, 0]] @@ -137,7 +162,8 @@ def test_spam_cancel_safety_check(self): self.safety.set_controls_allowed(1) self.assertTrue(self._tx(self._button_msg(resume=True))) - +class TestVolkswagenPqStockSafetyGasInterceptor(TestVolkswagenPqGasInterceptorBase, TestVolkswagenPqStockSafety): + pass class TestVolkswagenPqLongSafety(TestVolkswagenPqSafety, common.LongitudinalAccelSafetyTest): TX_MSGS = [[MSG_HCA_1, 0], [MSG_LDW_1, 0], [MSG_ACC_SYSTEM, 0], [MSG_ACC_GRA_ANZEIGE, 0]] FWD_BLACKLISTED_ADDRS = {2: [MSG_HCA_1, MSG_LDW_1, MSG_ACC_SYSTEM, MSG_ACC_GRA_ANZEIGE]} @@ -195,5 +221,8 @@ def test_torque_cmd_enable_variants(self): self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP, steer_req=1, hca_status=enabled_status)), f"torque cmd rejected with {enabled_status=}") +class TestVolkswagenPqLongSafetyGasInterceptor(TestVolkswagenPqGasInterceptorBase, TestVolkswagenPqLongSafety): + pass + if __name__ == "__main__": unittest.main()