Skip to content

Commit

Permalink
[Bifrost] SpreadSelector initial implementation
Browse files Browse the repository at this point in the history
Implements a flood selector (selects all writeable nodes in the effective nodeset).
  • Loading branch information
AhmedSoliman committed Sep 19, 2024
1 parent 8571725 commit faff5b9
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
rocksdb = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
mod checker;
pub mod spread_selector;

pub use checker::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[cfg(any(test, feature = "test-util"))]
use std::sync::Arc;

#[cfg(any(test, feature = "test-util"))]
use parking_lot::Mutex;
use rand::prelude::*;

use restate_types::nodes_config::NodesConfiguration;
use restate_types::replicated_loglet::{NodeSet, ReplicationProperty, Spread};

use crate::providers::replicated_loglet::replication::NodeSetChecker;

/// Errors that [`SpreadSelector::select`] can return.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SpreadSelectorError {
    /// Either there are fewer writeable candidate nodes than the replication
    /// property requires, or the chosen spread cannot form a write quorum.
    #[error("Insufficient writeable nodes in the nodeset")]
    InsufficientWriteableNodes,
}

/// The policy a [`SpreadSelector`] uses to turn candidate nodes into a spread.
#[derive(Debug)]
pub enum SelectorStrategy {
    /// Picks every writeable node of the nodeset. This can over-replicate;
    /// whether replication continues past the write-quorum requirement is left
    /// to the appender state machine.
    Flood,
    /// Test-only strategy that hands back a pre-configured, deterministic
    /// result.
    #[cfg(any(test, feature = "test-util"))]
    Fixed(FixedSpreadSelector),
}

/// Produces spreads out of a nodeset according to a [`SelectorStrategy`].
///
/// The selector is `Send + Sync` and may be shared between threads.
pub struct SpreadSelector {
    /// Nodes that spreads are drawn from.
    nodeset: NodeSet,
    /// Policy used to pick the spread.
    strategy: SelectorStrategy,
    /// Replication requirement every returned spread must be able to satisfy.
    replication_property: ReplicationProperty,
}

impl SpreadSelector {
    /// Builds a selector that draws spreads from `nodeset` using `strategy`
    /// and validates them against `replication_property`.
    pub fn new(
        nodeset: NodeSet,
        strategy: SelectorStrategy,
        replication_property: ReplicationProperty,
    ) -> Self {
        Self {
            nodeset,
            strategy,
            replication_property,
        }
    }

    /// Generates a spread, or fails if no valid spread can be produced.
    ///
    /// Candidates are the nodes of the effective nodeset (the nodeset resolved
    /// against `nodes_config`) that are not listed in `exclude_nodes` and whose
    /// log-server storage state accepts writes.
    ///
    /// # Errors
    ///
    /// Returns [`SpreadSelectorError::InsufficientWriteableNodes`] when there
    /// are fewer candidates than the number of copies required by the
    /// replication property, or when the chosen spread cannot reach write
    /// quorum.
    pub fn select<R: Rng + ?Sized>(
        &self,
        rng: &mut R,
        nodes_config: &NodesConfiguration,
        exclude_nodes: &NodeSet,
    ) -> Result<Spread, SpreadSelectorError> {
        // Resolve the nodeset against the nodes configuration, then keep only
        // the nodes that are neither excluded nor closed for writes.
        let effective = self.nodeset.to_effective(nodes_config);
        let mut candidates: Vec<_> = effective
            .into_iter()
            .filter(|node_id| {
                !exclude_nodes.contains(node_id)
                    && nodes_config
                        .get_log_server_storage_state(node_id)
                        .can_write_to()
            })
            .collect();

        // This guard applies to every strategy, including the fixed one.
        let required_copies: usize = self.replication_property.num_copies().into();
        if candidates.len() < required_copies {
            return Err(SpreadSelectorError::InsufficientWriteableNodes);
        }

        let spread: Spread = match &self.strategy {
            SelectorStrategy::Flood => {
                // Flood uses every candidate; only the order is randomized.
                candidates.shuffle(rng);
                Spread::from(candidates)
            }
            #[cfg(any(test, feature = "test-util"))]
            SelectorStrategy::Fixed(fixed) => fixed.select()?,
        };

        // Mark every node of the spread and confirm the marked nodes alone
        // are enough for a write quorum.
        let mut quorum_checker =
            NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
        quorum_checker.set_attribute_on_each(&spread, || true);
        if quorum_checker.check_write_quorum(|marked| *marked) {
            Ok(spread)
        } else {
            Err(SpreadSelectorError::InsufficientWriteableNodes)
        }
    }
}

// Compile-time check that `SpreadSelector` implements both `Send` and `Sync`.
static_assertions::assert_impl_all!(SpreadSelector: Send, Sync);

/// Test helper that returns a pre-configured selection result.
///
/// Clones share the same underlying result through the `Arc`, so replacing the
/// value behind the mutex is visible to every clone.
#[cfg(any(test, feature = "test-util"))]
#[derive(Debug, Clone)]
pub struct FixedSpreadSelector {
    /// The result handed back by [`FixedSpreadSelector::select`].
    pub result: Arc<Mutex<Result<Spread, SpreadSelectorError>>>,
}

