-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: publish top level msg when error is received (#317)
- Loading branch information
1 parent
24fb79a
commit 9e44dd8
Showing
4 changed files
with
145 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import unittest | ||
|
||
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus | ||
from launch import LaunchDescription | ||
from launch_ros.actions import Node | ||
from launch_testing.actions import ReadyToTest | ||
import pytest | ||
import rclpy | ||
|
||
|
||
@pytest.mark.launch_test | ||
def generate_test_description(): | ||
# Launch the aggregator | ||
parameters = [{'analyzers.test.type': 'diagnostic_aggregator/GenericAnalyzer'}, | ||
{'analyzers.test.path': 'Test'}, | ||
{'analyzers.test.contains': ['test']}, | ||
{'critical': True}] | ||
|
||
aggregator_cmd = Node( | ||
package='diagnostic_aggregator', | ||
executable='aggregator_node', | ||
name='diagnostic_aggregator', | ||
parameters=parameters) | ||
|
||
ld = LaunchDescription() | ||
|
||
# Launch the node | ||
ld.add_action(aggregator_cmd) | ||
ld.add_action(ReadyToTest()) | ||
return ld | ||
|
||
|
||
class TestProcessOutput(unittest.TestCase): | ||
|
||
@ classmethod | ||
def setUpClass(cls): | ||
# Initialize the ROS context for the test node | ||
rclpy.init() | ||
|
||
@ classmethod | ||
def tearDownClass(cls): | ||
# Shutdown the ROS context | ||
rclpy.shutdown() | ||
|
||
def setUp(self): | ||
# Create a ROS node for tests | ||
self.node = rclpy.create_node('test_node') | ||
self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1) | ||
self.subscriber = self.node.create_subscription( | ||
DiagnosticStatus, | ||
'/diagnostics_toplevel_state', | ||
lambda msg: self.received_state.append(msg.level), | ||
1) | ||
self.received_state = [] | ||
|
||
def tearDown(self): | ||
self.node.destroy_node() | ||
|
||
def publish_message(self, level): | ||
msg = DiagnosticArray() | ||
msg.status.append(DiagnosticStatus()) | ||
msg.status[0].level = level | ||
msg.status[0].name = 'test status' | ||
while len(self.received_state) == 0: | ||
self.publisher.publish(msg) | ||
rclpy.spin_once(self.node) | ||
|
||
return self.node.get_clock().now() | ||
|
||
def test_critical_publisher(self): | ||
# Publish the ok message and wait till the toplevel state is received | ||
state = DiagnosticStatus.OK | ||
time_0 = self.publish_message(state) | ||
|
||
assert (self.received_state[0] == state), \ | ||
'Received state is not the same as the sent state' | ||
self.received_state.clear() | ||
|
||
# Publish the ok message and expect the toplevel state after 1 second period | ||
time_1 = self.publish_message(state) | ||
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \ | ||
'OK message received too early' | ||
assert (self.received_state[0] == state), \ | ||
'Received state is not the same as the sent state' | ||
self.received_state.clear() | ||
|
||
# Publish the message and expect the critical error message immediately | ||
state = DiagnosticStatus.ERROR | ||
time_2 = self.publish_message(state) | ||
|
||
assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \ | ||
'Critical error message not received within 0.1 second' | ||
assert (self.received_state[0] == state), \ | ||
'Received state is not the same as the sent state' | ||
self.received_state.clear() | ||
|
||
# Next error message should be sent at standard 1 second rate | ||
time_3 = self.publish_message(state) | ||
|
||
assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \ | ||
'Periodic error message received too early' | ||
assert (self.received_state[0] == state), \ | ||
'Received state is not the same as the sent state' |