diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py index dd52f9eac..82b9fdfae 100644 --- a/ros2topic/test/test_cli.py +++ b/ros2topic/test/test_cli.py @@ -30,7 +30,6 @@ import launch_testing.asserts import launch_testing.markers import launch_testing.tools -import launch_testing.tools.text import launch_testing_ros.tools import pytest @@ -189,7 +188,6 @@ def launch_topic_command(self, arguments): output_filter=rmw_implementation_filter ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_list_topics(self): with self.launch_topic_command(arguments=['list']) as topic_command: @@ -211,7 +209,6 @@ def test_list_topics(self): strict=True ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_list_all_topics(self): with self.launch_topic_command( @@ -236,7 +233,6 @@ def test_list_all_topics(self): strict=True ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_list_with_types(self): with self.launch_topic_command(arguments=['list', '-t']) as topic_command: @@ -258,7 +254,6 @@ def test_list_with_types(self): strict=True ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_list_count(self): with self.launch_topic_command(arguments=['list', '-c']) as topic_command: @@ -268,7 +263,6 @@ def test_list_count(self): assert len(output_lines) == 1 assert int(output_lines[0]) == 9 - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_endpoint_info(self): with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command: @@ -284,46 +278,33 @@ def test_topic_endpoint_info(self): strict=True ) - # @launch_testing.markers.retry_on_failure(times=5) + @launch_testing.markers.retry_on_failure(times=5) def test_topic_endpoint_info_verbose(self): - with self.launch_topic_command(arguments=['info', '-v', '/chatter']) as topic_command: assert topic_command.wait_for_shutdown(timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK - # print(topic_command.output) - expected_lines=[ - 'Type: std_msgs/msg/String', - '', - 'Publisher count: 1', - '', - re.compile(r'Node name: \w+'), - 'Node namespace: /', - 'Topic type: std_msgs/msg/String', - re.compile(r'Endpoint type: (INVALID|PUBLISHER|SUBSCRIPTION)'), - re.compile(r'GID: [\w\.]+'), - 'QoS profile:', - re.compile(r' Reliability: (RELIABLE|BEST_EFFORT|SYSTEM_DEFAULT|UNKNOWN)'), - re.compile(r' Durability: (VOLATILE|TRANSIENT_LOCAL|SYSTEM_DEFAULT|UNKNOWN)'), - ' Lifespan: Infinite', - ' Deadline: Infinite', - re.compile(r' Liveliness: (AUTOMATIC|MANUAL_BY_TOPIC|SYSTEM_DEFAULT|UNKNOWN)'), - ' Liveliness lease duration: Infinite', - '', - 'Subscription count: 0', - '' - ] - # actual_lines = topic_command.output.splitlines() - # for expected_line, actual_line in zip(expected_lines, actual_lines): - # if hasattr(expected_line, 'match'): - # assert expected_line.match(actual_line) - # else: - # assert expected_line == actual_line - # assert expected_line == actual_line - # match = launch_testing.tools.text.build_line_match(expected_line) - # assert match([actual_line], start=0) is not None - assert launch_testing.tools.expect_output( - expected_lines=expected_lines, + expected_lines=[ + 'Type: std_msgs/msg/String', + '', + 'Publisher count: 1', + '', + re.compile(r'Node name: \w+'), + 'Node namespace: /', + 'Topic type: std_msgs/msg/String', + re.compile(r'Endpoint type: (INVALID|PUBLISHER|SUBSCRIPTION)'), + re.compile(r'GID: [\w\.]+'), + 'QoS profile:', + re.compile(r' Reliability: (RELIABLE|BEST_EFFORT|SYSTEM_DEFAULT|UNKNOWN)'), + re.compile(r' Durability: (VOLATILE|TRANSIENT_LOCAL|SYSTEM_DEFAULT|UNKNOWN)'), + ' Lifespan: Infinite', + ' Deadline: Infinite', + re.compile(r' Liveliness: (AUTOMATIC|MANUAL_BY_TOPIC|SYSTEM_DEFAULT|UNKNOWN)'), + ' Liveliness lease duration: Infinite', + '', + 'Subscription count: 0', + '' + ], text=topic_command.output, strict=True ) @@ -339,7 +320,6 @@ def test_info_on_unknown_topic(self): strict=True ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_type(self): with self.launch_topic_command(arguments=['type', '/chatter']) as topic_command: @@ -351,7 +331,6 @@ def test_topic_type(self): strict=True ) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_hidden_topic_type(self): with self.launch_topic_command( @@ -361,7 +340,6 @@ def test_hidden_topic_type(self): assert topic_command.exit_code == 1 assert topic_command.output == '' - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_find_topic_type(self): with self.launch_topic_command( @@ -373,7 +351,6 @@ def test_find_topic_type(self): expected_lines=['/rosout'], text=topic_command.output, strict=True ) - @unittest.skip('poop') def test_find_not_a_topic_typename(self): with self.launch_topic_command( arguments=['find', 'rcl_interfaces/msg/NotAMessageTypeName'] @@ -382,7 +359,6 @@ def test_find_not_a_topic_typename(self): assert topic_command.exit_code == launch_testing.asserts.EXIT_OK assert not topic_command.output - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_echo(self): with self.launch_topic_command( @@ -396,7 +372,6 @@ def test_topic_echo(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_no_str_topic_echo(self): with self.launch_topic_command( @@ -410,7 +385,6 @@ def test_no_str_topic_echo(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_csv_topic_echo(self): with self.launch_topic_command( @@ -423,7 +397,6 @@ def test_csv_topic_echo(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_no_arr_topic_echo_on_array_message(self): with self.launch_topic_command( @@ -468,7 +441,6 @@ def test_no_arr_topic_echo_on_array_message(self): ), timeout=10), 'Output does not match: ' + topic_command.output assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_no_arr_topic_echo_on_seq_message(self): with self.launch_topic_command( @@ -513,7 +485,6 @@ def test_no_arr_topic_echo_on_seq_message(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_no_arr_topic_echo_on_bounded_seq_message(self): with self.launch_topic_command( @@ -559,7 +530,6 @@ def test_no_arr_topic_echo_on_bounded_seq_message(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_truncate_length_topic_echo(self): with self.launch_topic_command( @@ -573,7 +543,6 @@ def test_truncate_length_topic_echo(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_echo_field(self): with self.launch_topic_command( @@ -587,7 +556,6 @@ def test_topic_echo_field(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_echo_field_nested(self): with self.launch_topic_command( @@ -603,7 +571,6 @@ def test_topic_echo_field_nested(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_echo_field_not_a_member(self): with self.launch_topic_command( @@ -616,7 +583,6 @@ def test_topic_echo_field_not_a_member(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') def test_topic_echo_field_invalid(self): with self.launch_topic_command( arguments=['echo', '/arrays', '--field', '/'], @@ -638,7 +604,6 @@ def test_topic_echo_field_invalid(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') def test_topic_echo_no_publisher(self): with self.launch_topic_command( arguments=['echo', '/this_topic_has_no_pub'], @@ -653,7 +618,6 @@ def test_topic_echo_no_publisher(self): strict=True ) - @unittest.skip('poop') def test_topic_pub(self): with self.launch_topic_command( arguments=[ @@ -680,7 +644,6 @@ def test_topic_pub(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') def test_topic_pub_once(self): with self.launch_topic_command( arguments=[ @@ -708,7 +671,6 @@ def test_topic_pub_once(self): ), timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK - @unittest.skip('poop') def test_topic_pub_print_every_two(self): with self.launch_topic_command( arguments=[ @@ -738,7 +700,6 @@ def test_topic_pub_print_every_two(self): ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_delay(self): average_delay_line_pattern = re.compile(r'average delay: (\d+.\d{3})') @@ -757,7 +718,6 @@ def test_topic_delay(self): average_delay = float(average_delay_line_pattern.match(head_line).group(1)) assert math.isclose(average_delay, 0.0, abs_tol=10e-3) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_hz(self): average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') @@ -776,7 +736,6 @@ def test_topic_hz(self): average_rate = float(average_rate_line_pattern.match(head_line).group(1)) assert math.isclose(average_rate, 1., rel_tol=1e-2) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_filtered_topic_hz(self): average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') @@ -802,7 +761,6 @@ def test_filtered_topic_hz(self): average_rate = float(average_rate_line_pattern.match(head_line).group(1)) assert math.isclose(average_rate, 0.5, rel_tol=1e-2) - @unittest.skip('poop') @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_topic_bw(self): with self.launch_topic_command(arguments=['bw', '/defaults']) as topic_command: