-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
thread leaks when using rosrust::api::Ros::new #202
Comments
Got it working with the first suggestion from this stackoverflow post. I also fixed a TODO in the source (see below), which probably saves a bit CPU as well.
Patch for rosrust: diff --git a/rosrust/Cargo.toml b/rosrust/Cargo.toml
index eb9f3ed..6214cd0 100644
--- a/rosrust/Cargo.toml
+++ b/rosrust/Cargo.toml
@@ -18,11 +18,12 @@ hostname = "0.3.1"
regex = "1.5.4"
serde = "1.0.127"
serde_derive = "1.0.127"
-xml-rpc = "0.0.12"
+xml-rpc = {path = "../../xml-rpc-rs"}
yaml-rust = "0.4.5"
crossbeam = "0.8.1"
socket2 = "0.4.1"
colored = "2.0.0"
+libc = "0.2"
[dependencies.ros_message]
path = "../ros_message"
diff --git a/rosrust/src/api/ros.rs b/rosrust/src/api/ros.rs
index a229efd..2f0e630 100644
--- a/rosrust/src/api/ros.rs
+++ b/rosrust/src/api/ros.rs
@@ -115,30 +115,45 @@ impl Ros {
}));
let param_cache = Arc::new(Mutex::new(Default::default()));
- let slave = Slave::new(
- master_uri,
- hostname,
- bind_host,
- 0,
- &name,
- Arc::clone(¶m_cache),
- Arc::clone(&shutdown_manager),
- )?;
- let master = Master::new(master_uri, &name, slave.uri())?;
-
- Ok(Ros {
- master: Arc::new(master),
- slave: Arc::new(slave),
- param_cache,
- hostname: String::from(hostname),
- bind_address: String::from(bind_host),
- resolver,
- name,
- clock: Arc::new(RealClock::default()),
- static_subs: Vec::new(),
- logger,
- shutdown_manager,
- })
+
+ let gen_slave_and_master = {
+ let name = name.clone();
+ let shutdown_manager = shutdown_manager.clone();
+ let logger = logger.clone();
+ let param_cache = param_cache.clone();
+ move || {
+ let slave = Slave::new(
+ master_uri,
+ hostname,
+ bind_host,
+ 0,
+ &name,
+ Arc::clone(¶m_cache),
+ Arc::clone(&shutdown_manager),
+ )?;
+ let master = Master::new(master_uri, &name, slave.uri())?;
+ Ok((slave, master))
+ }};
+
+ match gen_slave_and_master() {
+ Ok((slave, master)) => Ok(Ros {
+ master: Arc::new(master),
+ slave: Arc::new(slave),
+ param_cache,
+ hostname: String::from(hostname),
+ bind_address: String::from(bind_host),
+ resolver,
+ name,
+ clock: Arc::new(RealClock::default()),
+ static_subs: Vec::new(),
+ logger,
+ shutdown_manager,
+ }),
+ Err(e) => {
+ shutdown_manager.shutdown();
+ Err(e)
+ }
+ }
}
fn map(&mut self, source: &str, destination: &str) -> Result<()> {
diff --git a/rosrust/src/api/slave/mod.rs b/rosrust/src/api/slave/mod.rs
index d99c3ea..b8ed1f3 100644
--- a/rosrust/src/api/slave/mod.rs
+++ b/rosrust/src/api/slave/mod.rs
@@ -63,9 +63,7 @@ impl Slave {
Ok(_) | Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {}
}
- bound_handler.poll();
- // TODO: use a timed out poll once rouille provides it
- std::thread::sleep(std::time::Duration::from_millis(5));
+ bound_handler.poll_timeout(std::time::Duration::from_millis(500));
}
shutdown_manager.shutdown();
});
@@ -180,3 +178,10 @@ impl Slave {
self.subscriptions.publisher_uris(topic)
}
}
+
+
+impl Drop for Slave {
+ fn drop(&mut self) {
+ self.shutdown_tx.send().unwrap();
+ }
+}
diff --git a/rosrust/src/tcpros/publisher.rs b/rosrust/src/tcpros/publisher.rs
index ef6c842..754de91 100644
--- a/rosrust/src/tcpros/publisher.rs
+++ b/rosrust/src/tcpros/publisher.rs
@@ -10,6 +10,7 @@ use log::error;
use std::collections::HashMap;
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::sync::{atomic, Arc, Mutex};
+use std::os::fd::AsRawFd;
pub struct Publisher {
subscriptions: DataStream,
@@ -18,11 +19,16 @@ pub struct Publisher {
last_message: Arc<Mutex<Arc<Vec<u8>>>>,
queue_size: usize,
exists: Arc<atomic::AtomicBool>,
+ raw_fd : i32,
+ join_handle: Option<std::thread::JoinHandle<()>>
}
impl Drop for Publisher {
fn drop(&mut self) {
+ println!("dropping publisher for {}", &self.topic.name);
self.exists.store(false, atomic::Ordering::SeqCst);
+ unsafe { libc::shutdown(self.raw_fd, libc::SHUT_RD); }
+ self.join_handle.take().unwrap().join().unwrap();
}
}
@@ -138,6 +144,7 @@ impl Publisher {
U: ToSocketAddrs,
{
let listener = TcpListener::bind(address)?;
+ let raw_fd = listener.as_raw_fd();
let socket_address = listener.local_addr()?;
let publisher_exists = Arc::new(atomic::AtomicBool::new(true));
@@ -168,7 +175,7 @@ impl Publisher {
}
};
- tcpconnection::iterate(listener, format!("topic '{}'", topic), iterate_handler);
+ let join_handle = Some(tcpconnection::iterate(listener, format!("topic '{}'", topic), iterate_handler));
let topic = Topic {
name: String::from(topic),
@@ -183,6 +190,8 @@ impl Publisher {
last_message,
queue_size,
exists: publisher_exists,
+ raw_fd,
+ join_handle
})
}
diff --git a/rosrust/src/tcpros/util/tcpconnection.rs b/rosrust/src/tcpros/util/tcpconnection.rs
index 9cc56d4..a8ff41b 100644
--- a/rosrust/src/tcpros/util/tcpconnection.rs
+++ b/rosrust/src/tcpros/util/tcpconnection.rs
@@ -8,11 +8,11 @@ pub enum Feedback {
StopAccepting,
}
-pub fn iterate<F>(listener: TcpListener, tag: String, handler: F)
+pub fn iterate<F>(listener: TcpListener, tag: String, handler: F) -> thread::JoinHandle<()>
where
F: Fn(TcpStream) -> Feedback + Send + 'static,
{
- thread::spawn(move || listener_thread(&listener, &tag, handler));
+ thread::spawn(move || listener_thread(&listener, &tag, handler))
}
fn listener_thread<F>(connections: &TcpListener, tag: &str, handler: F)
@@ -26,7 +26,12 @@ where
Feedback::StopAccepting => break,
},
Err(err) => {
- error!("TCP connection failed at {}: {}", tag, err);
+ if err.kind() == std::io::ErrorKind::InvalidInput {
+ println!("got EINVAL, quitting listener thread");
+ return;
+ }
+ error!("TCP connection failed at {}: {}", & tag, &err);
+ println!("listener_thread ({}) error: {}", &tag, &err);
}
}
} Patch for xml-rpc-rs:
|
Hi,
I'm writing a watchdog type of thing, which waits for roscore to come online, offers a few services while a roscore is running, keeps querying the
run_id
parameter periodically to notice ROS restarts/shutdown.I do the "waiting for roscore" step by calling
rosrust::api::Ros::new
in a loop until it succeeds. Unfortunately, this spawns lots of threads that never terminate.I managed to eliminate most of these by implementing
Drop
forSlave
…… and patching
xml-rpc-rs
to join the server ondrop
.(which unfortunately breaks the binaries in that crate)
The last remaining leaking thread comes from
tcpconnection
, which is spawned for the/rosout
topic inRos::new
. Due to the interface ofTcpListener
from std, I didn't see an obvious way to make this cancellable.I'd appreciate any hints on how to implement proper cleanup for
Ros
(including the case when the constructor fails), to make it feasible for use cases where the process needs to spawn before, or outlive, roscore.The text was updated successfully, but these errors were encountered: