Skip to content

Commit f96e3ad

Browse files
authored
Merge pull request #31 from alecmocatta/thread-scoped-unwind-catching
Replace global panic hook with abort_on_unwind on constellation threads
2 parents 8c14ddd + e9e621a commit f96e3ad

File tree

9 files changed

+99
-65
lines changed

9 files changed

+99
-65
lines changed

constellation-internal/Cargo.toml

+1
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ cargo_metadata = { version = "0.8", default-features = false }
2626
either = { version = "1.5", features = ["serde"] }
2727
palaver = "0.2"
2828
rand = { version = "0.7", features = ["small_rng"] }
29+
replace_with = "0.1.4"
2930
serde = { version = "1.0", features = ["derive"] }
3031
serde_bytes = "0.11"
3132
serde_json = "1.0"

constellation-internal/src/lib.rs

+41-6
Original file line number | Diff line number | Diff line change
@@ -26,13 +26,13 @@ mod format;
2626
pub mod msg;
2727

2828
#[cfg(unix)]
29-
use nix::{fcntl, sys::signal, unistd};
29+
use nix::{fcntl, libc, sys::signal, unistd};
3030
use palaver::file::{copy, memfd_create};
3131
use serde::{Deserialize, Serialize};
3232
use std::{
33-
convert::TryInto, env, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek}, net, ops, os::unix::{
34-
ffi::OsStringExt, io::{AsRawFd, FromRawFd}
35-
}, sync::{Arc, Mutex}
33+
convert::TryInto, env, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek, Write}, net, ops, os::unix::{
34+
ffi::OsStringExt, io::{AsRawFd, FromRawFd, IntoRawFd}
35+
}, process::abort, sync::{Arc, Mutex}
3636
};
3737

