diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..dd883ce --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +/* +/*/ +!/constellation-internal/ +!/examples/ +!/src/ +!/tests/ +!/Cargo.toml +!/build.rs diff --git a/.mergify.yml b/.mergify.yml index b88ad04..04f2049 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -3,6 +3,7 @@ pull_request_rules: conditions: - base=master - status-success=tests + - status-success=ci/dockercloud - label!=work-in-progress - "#approved-reviews-by>=1" - "#review-requested=0" @@ -17,6 +18,7 @@ pull_request_rules: conditions: - base=master - status-success=tests + - status-success=ci/dockercloud - label!=work-in-progress - author=alecmocatta # https://github.com/Mergifyio/mergify-engine/issues/451 - "#review-requested=0" diff --git a/Cargo.toml b/Cargo.toml index 2245faf..45cb4dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "constellation-rs" -version = "0.1.5" +version = "0.1.6" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["development-tools","network-programming","concurrency","asynchronous","command-line-utilities"] @@ -12,14 +12,11 @@ Constellation is a framework for Rust (nightly) that aides in the writing, debug """ repository = "https://github.com/alecmocatta/constellation" homepage = "https://github.com/alecmocatta/constellation" -documentation = "https://docs.rs/constellation-rs/0.1.5" +documentation = "https://docs.rs/constellation-rs/0.1.6" readme = "README.md" edition = "2018" autotests = true -[lib] -name = "constellation" - [badges] azure-devops = { project = "alecmocatta/constellation", pipeline = "tests" } maintenance = { status = "actively-developed" } @@ -30,9 +27,10 @@ nightly = ["palaver/nightly", "relative/nightly"] distribute_binaries = ["constellation-internal/distribute_binaries"] fringe = ["serde_pipe/fringe"] no_alloc = ["constellation-internal/no_alloc"] +kubernetes = ["distribute_binaries", "kube", "openssl"] [dependencies] -constellation-internal = { path = "constellation-internal", version = "=0.1.5" } +constellation-internal = { path = "constellation-internal", version = "=0.1.6" } atty = "0.2" backtrace = "0.3" bincode = "1.0" @@ -40,6 +38,7 @@ crossbeam = "0.7" docopt = "1.0" either = "1.5" futures-preview = "0.3.0-alpha.18" +kube = { version = "0.16", features = ["openapi"], optional = true } log = "0.4" notifier = { version = "0.1", features = ["tcp_typed"] } once_cell = "1.0" @@ -54,6 +53,9 @@ serde_pipe = "0.1" tcp_typed = "0.1" toml = "0.5" +# dependency of kube; ensure it's vendored to simplify cross-compilation +openssl = { version = "0.10", features = ["vendored"], optional = true } + [target.'cfg(unix)'.dependencies] nix = "0.15" @@ -73,6 +75,11 @@ systemstat = "0.1" [patch.crates-io] systemstat = { git = "https://github.com/alecmocatta/systemstat" } +### + +[lib] +name = "constellation" + # Hopefully we won't need to exhaustively list in future: # https://github.com/rust-lang/cargo/issues/5766 or https://github.com/rust-lang/rust/issues/50297 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5617fea --- /dev/null +++ b/Dockerfile @@ -0,0 +1,29 @@ +FROM rustlang/rust:nightly as build +WORKDIR /usr/src + +# Install musl-gcc +RUN apt-get update && apt-get install -y --no-install-recommends musl-tools + +# Download the target for static linking. +RUN rustup target add x86_64-unknown-linux-musl + +# Create a dummy project and build the app's dependencies. +# If the Cargo.toml and Cargo.lock files have not changed, +# we can use the docker build cache and skip this slow step. +RUN USER=root cargo init && USER=root cargo new --lib constellation-internal +COPY Cargo.toml ./ +RUN sed -i '/^###$/q' Cargo.toml +COPY constellation-internal/Cargo.toml ./constellation-internal/ +RUN cargo generate-lockfile +RUN cargo build --bins --features kubernetes --target x86_64-unknown-linux-musl --release + +# Copy the source and build the application. +COPY . ./ +RUN touch ./constellation-internal/src/lib.rs +RUN cargo build --locked --frozen --offline --bin constellation --features kubernetes --target x86_64-unknown-linux-musl --release + +# Copy the statically-linked binary into a scratch container. +FROM scratch +COPY --from=build /usr/src/target/x86_64-unknown-linux-musl/release/constellation . +USER 1000 +ENTRYPOINT ["./constellation"] diff --git a/README.md b/README.md index e91a44d..57e859c 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@

- Docs + Docs

