Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete spinner in Introspection tools and fix endoding #3

Open
wants to merge 7 commits into
base: ros2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 10 additions & 29 deletions smach_ros/smach_ros/introspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import rclpy
from rclpy.node import Node
from rclpy.clock import ROSClock
from rclpy.executors import SingleThreadedExecutor
from std_msgs.msg import Header

import base64
import time
import pickle
import threading
from functools import partial
Expand All @@ -24,14 +24,6 @@
class IntrospectionClient(Node):
def __init__(self, node_name='introspection_client', **kwargs):
Node.__init__(self, node_name, **kwargs)
self._executor = SingleThreadedExecutor()
self._executor.add_node(self)
self._spinner = threading.Thread(target=self._executor.spin)
self._spinner.start()

def __del__(self):
self._executor.shutdown()
self._spinner.join()

def get_servers(self):
"""Get the base names that are broadcasting smach states."""
Expand Down Expand Up @@ -73,7 +65,7 @@ def set_initial_state(self,
initial_status_msg = SmachContainerInitialStatusCmd(
path = path,
initial_states = initial_states,
local_data = bytearray(pickle.dumps(initial_userdata._data, 2)))
local_data = base64.b64encode(pickle.dumps(initial_userdata._data, 2)))

# A status message to receive confirmation that the state was set properly
msg_response = SmachContainerStatus()
Expand All @@ -100,13 +92,11 @@ def local_cb(msg_response, msg):
1)
init_pub.publish(initial_status_msg)

clock = ROSClock()
rate = self.create_rate(4, clock)
start_time = clock.now()
start_time = self.get_clock().now()

# Block until we get a new state back
if timeout is not None:
while clock.now() - start_time < timeout:
while self.get_clock().now() - start_time < timeout:
# Send the initial state command
init_pub.publish(initial_status_msg)

Expand All @@ -124,7 +114,7 @@ def local_cb(msg_response, msg):

if state_match and ud_match:
return True
rate.sleep()
time.sleep(0.25)
return False

class ContainerProxy():
Expand Down Expand Up @@ -216,15 +206,15 @@ def _publish_status(self, info_str=''):
with self._status_pub_lock:
path = self._path

#print str(structure_msg)
# Construct status message
#print self._container.get_active_states()
# Transform userdata to dictionary for pickling
keys = list(self._container.userdata.keys())
data = {key: self._container.userdata[key] for key in keys}
state_msg = SmachContainerStatus(
header=Header(stamp = ROSClock().now().to_msg()),
path=path,
initial_states=self._container.get_initial_states(),
active_states=self._container.get_active_states(),
local_data=bytearray(pickle.dumps(self._container.userdata._data, 2)),
local_data=base64.b64encode(pickle.dumps(data, 2)),
info=info_str)
# Publish message
self._status_pub.publish(state_msg)
Expand All @@ -249,7 +239,7 @@ def _init_cmd_cb(self, msg):
if msg.path == self._path:
if all(s in self._container.get_children() for s in initial_states):
ud = smach.UserData()
ud._data = pickle.loads(msg.local_data)
ud._data = pickle.loads(base64.b64decode(msg.local_data))
self._server_node.get_logger().debug("Setting initial state in smach path: '"+self._path+"' to '"+str(initial_states)+"' with userdata: "+str(ud._data))

# Set the initial state
Expand Down Expand Up @@ -277,15 +267,6 @@ def __init__(self, server_name, state, path):
self._state = state
self._path = path

self._executor = SingleThreadedExecutor()
self._executor.add_node(self)
self._spinner = threading.Thread(target=self._executor.spin)
self._spinner.start()

def __del__(self):
self._executor.shutdown()
self._spinner.join()

def start(self):
# Construct proxies
self.construct(self._server_name, self._state, self._path)
Expand Down
30 changes: 17 additions & 13 deletions smach_ros/test/introspection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

import unittest

from smach import *
from smach_ros import *

from smach_msgs.msg import *
from smach import StateMachine, UserData
from smach_ros import RosState, IntrospectionServer, IntrospectionClient

### Custom state classe
class Setter(RosState):
Expand Down Expand Up @@ -41,8 +39,6 @@ def test_introspection(self):
node = rclpy.create_node("sm_node")
node.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG)
executor = SingleThreadedExecutor()
def spin():
rclpy.spin(node, executor=executor)

# Construct state machine
sm = StateMachine(['done'])
Expand All @@ -67,13 +63,22 @@ def spin():
# Run introspector
intro_server = IntrospectionServer('intro_test', sm, '/intro_test')
intro_server.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG)
server_thread = threading.Thread(target=intro_server.start)
server_thread.start()
intro_server.start()

intro_client = IntrospectionClient()
intro_client.get_logger().set_level(rclpy.logging.LoggingSeverity.DEBUG)
servers = intro_client.get_servers()

executor.add_node(node)
executor.add_node(intro_server)
executor.add_node(intro_client)

def spin(executor):
executor.spin()

spinner = threading.Thread(target=spin, args=(executor,))
spinner.start()

rate = intro_client.create_rate(10)
while '/intro_test' not in servers and rclpy.ok():
servers = intro_client.get_servers()
Expand All @@ -93,9 +98,6 @@ def spin():

assert init_set

spinner = threading.Thread(target=spin)
spinner.start()

outcome = sm.execute()

assert outcome == 'done'
Expand All @@ -106,9 +108,11 @@ def spin():
node.get_logger().info("Client destroyed")
intro_server.destroy_node()
node.get_logger().info("Server destroyed")
#executor.shutdown()
#spinner.join()

executor.shutdown()
node.destroy_node()
spinner.join()


def main():
rclpy.init()
Expand Down
Loading