diff --git a/smach_ros/smach_ros/introspection.py b/smach_ros/smach_ros/introspection.py index 038d645..f44a51f 100644 --- a/smach_ros/smach_ros/introspection.py +++ b/smach_ros/smach_ros/introspection.py @@ -2,10 +2,10 @@ import rclpy from rclpy.node import Node from rclpy.clock import ROSClock -from rclpy.executors import SingleThreadedExecutor from std_msgs.msg import Header import base64 +import time import pickle import threading from functools import partial @@ -24,14 +24,6 @@ class IntrospectionClient(Node): def __init__(self, node_name='introspection_client', **kwargs): Node.__init__(self, node_name, **kwargs) - self._executor = SingleThreadedExecutor() - self._executor.add_node(self) - self._spinner = threading.Thread(target=self._executor.spin) - self._spinner.start() - - def __del__(self): - self._executor.shutdown() - self._spinner.join() def get_servers(self): """Get the base names that are broadcasting smach states.""" @@ -73,7 +65,7 @@ def set_initial_state(self, initial_status_msg = SmachContainerInitialStatusCmd( path = path, initial_states = initial_states, - local_data = bytearray(pickle.dumps(initial_userdata._data, 2))) + local_data = base64.b64encode(pickle.dumps(initial_userdata._data, 2))) # A status message to receive confirmation that the state was set properly msg_response = SmachContainerStatus() @@ -100,13 +92,11 @@ def local_cb(msg_response, msg): 1) init_pub.publish(initial_status_msg) - clock = ROSClock() - rate = self.create_rate(4, clock) - start_time = clock.now() + start_time = self.get_clock().now() # Block until we get a new state back if timeout is not None: - while clock.now() - start_time < timeout: + while self.get_clock().now() - start_time < timeout: # Send the initial state command init_pub.publish(initial_status_msg) @@ -124,7 +114,7 @@ def local_cb(msg_response, msg): if state_match and ud_match: return True - rate.sleep() + time.sleep(0.25) return False class ContainerProxy(): @@ -216,15 +206,15 @@ def _publish_status(self, info_str=''): with self._status_pub_lock: path = self._path - #print str(structure_msg) - # Construct status message - #print self._container.get_active_states() + # Transform userdata to dictionary for pickling + keys = list(self._container.userdata.keys()) + data = {key: self._container.userdata[key] for key in keys} state_msg = SmachContainerStatus( header=Header(stamp = ROSClock().now().to_msg()), path=path, initial_states=self._container.get_initial_states(), active_states=self._container.get_active_states(), - local_data=bytearray(pickle.dumps(self._container.userdata._data, 2)), + local_data=base64.b64encode(pickle.dumps(data, 2)), info=info_str) # Publish message self._status_pub.publish(state_msg) @@ -249,7 +239,7 @@ def _init_cmd_cb(self, msg): if msg.path == self._path: if all(s in self._container.get_children() for s in initial_states): ud = smach.UserData() - ud._data = pickle.loads(msg.local_data) + ud._data = pickle.loads(base64.b64decode(msg.local_data)) self._server_node.get_logger().debug("Setting initial state in smach path: '"+self._path+"' to '"+str(initial_states)+"' with userdata: "+str(ud._data)) # Set the initial state @@ -277,15 +267,6 @@ def __init__(self, server_name, state, path): self._state = state self._path = path - self._executor = SingleThreadedExecutor() - self._executor.add_node(self) - self._spinner = threading.Thread(target=self._executor.spin) - self._spinner.start() - - def __del__(self): - self._executor.shutdown() - self._spinner.join() - def start(self): # Construct proxies self.construct(self._server_name, self._state, self._path) diff --git a/smach_ros/test/introspection_test.py b/smach_ros/test/introspection_test.py index 7acc7ee..b9db743 100755 --- a/smach_ros/test/introspection_test.py +++ b/smach_ros/test/introspection_test.py @@ -6,10 +6,8 @@ import unittest -from smach import * -from smach_ros import * - -from smach_msgs.msg import * +from smach import StateMachine, UserData +from smach_ros import RosState, IntrospectionServer, IntrospectionClient ### Custom state classe class Setter(RosState): @@ -41,8 +39,6 @@ def test_introspection(self): node = rclpy.create_node("sm_node") node.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG) executor = SingleThreadedExecutor() - def spin(): - rclpy.spin(node, executor=executor) # Construct state machine sm = StateMachine(['done']) @@ -67,13 +63,22 @@ def spin(): # Run introspector intro_server = IntrospectionServer('intro_test', sm, '/intro_test') intro_server.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG) - server_thread = threading.Thread(target=intro_server.start) - server_thread.start() + intro_server.start() intro_client = IntrospectionClient() intro_client.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG) servers = intro_client.get_servers() + executor.add_node(node) + executor.add_node(intro_server) + executor.add_node(intro_client) + + def spin(executor): + executor.spin() + + spinner = threading.Thread(target=spin, args=(executor,)) + spinner.start() + rate = intro_client.create_rate(10) while '/intro_test' not in servers and rclpy.ok(): servers = intro_client.get_servers() @@ -93,9 +98,6 @@ def spin(): assert init_set - spinner = threading.Thread(target=spin) - spinner.start() - outcome = sm.execute() assert outcome == 'done' @@ -106,9 +108,11 @@ def spin(): node.get_logger().info("Client destroyed") intro_server.destroy_node() node.get_logger().info("Server destroyed") - #executor.shutdown() - #spinner.join() + + executor.shutdown() node.destroy_node() + spinner.join() + def main(): rclpy.init()