Constellation is a framework for Rust (nightly) that aides in the writing, debugging and deployment of distributed programs. It draws heavily from [Erlang/OTP](https://en.wikipedia.org/wiki/Erlang_(programming_language)), [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface), and [CSP](https://en.wikipedia.org/wiki/Communicating_sequential_processes); and leverages the Rust ecosystem where it can including [serde](https://serde.rs/) + [bincode](https://github.com/servo/bincode) for network serialization, and [mio](https://github.com/tokio-rs/mio) and [futures-rs](https://github.com/rust-lang-nursery/futures-rs) for asynchronous channels over TCP. @@ -27,13 +27,13 @@ For leveraging Constellation directly, read on. ## Constellation framework -* Constellation is a framework that's initialised with a call to [`init()`](https://docs.rs/constellation-rs/0.1.5/constellation/fn.init.html) at the beginning of your program. -* You can [`spawn(closure)`](https://docs.rs/constellation-rs/0.1.5/constellation/fn.spawn.html) new processes, which run `closure`. +* Constellation is a framework that's initialised with a call to [`init()`](https://docs.rs/constellation-rs/0.1.6/constellation/fn.init.html) at the beginning of your program. +* You can [`spawn(closure)`](https://docs.rs/constellation-rs/0.1.6/constellation/fn.spawn.html) new processes, which run `closure`. * `spawn(closure)` returns the Pid of the new process. -* You can communicate between processes by creating channels with [`Sender::new(remote_pid)`](https://docs.rs/constellation-rs/0.1.5/constellation/struct.Sender.html#method.new) and [`Receiver::new(remote_pid)`](https://docs.rs/constellation-rs/0.1.5/constellation/struct.Receiver.html#method.new). -* Channels can be used asynchronously with [`sender.send(value).await`](https://docs.rs/constellation-rs/0.1.5/constellation/struct.Sender.html#method.send) and [`receiver.recv().await`](https://docs.rs/constellation-rs/0.1.5/constellation/struct.Receiver.html#method.recv). +* You can communicate between processes by creating channels with [`Sender::new(remote_pid)`](https://docs.rs/constellation-rs/0.1.6/constellation/struct.Sender.html#method.new) and [`Receiver::new(remote_pid)`](https://docs.rs/constellation-rs/0.1.6/constellation/struct.Receiver.html#method.new). +* Channels can be used asynchronously with [`sender.send(value).await`](https://docs.rs/constellation-rs/0.1.6/constellation/struct.Sender.html#method.send) and [`receiver.recv().await`](https://docs.rs/constellation-rs/0.1.6/constellation/struct.Receiver.html#method.recv). * [futures-rs](https://github.com/rust-lang-nursery/futures-rs) provides useful functions and adapters including `select()` and `join()` for working with channels. -* You can also block on channels with the [`.block()`](https://docs.rs/constellation-rs/0.1.5/constellation/trait.FutureExt1.html#method.block) convenience method: `sender.send().block()` and `receiver.recv().block()`. +* You can also block on channels with the [`.block()`](https://docs.rs/constellation-rs/0.1.6/constellation/trait.FutureExt1.html#method.block) convenience method: `sender.send().block()` and `receiver.recv().block()`. * For more information on asynchronous programming in Rust check out the [Async Book](https://rust-lang.github.io/async-book/index.html)! Here's a simple example recursively spawning processes to distribute the task of finding Fibonacci numbers: @@ -248,7 +248,7 @@ Please file an issue if you experience any other requirements. ## API -[see Rust doc](https://docs.rs/constellation-rs/0.1.5) +[see Rust doc](https://docs.rs/constellation-rs/0.1.6) ## Testing diff --git a/azure-pipelines.yml b/azure-pipelines.yml index a1ece06..ded804a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -14,7 +14,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: nightly - rust_lint_toolchain: nightly-2019-08-22 + rust_lint_toolchain: nightly-2019-09-13 rust_flags: '' rust_features: 'no_alloc;no_alloc distribute_binaries' rust_target_check: '' diff --git a/constellation-internal/Cargo.toml b/constellation-internal/Cargo.toml index 21fca1d..b28a76a 100644 --- a/constellation-internal/Cargo.toml +++ b/constellation-internal/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "constellation-internal" -version = "0.1.5" +version = "0.1.6" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["development-tools","network-programming","concurrency","asynchronous"] @@ -10,7 +10,7 @@ Common components for the `constellation` framework. """ repository = "https://github.com/alecmocatta/constellation" homepage = "https://github.com/alecmocatta/constellation" -documentation = "https://docs.rs/constellation-internal/0.1.5" +documentation = "https://docs.rs/constellation-internal/0.1.6" edition = "2018" [features] diff --git a/constellation-internal/src/format.rs b/constellation-internal/src/format.rs index 2006026..b1dfad3 100644 --- a/constellation-internal/src/format.rs +++ b/constellation-internal/src/format.rs @@ -82,6 +82,7 @@ impl Formatter { } } + #[allow(clippy::too_many_lines)] pub fn write(&mut self, event: &DeployOutputEvent) { match *event { DeployOutputEvent::Spawn(pid_, new_pid) => { diff --git a/constellation-internal/src/lib.rs b/constellation-internal/src/lib.rs index 729071d..2e6f8af 100644 --- a/constellation-internal/src/lib.rs +++ b/constellation-internal/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/constellation-internal/0.1.5")] +#![doc(html_root_url = "https://docs.rs/constellation-internal/0.1.6")] #![warn( // missing_copy_implementations, missing_debug_implementations, @@ -30,7 +30,7 @@ use nix::{fcntl, libc, sys::signal, unistd}; use palaver::file::{copy, memfd_create}; use serde::{Deserialize, Serialize}; use std::{ - convert::TryInto, env, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek, Write}, net, ops, os::unix::{ + convert::{TryFrom, TryInto}, env, error::Error, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek, Write}, net, ops, os::unix::{ ffi::OsStringExt, io::{AsRawFd, FromRawFd, IntoRawFd} }, process::abort, sync::{Arc, Mutex} }; @@ -278,8 +278,8 @@ pub enum Format { /// ``` /// # use constellation_internal::Resources; /// pub const RESOURCES_DEFAULT: Resources = Resources { -/// mem: 1024 * 1024 * 1024, // 1 GiB -/// cpu: 65536 / 16, // 1/16th of a logical CPU core +/// mem: 100 * 1024 * 1024, // 100 MiB +/// cpu: 65536 / 16, // 1/16th of a logical CPU core /// }; /// ``` #[derive(Copy, Clone, PartialEq, Serialize, Deserialize, Debug)] @@ -299,15 +299,102 @@ impl Default for Resources { /// ``` /// # use constellation_internal::Resources; /// pub const RESOURCES_DEFAULT: Resources = Resources { -/// mem: 1024 * 1024 * 1024, // 1 GiB -/// cpu: 65536 / 16, // 1/16th of a logical CPU core +/// mem: 100 * 1024 * 1024, // 100 MiB +/// cpu: 65536 / 16, // 1/16th of a logical CPU core /// }; /// ``` pub const RESOURCES_DEFAULT: Resources = Resources { - mem: 1024 * 1024 * 1024, // 1 GiB - cpu: 65536 / 16, // 1/16th of a logical CPU core + mem: 100 * 1024 * 1024, // 100 MiB + cpu: 65536 / 16, // 1/16th of a logical CPU core }; +/// An error returned by the [`try_spawn()`](try_spawn) method detailing the reason if known. +#[allow(missing_copy_implementations)] +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum TrySpawnError { + /// [`try_spawn()`](try_spawn) failed because the new process couldn't be allocated. + NoCapacity, + /// [`try_spawn()`](try_spawn) failed because `constellation::init()` is not called immediately inside main(). + Recce, + /// [`try_spawn()`](try_spawn) failed for unknown reasons. + Unknown, + #[doc(hidden)] + __Nonexhaustive, // https://github.com/rust-lang/rust/issues/44109 +} + +/// An error returned by the [`spawn()`](spawn) method detailing the reason if known. +#[allow(missing_copy_implementations)] +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum SpawnError { + /// [`spawn()`](spawn) failed because `constellation::init()` is not called immediately inside main(). + Recce, + /// [`spawn()`](spawn) failed for unknown reasons. + Unknown, + #[doc(hidden)] + __Nonexhaustive, +} +impl From for TrySpawnError { + fn from(error: SpawnError) -> Self { + match error { + SpawnError::Recce => Self::Recce, + SpawnError::Unknown => Self::Unknown, + SpawnError::__Nonexhaustive => unreachable!(), + } + } +} +impl TryFrom for SpawnError { + type Error = (); + + fn try_from(error: TrySpawnError) -> Result { + match error { + TrySpawnError::NoCapacity => Err(()), + TrySpawnError::Recce => Ok(Self::Recce), + TrySpawnError::Unknown => Ok(Self::Unknown), + TrySpawnError::__Nonexhaustive => unreachable!(), + } + } +} +impl Display for TrySpawnError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoCapacity => write!( + f, + "try_spawn() failed because the new process couldn't be allocated" + ), + Self::Recce => write!( + f, + "try_spawn() because constellation::init() is not called immediately inside main()" + ), + Self::Unknown => write!(f, "try_spawn() failed for unknown reasons"), + Self::__Nonexhaustive => unreachable!(), + } + } +} +impl Debug for TrySpawnError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} +impl Display for SpawnError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Recce => write!( + f, + "spawn() because constellation::init() is not called immediately inside main()" + ), + Self::Unknown => write!(f, "spawn() failed for unknown reasons"), + Self::__Nonexhaustive => unreachable!(), + } + } +} +impl Debug for SpawnError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} +impl Error for TrySpawnError {} +impl Error for SpawnError {} + #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(/*tag = "event", */rename_all = "lowercase")] pub enum FabricOutputEvent { diff --git a/constellation-internal/src/msg.rs b/constellation-internal/src/msg.rs index 0009412..31f8de0 100644 --- a/constellation-internal/src/msg.rs +++ b/constellation-internal/src/msg.rs @@ -9,6 +9,8 @@ where A: FileOrVec, B: FileOrVec, { + /// Whether to wait for space to allocate the process, or just bail. + pub block: bool, /// The resources required for this process. pub resources: Resources, /// The socket addresses required to bind to for this process. @@ -148,7 +150,8 @@ mod serde { where S: Serializer, { - let mut state = serializer.serialize_tuple(6)?; + let mut state = serializer.serialize_tuple(7)?; + state.serialize_element(&self.block)?; state.serialize_element(&self.resources)?; state.serialize_element(&self.bind)?; state.serialize_element(&self.args)?; @@ -193,7 +196,8 @@ mod serde { where S: Serializer, { - let mut state = serializer.serialize_tuple(6)?; + let mut state = serializer.serialize_tuple(7)?; + state.serialize_element(&self.value.block)?; state.serialize_element(&self.value.resources)?; state.serialize_element(&self.value.bind)?; state.serialize_element(&self.value.args)?; @@ -274,7 +278,7 @@ mod serde { // where // D: Deserializer<'de>, // { - // deserializer.deserialize_tuple(6, FabricRequestVisitor) + // deserializer.deserialize_tuple(7, FabricRequestVisitor) // } // } // struct FabricRequestVisitor; @@ -288,27 +292,31 @@ mod serde { // where // V: SeqAccess<'de>, // { - // let resources = seq + // let block = seq // .next_element()? // .ok_or_else(|| de::Error::invalid_length(0, &self))?; - // let bind = seq + // let resources = seq // .next_element()? // .ok_or_else(|| de::Error::invalid_length(1, &self))?; - // let args: Vec = seq + // let bind = seq // .next_element()? // .ok_or_else(|| de::Error::invalid_length(2, &self))?; - // let vars = seq + // let args: Vec = seq // .next_element()? // .ok_or_else(|| de::Error::invalid_length(3, &self))?; + // let vars = seq + // .next_element()? + // .ok_or_else(|| de::Error::invalid_length(4, &self))?; // let arg = seq // .next_element::()? - // .ok_or_else(|| de::Error::invalid_length(4, &self))? + // .ok_or_else(|| de::Error::invalid_length(5, &self))? // .into_vec(); // let binary = seq // .next_element::()? - // .ok_or_else(|| de::Error::invalid_length(5, &self))? + // .ok_or_else(|| de::Error::invalid_length(6, &self))? // .into_vec(); // Ok(FabricRequest { + // block, // resources, // bind, // args, @@ -390,7 +398,7 @@ mod serde { { READER.with(|reader| { deserializer.deserialize_tuple( - 6, + 7, FabricRequestSeed::new(unsafe { &mut **reader.borrow_mut().as_mut().unwrap() }), ) }) @@ -411,18 +419,21 @@ mod serde { where V: SeqAccess<'de>, { - let resources = seq + let block = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(0, &self))?; - let bind = seq + let resources = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(1, &self))?; - let args: Vec = seq + let bind = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(2, &self))?; - let vars = seq + let args: Vec = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(3, &self))?; + let vars = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(4, &self))?; let arg = A::next_element_seed( &mut seq, FileSeed { @@ -432,7 +443,7 @@ mod serde { seal: false, }, )? - .ok_or_else(|| de::Error::invalid_length(4, &self))?; + .ok_or_else(|| de::Error::invalid_length(5, &self))?; #[cfg(feature = "distribute_binaries")] let binary = B::next_element_seed( &mut seq, @@ -443,11 +454,12 @@ mod serde { seal: true, }, )? - .ok_or_else(|| de::Error::invalid_length(5, &self))?; + .ok_or_else(|| de::Error::invalid_length(6, &self))?; #[cfg(not(feature = "distribute_binaries"))] let binary = seq.next_element()? - .ok_or_else(|| de::Error::invalid_length(5, &self))?; + .ok_or_else(|| de::Error::invalid_length(6, &self))?; Ok(FabricRequest { + block, resources, bind, args, diff --git a/docker.test.yml b/docker.test.yml new file mode 100644 index 0000000..c58369e --- /dev/null +++ b/docker.test.yml @@ -0,0 +1,3 @@ +sut: + build: . + command: "-V" diff --git a/k8s.yaml b/k8s.yaml new file mode 100644 index 0000000..9b247a0 --- /dev/null +++ b/k8s.yaml @@ -0,0 +1,123 @@ +apiVersion: v1 +kind: Service +metadata: + name: constellation +spec: + selector: + constellation: master + ports: + - name: constellation + protocol: TCP + port: 12321 + targetPort: 12321 + type: LoadBalancer +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: constellation +spec: + replicas: 1 + selector: + matchLabels: &labels + constellation: master + template: + metadata: + labels: *labels + spec: + containers: + - name: constellation + image: constellationrs/constellation:0.1.6 + args: + - kube + - 0.0.0.0:32123 + - 0.0.0.0:12321 + - "1GiB" + - "1" + - "1" + env: + - name: CONSTELLATION_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + ports: + - name: constellation + containerPort: 32123 + - name: constellation-b + containerPort: 12321 + resources: + requests: + memory: "1Gi" + cpu: "1" + serviceAccountName: constellation-service-account + terminationGracePeriodSeconds: 1 +--- +apiVersion: apps/v1 +kind: ReplicaSet +metadata: + name: constellation +spec: + replicas: 0 + selector: + matchLabels: + constellation: node + template: + metadata: + labels: + constellation: node + spec: + containers: + - name: constellation + image: constellationrs/constellation:0.1.6 + args: + - 0.0.0.0:32123 + env: + - name: CONSTELLATION_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + ports: + - name: constellation + containerPort: 32123 + resources: + requests: + memory: "1Gi" + cpu: "1" + serviceAccountName: constellation-service-account + terminationGracePeriodSeconds: 1 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: constellation-service-account +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRole +metadata: + name: constellation-role +rules: + - apiGroups: + - "" + - apps + resources: + - pods + verbs: ["list", "watch"] + - apiGroups: + - "" + - apps + resources: + - replicasets/scale + verbs: ["patch"] +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: constellation-role +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: constellation-role +subjects: + - kind: ServiceAccount + name: constellation-service-account + namespace: default diff --git a/src/bin/constellation/args.rs b/src/bin/constellation/args.rs index 58f3e78..acf9dba 100644 --- a/src/bin/constellation/args.rs +++ b/src/bin/constellation/args.rs @@ -1,3 +1,5 @@ +#![allow(clippy::too_many_lines)] + use serde::Deserialize; use std::{error::Error, fs::File, io::Read, net::SocketAddr}; @@ -155,6 +157,34 @@ impl Args { let format = format.unwrap_or(Format::Human); let role: Role = match (&*args.next().unwrap(), args.peek()) { ("bridge", None) => Role::Bridge, + #[cfg(feature = "kubernetes")] + ("kube", _) => { + if let ( + Some(Ok(master_bind)), + Some(Ok(bridge_bind)), + Some(Ok(mem)), + Some(Ok(cpu)), + Some(Ok(replicas)), + None, + ) = ( + args.next().map(|x| x.parse::()), + args.next().map(|x| x.parse::()), + args.next().map(|x| parse_mem_size(&x)), + args.next().map(|x| parse_cpu_size(&x)), + args.next().map(|x| x.parse::()), + args.next(), + ) { + Role::KubeMaster { + master_bind, + bridge_bind, + mem, + cpu, + replicas, + } + } else { + return Err((format!("Invalid kubernetes master options, expecting , like 127.0.0.1:9999 127.0.0.1:8888 400GiB 34\n{}", USAGE), false)); + } + } (bind, Some(_)) if bind.parse::().is_ok() => { let bind = bind.parse().unwrap(); let mut nodes = Vec::new(); diff --git a/src/bin/constellation/bridge.rs b/src/bin/constellation/bridge.rs index 92b3be3..d0fbdaf 100644 --- a/src/bin/constellation/bridge.rs +++ b/src/bin/constellation/bridge.rs @@ -20,7 +20,8 @@ TODO: can lose processes such that ctrl+c doesn't kill them. i think if we kill #![allow( clippy::similar_names, clippy::type_complexity, - clippy::shadow_unrelated + clippy::shadow_unrelated, + clippy::too_many_lines )] use futures::{future::FutureExt, sink::SinkExt, stream::StreamExt}; @@ -36,7 +37,7 @@ use std::{ use constellation::FutureExt1; use constellation_internal::{ - abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources + abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources, TrySpawnError }; const SCHEDULER_FD: Fd = 4; @@ -239,7 +240,10 @@ fn recce( fn manage_connection( stream: TcpStream, - sender: mpsc::SyncSender<(FabricRequest, File>, mpsc::SyncSender>)>, + sender: mpsc::SyncSender<( + FabricRequest, File>, + mpsc::SyncSender>, + )>, ) -> Result<(), ()> { #[cfg(not(any(target_os = "macos", target_os = "ios")))] nix::sys::socket::setsockopt( @@ -275,20 +279,24 @@ fn manage_connection( .map_err(drop)?; assert_eq!(request.arg.len(), 0); bincode::serialize_into(&mut request.arg, &constellation::pid()).unwrap(); - let resources = request.resources.or_else(|| { - recce( - #[cfg(feature = "distribute_binaries")] - &request.binary, - &request.args, - &request.vars, - ) - .ok() - }); - let pid: Option = resources.and_then(|resources| { + let resources = request + .resources + .or_else(|| { + recce( + #[cfg(feature = "distribute_binaries")] + &request.binary, + &request.args, + &request.vars, + ) + .ok() + }) + .ok_or(TrySpawnError::Recce); + let pid: Result = resources.and_then(|resources| { let (sender_, receiver) = mpsc::sync_channel(0); sender .send(( FabricRequest { + block: true, // use cases for false? resources, bind: vec![], args: request.args, @@ -302,7 +310,7 @@ fn manage_connection( receiver.recv().unwrap() }); bincode::serialize_into(&mut stream_write, &pid).map_err(drop)?; - if let Some(pid) = pid { + if let Ok(pid) = pid { let x = PROCESS_COUNT.fetch_add(1, atomic::Ordering::Relaxed); trace!("BRIDGE: SPAWN ({})", x); let (sender, receiver) = mpsc::sync_channel(0); @@ -393,6 +401,9 @@ pub fn main() { .name(String::from("a")) .spawn(abort_on_unwind(move || { for stream in listener.incoming() { + if stream.is_err() { + continue; + } trace!("BRIDGE: accepted"); let sender = sender.clone(); let _ = thread::Builder::new() @@ -414,7 +425,7 @@ pub fn main() { bincode_serialize_into(&mut scheduler_write.write(), &request).unwrap(); - let pid: Option = bincode::deserialize_from(&mut scheduler_read) + let pid: Result = bincode::deserialize_from(&mut scheduler_read) .map_err(map_bincode_err) .unwrap(); sender.send(pid).unwrap(); diff --git a/src/bin/constellation/kube.rs b/src/bin/constellation/kube.rs new file mode 100644 index 0000000..e17ce21 --- /dev/null +++ b/src/bin/constellation/kube.rs @@ -0,0 +1,89 @@ +#![allow(clippy::module_name_repetitions)] + +use ::kube::{ + api::{Api, ListParams, PatchParams}, client::APIClient, config +}; +use serde_json::json; +use std::{ + collections::HashMap, env, fs::read_to_string, net::{IpAddr, SocketAddr}, thread +}; + +use super::master; +use constellation_internal::{abort_on_unwind, Pid, PidInternal}; + +pub fn kube_master( + master_bind: SocketAddr, fabric_port: u16, bridge_bind: SocketAddr, mem: u64, cpu: u32, + replicas: u32, +) { + let namespace = + read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace").unwrap(); + + let config = config::incluster_config().expect("failed to load in-cluster kubeconfig"); + let client = APIClient::new(config); + + let jobs = Api::v1ReplicaSet(client.clone()).within(&namespace); //.group("extensions").version("v1beta1"); + + let replicas = replicas - 1; // one is this master + + let fs = json!({ + "spec": { "replicas": replicas } + }); + let _ = jobs + .patch_scale( + "constellation", + &PatchParams::default(), + serde_json::to_vec(&fs).unwrap(), + ) + .unwrap(); + + let pods = Api::v1Pod(client).within(&namespace); + + let ips = loop { + let pods = pods + .list(&ListParams { + label_selector: Some(format!("{}={}", "constellation", "node")), + ..ListParams::default() + }) + .expect("failed to list pods") + .items; + let ips: Vec = pods + .into_iter() + .filter_map(|pod| Some(pod.status?.pod_ip?.parse().unwrap())) + .collect(); + if ips.len() == replicas as usize { + break ips; + } + std::thread::sleep(std::time::Duration::from_secs(2)); + }; + + let _ = thread::Builder::new() + .name(String::from("master")) + .spawn(abort_on_unwind(move || { + std::thread::sleep(std::time::Duration::from_secs(10)); + + let master_addr = SocketAddr::new( + env::var("CONSTELLATION_IP").unwrap().parse().unwrap(), + master_bind.port(), + ); + + let mut nodes = ips + .into_iter() + .map(|ip| { + let fabric = SocketAddr::new(ip, master_bind.port()); + let bridge = None; + (fabric, (bridge, mem, cpu)) + }) + .collect::>(); // TODO: error on clash + let _ = nodes.insert( + SocketAddr::new(master_addr.ip(), fabric_port), + (Some(bridge_bind), mem, cpu), + ); + + master::run( + SocketAddr::new(master_bind.ip(), master_addr.port()), + Pid::new(master_addr.ip(), master_addr.port()), + nodes, + ) + })) + .unwrap(); +} diff --git a/src/bin/constellation/main.rs b/src/bin/constellation/main.rs index 767ddd9..40033c1 100644 --- a/src/bin/constellation/main.rs +++ b/src/bin/constellation/main.rs @@ -68,11 +68,14 @@ clippy::similar_names, clippy::type_complexity, clippy::non_ascii_literal, - clippy::shadow_unrelated + clippy::shadow_unrelated, + clippy::too_many_lines )] mod args; mod bridge; +#[cfg(feature = "kubernetes")] +mod kube; mod master; use either::Either; @@ -89,6 +92,8 @@ use std::{ ffi::{CStr, CString}, fs::File, os::unix::{ffi::OsStringExt, io::IntoRawFd} }; +#[cfg(feature = "kubernetes")] +use self::kube::kube_master; use constellation_internal::{ abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, FabricOutputEvent, Fd, Format, Pid, PidInternal, Trace }; @@ -101,6 +106,14 @@ struct Args { } #[derive(PartialEq, Debug)] enum Role { + #[cfg(feature = "kubernetes")] + KubeMaster { + master_bind: SocketAddr, + bridge_bind: SocketAddr, + mem: u64, + cpu: u32, + replicas: u32, + }, Master(SocketAddr, Vec), Worker(SocketAddr), Bridge, @@ -143,30 +156,51 @@ fn main() { let stdout = io::stdout(); let trace = &Trace::new(stdout, args.format, args.verbose); let (listen, listener) = match args.role { + #[cfg(feature = "kubernetes")] + Role::KubeMaster { + master_bind, + bridge_bind, + mem, + cpu, + replicas, + } => { + let fabric = TcpListener::bind(SocketAddr::new(master_bind.ip(), 0)).unwrap(); + kube_master( + master_bind, + fabric.local_addr().unwrap().port(), + bridge_bind, + mem, + cpu, + replicas, + ); + (master_bind.ip(), fabric) + } Role::Master(listen, mut nodes) => { let fabric = TcpListener::bind(SocketAddr::new(listen.ip(), 0)).unwrap(); let master_addr = nodes[0].fabric; nodes[0] .fabric .set_port(fabric.local_addr().unwrap().port()); + let nodes = nodes + .into_iter() + .map( + |Node { + fabric, + bridge, + mem, + cpu, + }| { (fabric, (bridge, mem, cpu)) }, + ) + .collect::>(); // TODO: error on clash let _ = thread::Builder::new() .name(String::from("master")) .spawn(abort_on_unwind(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); master::run( SocketAddr::new(listen.ip(), master_addr.port()), Pid::new(master_addr.ip(), master_addr.port()), - nodes - .into_iter() - .map( - |Node { - fabric, - bridge, - mem, - cpu, - }| { (fabric, (bridge, mem, cpu)) }, - ) - .collect::>(), - ); // TODO: error on clash + nodes, + ) })) .unwrap(); (listen.ip(), fabric) @@ -176,15 +210,25 @@ fn main() { }; loop { - let (stream, addr) = listener.accept().unwrap(); - println!("accepted"); + let accepted = listener.accept(); + if accepted.is_err() { + continue; + } + let (stream, addr) = accepted.unwrap(); let mut pending_inner = HashMap::new(); let pending = &sync::RwLock::new(&mut pending_inner); let (mut stream_read, stream_write) = (BufferedStream::new(&stream), &sync::Mutex::new(&stream)); - bincode::serialize_into::<_, IpAddr>(&mut *stream_write.lock().unwrap(), &addr.ip()) - .unwrap(); - let ip = bincode::deserialize_from::<_, IpAddr>(&mut stream_read).unwrap(); + if bincode::serialize_into::<_, IpAddr>(&mut *stream_write.try_lock().unwrap(), &addr.ip()) + .is_err() + { + continue; + } + let ip = bincode::deserialize_from::<_, IpAddr>(&mut stream_read); + if ip.is_err() { + continue; + } + let ip = ip.unwrap(); crossbeam::scope(|scope| { while let Ok(request) = bincode_deserialize_from(&mut stream_read).map_err(map_bincode_err) diff --git a/src/bin/constellation/master.rs b/src/bin/constellation/master.rs index d1c4aed..8d0a791 100644 --- a/src/bin/constellation/master.rs +++ b/src/bin/constellation/master.rs @@ -1,3 +1,5 @@ +#![allow(clippy::too_many_lines)] + use either::Either; use serde::Serialize; use std::{ @@ -5,7 +7,7 @@ use std::{ }; use constellation_internal::{ - abort_on_unwind, abort_on_unwind_1, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, Pid, Resources + abort_on_unwind, abort_on_unwind_1, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, Pid, Resources, TrySpawnError }; #[derive(Debug)] @@ -44,7 +46,7 @@ pub fn run( Either< ( FabricRequest, Vec>, - SyncSender>, + SyncSender>, Option, ), (usize, Either), @@ -57,7 +59,8 @@ pub fn run( .map(|(i, (fabric, (bridge, mem, cpu)))| { let node = Node { mem, cpu }; let (sender_a, receiver_a) = sync_channel::, Vec>>(0); - let stream = TcpStream::connect(&fabric).unwrap(); + let stream = TcpStream::connect(&fabric) + .unwrap_or_else(|e| panic!("couldn't connect to node {}: {:?}: {}", i, fabric, e)); let sender1 = sender.clone(); let _ = thread::Builder::new() .spawn(abort_on_unwind(move || { @@ -83,40 +86,43 @@ pub fn run( .unwrap(); })) .unwrap(); - let sender = sender.clone(); - let _ = thread::Builder::new() - .spawn(abort_on_unwind(move || { - #[cfg(feature = "distribute_binaries")] - let binary = { - let mut binary = Vec::new(); - let mut file_in = palaver::env::exe().unwrap(); - let _ = std::io::Read::read_to_end(&mut file_in, &mut binary).unwrap(); - binary - }; - #[cfg(not(feature = "distribute_binaries"))] - let binary = std::marker::PhantomData; - let (sender_, receiver) = sync_channel::>(0); - sender - .send(Either::Left(( - FabricRequest { - resources: Resources { mem: 0, cpu: 0 }, - bind: bridge.into_iter().collect(), - args: vec![ - OsString::from(env::current_exe().unwrap()), - OsString::from("bridge"), - ], - vars: Vec::new(), - binary, - arg: Vec::new(), - }, - sender_, - Some(i), - ))) - .unwrap(); - let _pid: Pid = receiver.recv().unwrap().unwrap(); - // println!("bridge at {:?}", pid); - })) - .unwrap(); + if let Some(bridge) = bridge { + let sender = sender.clone(); + let _ = thread::Builder::new() + .spawn(abort_on_unwind(move || { + #[cfg(feature = "distribute_binaries")] + let binary = { + let mut binary = Vec::new(); + let mut file_in = palaver::env::exe().unwrap(); + let _ = std::io::Read::read_to_end(&mut file_in, &mut binary).unwrap(); + binary + }; + #[cfg(not(feature = "distribute_binaries"))] + let binary = std::marker::PhantomData; + let (sender_, receiver) = sync_channel::>(0); + sender + .send(Either::Left(( + FabricRequest { + block: false, + resources: Resources { mem: 0, cpu: 0 }, + bind: vec![bridge], + args: vec![ + OsString::from(env::current_exe().unwrap()), + OsString::from("bridge"), + ], + vars: Vec::new(), + binary, + arg: Vec::new(), + }, + sender_, + Some(i), + ))) + .unwrap(); + let _pid: Pid = receiver.recv().unwrap().unwrap(); + // println!("bridge at {:?}", pid); + })) + .unwrap(); + } (sender_a, node, fabric.ip(), VecDeque::new()) }) .collect::>(); @@ -126,6 +132,9 @@ pub fn run( .spawn(abort_on_unwind(move || { for stream in listener.incoming() { // println!("accepted"); + if stream.is_err() { + continue; + } let stream = stream.unwrap(); let sender = sender.clone(); let _ = thread::Builder::new() @@ -136,9 +145,9 @@ pub fn run( bincode_deserialize_from(&mut stream_read).map_err(map_bincode_err) { // println!("parsed"); - let (sender_, receiver) = sync_channel::>(0); + let (sender_, receiver) = sync_channel::>(0); sender.send(Either::Left((request, sender_, None))).unwrap(); - let pid: Option = receiver.recv().unwrap(); + let pid: Result = receiver.recv().unwrap(); // let mut stream_write = stream_write.write(); if bincode::serialize_into(&mut stream_write, &pid).is_err() { break; @@ -183,7 +192,12 @@ pub fn run( // "Failing a spawn! Cannot allocate process {:#?} to nodes {:#?}", // resources, nodes // ); - sender.send(None).unwrap(); + if request.block { + // TODO! + sender.send(Err(TrySpawnError::Unknown)).unwrap(); + } else { + sender.send(Err(TrySpawnError::NoCapacity)).unwrap(); + } } } Either::Right((node_, Either::Left(pid))) => { @@ -192,7 +206,7 @@ pub fn run( let (sender, process) = node.3.pop_front().unwrap(); let x = processes.insert((node_, pid), process); assert!(x.is_none()); - sender.send(Some(pid)).unwrap(); + sender.send(Ok(pid)).unwrap(); } Either::Right((node, Either::Right(pid))) => { let process = processes.remove(&(node, pid)).unwrap(); diff --git a/src/bin/deploy.rs b/src/bin/deploy.rs index 8c463f8..259b31c 100644 --- a/src/bin/deploy.rs +++ b/src/bin/deploy.rs @@ -35,7 +35,7 @@ use std::{ }; use constellation_internal::{ - abort_on_unwind_1, map_bincode_err, msg::{bincode_serialize_into, BridgeRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, Envs, ExitStatus, Format, Formatter, Pid, StyleSupport + abort_on_unwind_1, map_bincode_err, msg::{bincode_serialize_into, BridgeRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, Envs, ExitStatus, Format, Formatter, Pid, StyleSupport, TrySpawnError }; const USAGE: &str = "Run a binary on a constellation cluster. @@ -60,6 +60,7 @@ struct Args { arg_args: Vec, // ffi::OsString } +#[allow(clippy::too_many_lines)] fn main() { let envs = Envs::from_env(); let args: Args = docopt::Docopt::new(USAGE) @@ -105,12 +106,10 @@ fn main() { bincode_serialize_into(&mut stream_write.write(), &request) .map_err(map_bincode_err) .unwrap_or_else(|e| panic!("Couldn't communicate with bridge: {:?}", e)); - let pid: Option = bincode::deserialize_from(&mut stream_read) + let pid: Result = bincode::deserialize_from(&mut stream_read) .map_err(map_bincode_err) .unwrap_or_else(|e| panic!("Couldn't communicate with bridge: {:?}", e)); - let pid = pid.unwrap_or_else(|| { - panic!("Deploy failed due to not being able to allocate process to any of the nodes or constellation::init() not being called immediately inside main()") - }); // TODO get resources from bridge + let pid = pid.unwrap_or_else(|e| panic!("Deploy failed due to {}", e)); // TODO get resources from bridge crossbeam::scope(|scope| { let _ = scope.spawn(abort_on_unwind_1(|_scope| { let mut stdin = io::stdin(); diff --git a/src/channel/inner.rs b/src/channel/inner.rs index 17ffb37..802cc95 100644 --- a/src/channel/inner.rs +++ b/src/channel/inner.rs @@ -18,10 +18,10 @@ pub enum Inner { } impl Inner { pub fn connect( - local: SocketAddr, remote: SocketAddr, incoming: Option, + bind: SocketAddr, local: SocketAddr, remote: SocketAddr, incoming: Option, notifier: &impl Notifier, ) -> Self { - InnerConnecting::new(local, remote, incoming, notifier).into() + InnerConnecting::new(bind, local, remote, incoming, notifier).into() } pub fn poll(&mut self, notifier: &impl Notifier) { diff --git a/src/channel/inner_states.rs b/src/channel/inner_states.rs index 56eadbc..b28a0d6 100644 --- a/src/channel/inner_states.rs +++ b/src/channel/inner_states.rs @@ -39,12 +39,12 @@ pub enum InnerConnecting { } impl InnerConnecting { pub fn new( - local: SocketAddr, remote: SocketAddr, incoming: Option, + bind: SocketAddr, local: SocketAddr, remote: SocketAddr, incoming: Option, notifier: &impl Notifier, ) -> InnerConnectingPoll { if ord(&local, &remote) { assert!(incoming.is_none()); - Self::Outgoing(Some(Connection::connect(local, remote, notifier))) + Self::Outgoing(Some(Connection::connect(bind, remote, notifier))) } else { Self::Incoming(incoming) } diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 73117fa..d49d7e9 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -48,31 +48,34 @@ pub struct Reactor { notifier: Notifier, listener: RwLock>, sockets: RwLock>>>>, + bind: SocketAddr, local: SocketAddr, } impl Reactor { #[allow(dead_code)] - pub fn new(host: IpAddr) -> (Self, u16) { + pub fn new(host: IpAddr, local: IpAddr) -> (Self, u16) { let notifier = Notifier::new(); let (listener, port) = Listener::new_ephemeral(&host, ¬ifier.context(Key(ptr::null()))); let sockets = RwLock::new(HashMap::new()); - let local = SocketAddr::new(host, port); + let bind = SocketAddr::new(host, port); + let local = SocketAddr::new(local, port); ( Self { notifier, listener: RwLock::new(Some(listener)), sockets, + bind, local, }, port, ) } - pub fn with_fd(fd: Fd) -> Self { + pub fn with_fd(fd: Fd, local: SocketAddr) -> Self { let notifier = Notifier::new(); let listener = Listener::with_fd(fd, ¬ifier.context(Key(ptr::null()))); let sockets = RwLock::new(HashMap::new()); - let local = if let socket::SockAddr::Inet(inet) = socket::getsockname(fd).unwrap() { + let bind = if let socket::SockAddr::Inet(inet) = socket::getsockname(fd).unwrap() { inet.to_std() } else { panic!() @@ -81,11 +84,14 @@ impl Reactor { notifier, listener: RwLock::new(Some(listener)), sockets, + bind, local, } } - pub fn with_forwardee(socket_forwardee: SocketForwardee, local: SocketAddr) -> Self { + pub fn with_forwardee( + socket_forwardee: SocketForwardee, bind: SocketAddr, local: SocketAddr, + ) -> Self { let notifier = Notifier::new(); let listener = Listener::with_socket_forwardee(socket_forwardee, ¬ifier.context(Key(ptr::null()))); @@ -94,10 +100,12 @@ impl Reactor { notifier, listener: RwLock::new(Some(listener)), sockets, + bind, local, } } + #[allow(clippy::too_many_lines)] pub fn run< F: FnMut() -> C + marker::Send + 'static, C: Borrow, @@ -120,10 +128,11 @@ impl Reactor { let context = context(); let context = context.borrow(); let mut listener = context.listener.try_write().unwrap(); - let (notifier, listener, sockets, local) = ( + let (notifier, listener, sockets, bind, local) = ( &context.notifier, listener.as_mut().unwrap(), &context.sockets, + &context.bind, &context.local, ); let mut done: Option< @@ -251,6 +260,7 @@ impl Reactor { ); } else if channel.inner.closed() { let mut inner = Inner::connect( + *bind, *local, remote, Some(connectee), @@ -304,6 +314,7 @@ impl Reactor { ¬ifier.context(Key(notifier_key as *const ())); let connectee: Connection = connection(notifier).into(); let mut inner = Inner::connect( + *bind, *local, remote, Some(connectee), @@ -488,14 +499,19 @@ pub struct Sender { } impl Sender { pub fn new(remote: SocketAddr, context: &Reactor) -> Option { - let (notifier, sockets, local) = (&context.notifier, &context.sockets, &context.local); + let (notifier, sockets, bind, local) = ( + &context.notifier, + &context.sockets, + &context.bind, + &context.local, + ); let sockets = &mut *sockets.write().unwrap(); let channel = match sockets.entry(remote) { hash_map::Entry::Vacant(vacant) => { let channel = Arc::new(RwLock::new(None)); let notifier_key: *const RwLock> = &*channel; let notifier = ¬ifier.context(Key(notifier_key as *const ())); - let mut inner = Channel::new(Inner::connect(*local, remote, None, notifier)); + let mut inner = Channel::new(Inner::connect(*bind, *local, remote, None, notifier)); inner.senders_count += 1; *channel.try_write().unwrap() = Some(inner); let _ = vacant.insert(channel.clone()); @@ -693,14 +709,19 @@ pub struct Receiver { } impl Receiver { pub fn new(remote: SocketAddr, context: &Reactor) -> Option { - let (notifier, sockets, local) = (&context.notifier, &context.sockets, &context.local); + let (notifier, sockets, bind, local) = ( + &context.notifier, + &context.sockets, + &context.bind, + &context.local, + ); let sockets = &mut *sockets.write().unwrap(); let channel = match sockets.entry(remote) { hash_map::Entry::Vacant(vacant) => { let channel = Arc::new(RwLock::new(None)); let notifier_key: *const RwLock> = &*channel; let notifier = ¬ifier.context(Key(notifier_key as *const ())); - let mut inner = Channel::new(Inner::connect(*local, remote, None, notifier)); + let mut inner = Channel::new(Inner::connect(*bind, *local, remote, None, notifier)); inner.receivers_count += 1; *channel.try_write().unwrap() = Some(inner); let _ = vacant.insert(channel.clone()); diff --git a/src/lib.rs b/src/lib.rs index c456f8a..8e266ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ //! //! The only requirement to use is that [`init()`](init) must be called immediately inside your application's `main()` function. -#![doc(html_root_url = "https://docs.rs/constellation-rs/0.1.5")] +#![doc(html_root_url = "https://docs.rs/constellation-rs/0.1.6")] #![cfg_attr(feature = "nightly", feature(read_initializer))] #![feature(cfg_doctest)] #![warn( @@ -54,7 +54,7 @@ use palaver::{ use pin_utils::pin_mut; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ - borrow, convert::{Infallible, TryFrom, TryInto}, error::Error, ffi::{CStr, CString, OsString}, fmt, fs, future::Future, io::{self, Read, Write}, iter, marker, mem::MaybeUninit, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream}, ops, os::unix::{ + borrow, convert::{Infallible, TryInto}, ffi::{CStr, CString, OsString}, fmt, fs, future::Future, io::{self, Read, Write}, iter, marker, mem::MaybeUninit, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream}, ops, os::unix::{ ffi::OsStringExt, io::{AsRawFd, FromRawFd, IntoRawFd} }, path, pin::Pin, process, str, sync::{mpsc, Arc, Mutex, RwLock}, task::{Context, Poll}, thread::{self, Thread} }; @@ -63,8 +63,10 @@ use constellation_internal::{ abort_on_unwind, file_from_reader, forbid_alloc, map_bincode_err, msg::{bincode_serialize_into, FabricRequest}, BufferedStream, Deploy, DeployOutputEvent, Envs, ExitStatus, Fd, Format, Formatter, PidInternal, ProcessInputEvent, ProcessOutputEvent, StyleSupport }; +#[doc(inline)] pub use channel::ChannelError; -pub use constellation_internal::{Pid, Resources, RESOURCES_DEFAULT}; +#[doc(inline)] +pub use constellation_internal::{Pid, Resources, SpawnError, TrySpawnError, RESOURCES_DEFAULT}; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -489,6 +491,7 @@ pub fn resources() -> Resources { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[allow(clippy::too_many_lines)] fn spawn_native( resources: Resources, f: serde_closure::FnOnce<(Vec,), fn((Vec,), (Pid,))>, _block: bool, @@ -643,8 +646,7 @@ fn spawn_native( } fn spawn_deployed( - resources: Resources, f: serde_closure::FnOnce<(Vec,), fn((Vec,), (Pid,))>, - _block: bool, + resources: Resources, f: serde_closure::FnOnce<(Vec,), fn((Vec,), (Pid,))>, block: bool, ) -> Result { trace!("spawn_deployed"); let stream = unsafe { TcpStream::from_raw_fd(SCHEDULER_FD) }; @@ -673,6 +675,7 @@ fn spawn_deployed( #[cfg(not(feature = "distribute_binaries"))] let binary = std::marker::PhantomData::; let request = FabricRequest { + block, resources, bind: vec![], args: env::args_os().expect("Couldn't get argv"), @@ -683,82 +686,19 @@ fn spawn_deployed( bincode_serialize_into(&mut stream_write.write(), &request) .map_err(map_bincode_err) .unwrap(); - let pid: Option = bincode::deserialize_from(&mut stream_read) + let pid: Result = bincode::deserialize_from(&mut stream_read) .map_err(map_bincode_err) .unwrap(); drop(stream_read); - trace!("{} spawned? {}", self::pid(), pid.unwrap()); - if let Some(pid) = pid { + trace!("{} spawned? {}", self::pid(), pid.as_ref().unwrap()); + if let Ok(pid) = pid { let file = unsafe { fs::File::from_raw_fd(MONITOR_FD) }; bincode::serialize_into(&mut &file, &ProcessOutputEvent::Spawn(pid)).unwrap(); let _ = file.into_raw_fd(); } let _ = stream.into_raw_fd(); - pid.ok_or(TrySpawnError::NoCapacity) -} - -/// An error returned by the [`try_spawn()`](try_spawn) method detailing the reason if known. -#[allow(missing_copy_implementations)] -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] -pub enum TrySpawnError { - /// [`try_spawn()`](try_spawn) failed because the new process couldn't be allocated. - NoCapacity, - /// [`try_spawn()`](try_spawn) failed for unknown reasons. - Unknown, - #[doc(hidden)] - __Nonexhaustive, // https://github.com/rust-lang/rust/issues/44109 -} - -/// An error returned by the [`spawn()`](spawn) method detailing the reason if known. -#[allow(missing_copy_implementations)] -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] -pub enum SpawnError { - /// [`spawn()`](spawn) failed for unknown reasons. - Unknown, - #[doc(hidden)] - __Nonexhaustive, -} -impl From for TrySpawnError { - fn from(error: SpawnError) -> Self { - match error { - SpawnError::Unknown => Self::Unknown, - SpawnError::__Nonexhaustive => unreachable!(), - } - } -} -impl TryFrom for SpawnError { - type Error = (); - - fn try_from(error: TrySpawnError) -> Result { - match error { - TrySpawnError::NoCapacity => Err(()), - TrySpawnError::Unknown => Ok(Self::Unknown), - TrySpawnError::__Nonexhaustive => unreachable!(), - } - } -} -impl fmt::Display for TrySpawnError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::NoCapacity => write!( - f, - "try_spawn() failed because the new process couldn't be allocated" - ), - Self::Unknown => write!(f, "try_spawn() failed for unknown reasons"), - Self::__Nonexhaustive => unreachable!(), - } - } -} -impl fmt::Display for SpawnError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Unknown => write!(f, "spawn() failed for unknown reasons"), - Self::__Nonexhaustive => unreachable!(), - } - } + pid } -impl Error for TrySpawnError {} -impl Error for SpawnError {} async fn spawn_inner( resources: Resources, start: T, block: bool, @@ -847,7 +787,7 @@ pub fn bridge_init() -> TcpListener { .unwrap(); } - let reactor = channel::Reactor::with_fd(LISTENER_FD); + let reactor = channel::Reactor::with_fd(LISTENER_FD, pid().addr()); *REACTOR.try_write().unwrap() = Some(reactor); let handle = channel::Reactor::run( || BorrowMap::new(REACTOR.read().unwrap(), borrow_unwrap_option), @@ -885,7 +825,7 @@ fn native_bridge(format: Format, our_pid: Pid) -> Pid { let bridge_pid = Pid::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bridge_process_id); PID.set(bridge_pid).unwrap(); - let reactor = channel::Reactor::with_fd(LISTENER_FD); + let reactor = channel::Reactor::with_fd(LISTENER_FD, bridge_pid.addr()); *REACTOR.try_write().unwrap() = Some(reactor); let handle = channel::Reactor::run( || BorrowMap::new(REACTOR.read().unwrap(), borrow_unwrap_option), @@ -1004,6 +944,7 @@ fn native_process_listener() -> (Fd, u16) { (process_listener, process_id.port()) } +#[allow(clippy::too_many_lines)] fn monitor_process( bridge: Pid, deployed: bool, ) -> (channel::SocketForwardee, Fd, Fd, Option, Fd) { @@ -1073,7 +1014,7 @@ fn monitor_process( .unwrap(); } - let reactor = channel::Reactor::with_fd(LISTENER_FD); + let reactor = channel::Reactor::with_fd(LISTENER_FD, pid().addr()); *REACTOR.try_write().unwrap() = Some(reactor); let handle = channel::Reactor::run( || BorrowMap::new(REACTOR.read().unwrap(), borrow_unwrap_option), @@ -1283,6 +1224,7 @@ fn monitor_process( /// Initialise the [constellation](self) runtime. This must be called immediately inside your application's `main()` function. /// /// The `resources` argument describes memory and CPU requirements for the initial process. +#[allow(clippy::too_many_lines)] pub fn init(resources: Resources) { // simple_logging::log_to_file( // format!("logs/{}.log", std::process::id()), @@ -1387,13 +1329,13 @@ pub fn init(resources: Resources) { // let err = unsafe{libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL)}; assert_eq!(err, 0); }); - let port = { + let bind = { let listener = unsafe { TcpListener::from_raw_fd(LISTENER_FD) }; let local_addr = listener.local_addr().unwrap(); let _ = listener.into_raw_fd(); - local_addr.port() + local_addr }; - let our_pid = Pid::new(ip, port); + let our_pid = Pid::new(ip, bind.port()); PID.set(our_pid).unwrap(); trace!( @@ -1456,7 +1398,7 @@ pub fn init(resources: Resources) { .unwrap(); } - let reactor = channel::Reactor::with_forwardee(socket_forwardee, pid().addr()); + let reactor = channel::Reactor::with_forwardee(socket_forwardee, bind, pid().addr()); *REACTOR.try_write().unwrap() = Some(reactor); let handle = channel::Reactor::run( || BorrowMap::new(REACTOR.read().unwrap(), borrow_unwrap_option), diff --git a/tests/tester/main.rs b/tests/tester/main.rs index c3f2e67..a822a8a 100644 --- a/tests/tester/main.rs +++ b/tests/tester/main.rs @@ -215,6 +215,7 @@ fn treeize( } } +#[allow(clippy::too_many_lines)] fn main() { let start = time::Instant::now(); std::env::set_var("RUST_BACKTRACE", "full");