3838
#[cfg(target_family = "unix")]
@@ -552,12 +552,12 @@ pub enum ProcessInputEvent {
552552

553553
#[allow(missing_debug_implementations)]
554554
#[derive(Clone)]
555-
pub struct Trace<W: io::Write> {
555+
pub struct Trace<W: Write> {
556556
stdout: Arc<Mutex<W>>,
557557
format: Format,
558558
verbose: bool,
559559
}
560-
impl<W: io::Write> Trace<W> {
560+
impl<W: Write> Trace<W> {
561561
pub fn new(stdout: W, format: Format, verbose: bool) -> Self {
562562
Self {
563563
stdout: Arc::new(Mutex::new(stdout)),
@@ -638,6 +638,41 @@ where
638638

639639
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
640640

641+
struct StdErr;
642+
impl Write for StdErr {
643+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
644+
let mut file = unsafe { File::from_raw_fd(libc::STDERR_FILENO) };
645+
let ret = file.write(buf);
646+
let _ = file.into_raw_fd();
647+
ret
648+
}
649+
fn flush(&mut self) -> io::Result<()> {
650+
Ok(())
651+
}
652+
}
653+
654+
#[inline]
655+
fn abort_on_unwind_<F: FnOnce() -> T, T>(f: F) -> T {
656+
replace_with::on_unwind(f, || {
657+
let _ = StdErr.write_all(b"Constellation: detected unexpected panic; aborting\n");
658+
abort();
659+
})
660+
}
661+
662+
#[must_use]
663+
#[inline]
664+
pub fn abort_on_unwind<F: FnOnce() -> T, T>(f: F) -> impl FnOnce() -> T {
665+
|| abort_on_unwind_(f)
666+
}
667+
668+
#[must_use]
669+
#[inline]
670+
pub fn abort_on_unwind_1<F: FnOnce(&A) -> T, T, A>(f: F) -> impl FnOnce(&A) -> T {
671+
|a| abort_on_unwind_(|| f(a))
672+
}
673+
674+
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
675+
641676
pub mod cargo_metadata {
642677
use cargo_metadata::Target;
643678
use serde::Deserialize;

src/bin/constellation/bridge.rs

+16-10
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use std::{
3636

3737
use constellation::FutureExt1;
3838
use constellation_internal::{
39-
forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources
39+
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources
4040
};
4141

4242
const SCHEDULER_FD: Fd = 4;
@@ -87,7 +87,9 @@ fn monitor_process(
8787
let sender_ = sender_.clone();
8888
let _ = thread::Builder::new()
8989
.name(String::from("d"))
90-
.spawn(move || monitor_process(new_pid, sender_, receiver1))
90+
.spawn(abort_on_unwind(move || {
91+
monitor_process(new_pid, sender_, receiver1)
92+
}))
9193
.unwrap();
9294
}
9395
ProcessOutputEvent::Output(fd, output) => {
@@ -207,10 +209,10 @@ fn recce(
207209
nix::unistd::close(writer).unwrap();
208210
// let _ = thread::Builder::new()
209211
// .name(String::from(""))
210-
// .spawn(move || {
212+
// .spawn(abort_on_unwind(move || {
211213
// thread::sleep(RECCE_TIMEOUT);
212214
// let _ = nix::sys::signal::kill(child, nix::sys::signal::Signal::SIGKILL);
213-
// })
215+
// }))
214216
// .unwrap();
215217
// TODO: do this without waitpid/kill race
216218
loop {
@@ -307,12 +309,14 @@ fn manage_connection(
307309
let (sender1, receiver1) = futures::channel::mpsc::channel(0);
308310
let _ = thread::Builder::new()
309311
.name(String::from("c"))
310-
.spawn(move || monitor_process(pid, sender, receiver1))
312+
.spawn(abort_on_unwind(move || {
313+
monitor_process(pid, sender, receiver1)
314+
}))
311315
.unwrap();
312316
let hashmap = &Mutex::new(HashMap::new());
313317
let _ = hashmap.lock().unwrap().insert(pid, sender1);
314318
crossbeam::scope(|scope| {
315-
let _ = scope.spawn(move |_scope| {
319+
let _ = scope.spawn(abort_on_unwind_1(move |_scope| {
316320
loop {
317321
let event: Result<DeployInputEvent, _> =
318322
bincode::deserialize_from(&mut stream_read).map_err(map_bincode_err);
@@ -340,7 +344,7 @@ fn manage_connection(
340344
for (_, process) in x.iter_mut() {
341345
let _unchecked_error = process.send(InputEventInt::Kill).block();
342346
}
343-
});
347+
}));
344348
for event in receiver.iter() {
345349
let event = match event {
346350
OutputEventInt::Spawn(pid, new_pid, sender) => {
@@ -387,16 +391,18 @@ pub fn main() {
387391
let (sender, receiver) = mpsc::sync_channel(0);
388392
let _ = thread::Builder::new()
389393
.name(String::from("a"))
390-
.spawn(move || {
394+
.spawn(abort_on_unwind(move || {
391395
for stream in listener.incoming() {
392396
trace!("BRIDGE: accepted");
393397
let sender = sender.clone();
394398
let _ = thread::Builder::new()
395399
.name(String::from("b"))
396-
.spawn(|| manage_connection(stream.unwrap(), sender))
400+
.spawn(abort_on_unwind(|| {
401+
manage_connection(stream.unwrap(), sender)
402+
}))
397403
.unwrap();
398404
}
399-
})
405+
}))
400406
.unwrap();
401407

402408
for (request, sender) in receiver {

src/bin/constellation/main.rs

+5-5
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@ use std::{
9090
};
9191

9292
use constellation_internal::{
93-
forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, FabricOutputEvent, Fd, Format, Pid, PidInternal, Trace
93+
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, FabricOutputEvent, Fd, Format, Pid, PidInternal, Trace
9494
};
9595

9696
#[derive(PartialEq, Debug)]
@@ -151,7 +151,7 @@ fn main() {
151151
.set_port(fabric.local_addr().unwrap().port());
152152
let _ = thread::Builder::new()
153153
.name(String::from("master"))
154-
.spawn(move || {
154+
.spawn(abort_on_unwind(move || {
155155
master::run(
156156
SocketAddr::new(listen.ip(), master_addr.port()),
157157
Pid::new(master_addr.ip(), master_addr.port()),
@@ -167,7 +167,7 @@ fn main() {
167167
)
168168
.collect::<HashMap<_, _>>(),
169169
); // TODO: error on clash
170-
})
170+
}))
171171
.unwrap();
172172
(listen.ip(), fabric)
173173
}
@@ -352,7 +352,7 @@ fn main() {
352352
{
353353
break;
354354
}
355-
let _ = scope.spawn(move |_scope| {
355+
let _ = scope.spawn(abort_on_unwind_1(move |_scope| {
356356
loop {
357357
match wait::waitpid(child, None) {
358358
Err(nix::Error::Sys(nix::errno::Errno::EINTR)) => (),
@@ -380,7 +380,7 @@ fn main() {
380380
&Either::Right::<Pid, Pid>(process_id),
381381
)
382382
.map_err(map_bincode_err);
383-
});
383+
}));
384384
}
385385
for (&_job, &pid) in pending.read().unwrap().iter() {
386386
// TODO: this is racey

src/bin/constellation/master.rs

+11-11
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use std::{
55
};
66

77
use constellation_internal::{
8-
map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, Pid, Resources
8+
abort_on_unwind, abort_on_unwind_1, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, Pid, Resources
99
};
1010

1111
#[derive(Debug)]
@@ -60,19 +60,19 @@ pub fn run(
6060
let stream = TcpStream::connect(&fabric).unwrap();
6161
let sender1 = sender.clone();
6262
let _ = thread::Builder::new()
63-
.spawn(move || {
63+
.spawn(abort_on_unwind(move || {
6464
let (receiver, sender) = (receiver_a, sender1);
6565
let (mut stream_read, mut stream_write) =
6666
(BufferedStream::new(&stream), BufferedStream::new(&stream));
6767
bincode::serialize_into::<_, IpAddr>(&mut stream_write, &fabric.ip()).unwrap();
6868
let _ip = bincode::deserialize_from::<_, IpAddr>(&mut stream_read).unwrap();
6969
crossbeam::scope(|scope| {
70-
let _ = scope.spawn(|_spawn| {
70+
let _ = scope.spawn(abort_on_unwind_1(|_spawn| {
7171
for request in receiver {
7272
bincode::serialize_into(&mut stream_write.write(), &request)
7373
.unwrap();
7474
}
75-
});
75+
}));
7676
while let Ok(done) =
7777
bincode::deserialize_from::<_, Either<Pid, Pid>>(&mut stream_read)
7878
.map_err(map_bincode_err)
@@ -81,11 +81,11 @@ pub fn run(
8181
}
8282
})
8383
.unwrap();
84-
})
84+
}))
8585
.unwrap();
8686
let sender = sender.clone();
8787
let _ = thread::Builder::new()
88-
.spawn(move || {
88+
.spawn(abort_on_unwind(move || {
8989
#[cfg(feature = "distribute_binaries")]
9090
let binary = {
9191
let mut binary = Vec::new();
@@ -115,21 +115,21 @@ pub fn run(
115115
.unwrap();
116116
let _pid: Pid = receiver.recv().unwrap().unwrap();
117117
// println!("bridge at {:?}", pid);
118-
})
118+
}))
119119
.unwrap();
120120
(sender_a, node, fabric.ip(), VecDeque::new())
121121
})
122122
.collect::<Vec<_>>();
123123

124124
let listener = TcpListener::bind(bind_addr).unwrap();
125125
let _ = thread::Builder::new()
126-
.spawn(move || {
126+
.spawn(abort_on_unwind(move || {
127127
for stream in listener.incoming() {
128128
// println!("accepted");
129129
let stream = stream.unwrap();
130130
let sender = sender.clone();
131131
let _ = thread::Builder::new()
132-
.spawn(move || {
132+
.spawn(abort_on_unwind(move || {
133133
let (mut stream_read, mut stream_write) =
134134
(BufferedStream::new(&stream), &stream);
135135
while let Ok(request) =
@@ -144,10 +144,10 @@ pub fn run(
144144
break;
145145
}
146146
}
147-
})
147+
}))
148148
.unwrap();
149149
}
150-
})
150+
}))
151151
.unwrap();
152152

153153
let mut processes: HashMap<(usize, Pid), Resources> = HashMap::new();

src/bin/deploy.rs

+3-3
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ use std::{
3535
};
3636

3737
use constellation_internal::{
38-
map_bincode_err, msg::{bincode_serialize_into, BridgeRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, Envs, ExitStatus, Format, Formatter, Pid, StyleSupport
38+
abort_on_unwind_1, map_bincode_err, msg::{bincode_serialize_into, BridgeRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, Envs, ExitStatus, Format, Formatter, Pid, StyleSupport
3939
};
4040

4141
const USAGE: &str = "Run a binary on a constellation cluster.
@@ -112,7 +112,7 @@ fn main() {
112112
panic!("Deploy failed due to not being able to allocate process to any of the nodes or constellation::init() not being called immediately inside main()")
113113
}); // TODO get resources from bridge
114114
crossbeam::scope(|scope| {
115-
let _ = scope.spawn(|_scope| {
115+
let _ = scope.spawn(abort_on_unwind_1(|_scope| {
116116
let mut stdin = io::stdin();
117117
loop {
118118
let mut buf = MaybeUninit::<[u8; 1024]>::uninit();
@@ -130,7 +130,7 @@ fn main() {
130130
break;
131131
}
132132
}
133-
});
133+
}));
134134
let mut exit_code = ExitStatus::Success;
135135
let mut ref_count = 1;
136136
let mut pids = HashSet::new();

src/channel/mod.rs

+4-2
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,8 @@ use std::{
1111
};
1212
use tcp_typed::{Connection, Listener};
1313

14+
use constellation_internal::abort_on_unwind;
15+
1416
use super::Fd;
1517

1618
pub use self::{inner::*, inner_states::*};
@@ -114,7 +116,7 @@ impl Reactor {
114116
let mut triggeree = Some(triggeree);
115117
let tcp_thread = thread::Builder::new()
116118
.name(String::from("tcp-thread"))
117-
.spawn(move || {
119+
.spawn(abort_on_unwind(move || {
118120
let context = context();
119121
let context = context.borrow();
120122
let mut listener = context.listener.try_write().unwrap();
@@ -418,7 +420,7 @@ impl Reactor {
418420
});
419421
}
420422
// trace!("/close"); // called after rust runtime exited, not sure what trace does
421-
})
423+
}))
422424
.unwrap();
423425
Handle {
424426
triggerer: Some(triggerer),

0 commit comments

Comments
 (0)