#[cfg(any(test, feature = "test-util"))]
impl FixedSpreadSelector {
    /// Returns a copy of the currently configured result.
    pub fn select(&self) -> Result<Spread, SpreadSelectorError> {
        // The lock is held only for the duration of the clone.
        self.result.lock().clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use googletest::prelude::*;

    use restate_types::nodes_config::StorageState;
    use restate_types::PlainNodeId;

    use crate::providers::replicated_loglet::test_util::generate_logserver_nodes_config;

    #[test]
    fn test_with_fixed_spread_selector() -> Result<()> {
        let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite);
        let replication = ReplicationProperty::new(3.try_into().unwrap());
        let nodeset: NodeSet = (1..=5).collect();

        // Smoke test: the fixed strategy returns exactly what it was given.
        let fixed = FixedSpreadSelector {
            result: Arc::new(Mutex::new(Ok(Spread::from([1, 2, 3])))),
        };
        let selector = SpreadSelector::new(
            nodeset,
            SelectorStrategy::Fixed(fixed.clone()),
            replication,
        );
        let mut rng = rand::thread_rng();
        let unfiltered = selector.select(&mut rng, &nodes_config, &NodeSet::empty())?;
        assert_that!(unfiltered, eq(Spread::from([1, 2, 3])));

        // The fixed strategy does not honour the exclusion list; it still
        // returns node 1 as long as enough candidates remain after exclusion.
        let with_exclusion = selector.select(&mut rng, &nodes_config, &NodeSet::from([1]))?;
        assert_that!(with_exclusion, eq(Spread::from([1, 2, 3])));

        // Selection fails when too few candidates remain, either because too
        // many nodes are excluded or because the nodes config is too small.
        //
        // Excluding three of the five nodes leaves only two candidates.
        let too_many_excluded =
            selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 2, 3]));
        assert_that!(
            too_many_excluded,
            err(pat!(SpreadSelectorError::InsufficientWriteableNodes))
        );

        // A nodes config generated with only two log servers cannot satisfy
        // three copies.
        let nodes_config = generate_logserver_nodes_config(2, StorageState::ReadWrite);
        let replication = ReplicationProperty::new(3.try_into().unwrap());
        let nodeset: NodeSet = (1..=3).collect();
        let selector = SpreadSelector::new(nodeset, SelectorStrategy::Fixed(fixed), replication);

        let small_config = selector.select(&mut rng, &nodes_config, &NodeSet::empty());
        assert_that!(
            small_config,
            err(pat!(SpreadSelectorError::InsufficientWriteableNodes))
        );

        Ok(())
    }

    #[test]
    fn test_flood_spread_selector() -> Result<()> {
        let nodes_config = generate_logserver_nodes_config(10, StorageState::ReadWrite);
        let replication = ReplicationProperty::new(3.try_into().unwrap());
        let nodeset: NodeSet = (1..=5).collect();

        let selector = SpreadSelector::new(nodeset, SelectorStrategy::Flood, replication);
        let mut rng = rand::thread_rng();

        // Without exclusions, flood returns every node of the nodeset, in
        // random order.
        let everything = selector
            .select(&mut rng, &nodes_config, &NodeSet::empty())?
            .to_vec();
        assert_that!(
            everything,
            unordered_elements_are![
                eq(PlainNodeId::new(1)),
                eq(PlainNodeId::new(2)),
                eq(PlainNodeId::new(3)),
                eq(PlainNodeId::new(4)),
                eq(PlainNodeId::new(5))
            ]
        );

        // Excluded nodes never appear in the spread.
        let without_1_and_4 = selector
            .select(&mut rng, &nodes_config, &NodeSet::from([1, 4]))?
            .to_vec();
        assert_that!(
            without_1_and_4,
            unordered_elements_are![
                eq(PlainNodeId::new(2)),
                eq(PlainNodeId::new(3)),
                eq(PlainNodeId::new(5))
            ]
        );

        // Excluding three nodes leaves two candidates, fewer than the three
        // required copies.
        let too_many_excluded =
            selector.select(&mut rng, &nodes_config, &NodeSet::from([1, 4, 2]));
        assert_that!(
            too_many_excluded,
            err(pat!(SpreadSelectorError::InsufficientWriteableNodes))
        );

        Ok(())
    }
}
2 changes: 2 additions & 0 deletions crates/types/src/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

mod params;
mod replication_property;
mod spread;

pub use params::*;
pub use replication_property::*;
pub use spread::*;
50 changes: 50 additions & 0 deletions crates/types/src/replicated_loglet/spread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::PlainNodeId;

/// A list of node ids, stored as a boxed slice.
///
/// The derives expose the inner slice directly: the type dereferences to it,
/// can be indexed and iterated, and can be built from a `Box<[PlainNodeId]>`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Deref,
    derive_more::DerefMut,
    derive_more::IntoIterator,
    derive_more::From,
    derive_more::Index,
    derive_more::IndexMut,
)]
pub struct Spread(Box<[PlainNodeId]>);

/// Builds a spread from an owned vector of node ids, preserving order.
impl From<Vec<PlainNodeId>> for Spread {
    fn from(v: Vec<PlainNodeId>) -> Self {
        let nodes: Box<[PlainNodeId]> = v.into();
        Self(nodes)
    }
}

impl From<Vec<u32>> for Spread {
fn from(v: Vec<u32>) -> Self {
Self(v.into_iter().map(PlainNodeId::from).collect())
}
}

/// Builds a spread from a fixed-size array of node ids, preserving order.
impl<const N: usize> From<[PlainNodeId; N]> for Spread {
    fn from(value: [PlainNodeId; N]) -> Self {
        // Box the array, then let it unsize to a boxed slice.
        let nodes: Box<[PlainNodeId]> = Box::new(value);
        Self(nodes)
    }
}

impl<const N: usize> From<[u32; N]> for Spread {
fn from(value: [u32; N]) -> Self {
Self(value.into_iter().map(PlainNodeId::from).collect())
}
}

0 comments on commit faff5b9

Please sign in to comment.