diff --git a/Cargo.lock b/Cargo.lock index 7cc67e268..da6443aa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5500,7 +5500,9 @@ dependencies = [ "parking_lot", "paste", "pin-project", + "rand", "restate-core", + "restate-log-server", "restate-metadata-store", "restate-rocksdb", "restate-test-util", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 3d36b2e8e..46ff0bb8f 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } +rand = { workspace = true } rocksdb = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } @@ -40,6 +41,7 @@ tracing = { workspace = true } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } +restate-log-server = { workspace = true } restate-metadata-store = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 7ffc259b4..e81e5d12b 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -154,6 +154,20 @@ pub trait LogletReadStream: Stream, Operati pub type SendableLogletReadStream = Pin>; +pub(crate) struct Resolver { + tx: oneshot::Sender>, +} + +impl Resolver { + pub fn sealed(self) { + let _ = self.tx.send(Err(AppendError::Sealed)); + } + + pub fn offset(self, offset: LogletOffset) { + let _ = self.tx.send(Ok(offset)); + } +} + pub struct LogletCommit { rx: oneshot::Receiver>, } @@ -170,6 +184,11 @@ impl LogletCommit { let _ = tx.send(Ok(offset)); Self { rx } } + + pub(crate) fn later() -> (Self, Resolver) { + let (tx, rx) = oneshot::channel(); + (Self { rx }, Resolver { tx }) + } } impl std::future::Future for LogletCommit { diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index 55e281a88..5a16cd3cb 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -10,5 +10,6 @@ pub(crate) mod metric_definitions; mod provider; +mod sequencer; pub use provider::Factory; diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs new file mode 100644 index 000000000..28cd729a6 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs @@ -0,0 +1,734 @@ +use std::{ + collections::{BTreeMap, HashMap, HashSet, VecDeque}, + pin::Pin, + sync::{ + atomic::{self, Ordering}, + Arc, + }, + time::Duration, + usize, +}; + +use futures::Stream; +use node::{LogletStatus, NodeClient, NodeSet, ReplicationPolicy, Tracker}; +use tokio::sync::{mpsc, oneshot}; +use worker::{Batch, Payload, WorkerEvent}; + +use restate_core::{cancellation_token, ShutdownError, TaskCenter, TaskKind}; +use restate_types::{ + logs::{LogletOffset, Lsn, Record, SequenceNumber, TailState}, + net::log_server::{LogletInfo, Status, Stored}, + replicated_loglet::ReplicatedLogletId, + GenerationalNodeId, +}; + +use crate::loglet::LogletCommit; + +mod node; +mod worker; + +pub use node::Nodes; +const SUBSCRIPTION_STREAM_SIZE: usize = 64; +const NODE_HEALTH_CHECK: Duration = Duration::from_millis(1000); +const NODE_MAX_LAST_RESPONSE_DURATION: Duration = Duration::from_millis(1000); + +//todo: improve error names and description 
+#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("cannot satisfy spread")] + CannotSatisfySpread, + #[error("malformed batch")] + MalformedBatch, + #[error("invalid node set")] + InvalidNodeSet, + #[error("node {0} queue is full")] + TemporaryUnavailable(GenerationalNodeId), + #[error(transparent)] + ShutdownError(#[from] ShutdownError), +} + +/// internal commands sent over the [`SequencerHandler`] to sequencer main loop +struct Call { + request: Q, + sender: oneshot::Sender, +} + +impl Call { + fn from_request(request: Q) -> (oneshot::Receiver, Self) { + let (sender, receiver) = oneshot::channel(); + (receiver, Self { request, sender }) + } +} + +#[derive(derive_more::Deref)] +struct Event { + peer: GenerationalNodeId, + #[deref] + event: E, +} + +impl Event { + fn new(peer: GenerationalNodeId, signal: S) -> Self { + Self { + peer, + event: signal, + } + } +} + +/// Internal commands to the sequencer main loop. This is exclusively used +/// by the SequencerHandler +enum Commands { + ClusterState(Call<(), ClusterState>), + /// executed commands + EnqueueBatch(Call, Result>), +} + +// Internal possible signals that can be received async from log server +enum Events { + Stored(Event), + LogletInfo(Event), +} + +impl Events { + fn peer(&self) -> &GenerationalNodeId { + match self { + Self::Stored(signal) => &signal.peer, + Self::LogletInfo(signal) => &signal.peer, + } + } +} + +#[derive(Debug, Clone)] +pub struct SequencerHandle { + /// internal commands channel. + commands: mpsc::Sender, + /// internal signal channels. Signals are responses that are received + /// async and sent from log server nodes. + /// they are separated from the commands, in case we need to block command + /// processing until a set of responses has been received + signals: mpsc::Sender, +} + +pub(crate) struct Receiver { + commands: mpsc::Receiver, + events: mpsc::Receiver, +} + +impl SequencerHandle { + pub(crate) fn pair() -> (SequencerHandle, Receiver) { + // todo: the size of the channel should be 1 + let (commands_sender, commands_receiver) = mpsc::channel::(1); + let (signals_sender, signals_receiver) = mpsc::channel::(64); + ( + SequencerHandle { + commands: commands_sender, + signals: signals_sender, + }, + Receiver { + commands: commands_receiver, + events: signals_receiver, + }, + ) + } + + pub fn watch_tail(&self) -> futures::stream::BoxStream<'static, TailState> { + unimplemented!() + } + + pub async fn cluster_state(&self) -> Result { + let (receiver, command) = Call::from_request(()); + self.commands + .send(Commands::ClusterState(command)) + .await + .map_err(|_| ShutdownError)?; + + receiver.await.map_err(|_| ShutdownError) + } + + pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + let (receiver, command) = Call::from_request(payloads); + self.commands + .send(Commands::EnqueueBatch(command)) + .await + .map_err(|_| ShutdownError)?; + + receiver.await.map_err(|_| ShutdownError)? 
+ } + + pub async fn event_stored( + &self, + peer: impl Into, + payloads: Stored, + ) -> Result<(), ShutdownError> { + let signal = Event::new(peer.into(), payloads); + self.signals + .send(Events::Stored(signal)) + .await + .map_err(|_| ShutdownError) + } + + pub async fn event_loglet_info( + &self, + peer: impl Into, + payloads: LogletInfo, + ) -> Result<(), ShutdownError> { + let signal = Event::new(peer.into(), payloads); + self.signals + .send(Events::LogletInfo(signal)) + .await + .map_err(|_| ShutdownError) + } +} + +#[derive(Clone, Debug)] +pub struct ClusterState { + pub sequencer_id: GenerationalNodeId, + pub global_committed_tail: LogletOffset, + pub nodes: BTreeMap, +} + +/// part of state that is shared between multiple appenders +#[derive(Debug)] +pub(crate) struct SequencerGlobalState { + node_id: GenerationalNodeId, + loglet_id: ReplicatedLogletId, + global_committed_tail: atomic::AtomicU32, +} + +impl SequencerGlobalState { + pub fn node_id(&self) -> &GenerationalNodeId { + &self.node_id + } + + pub fn loglet_id(&self) -> &ReplicatedLogletId { + &self.loglet_id + } + + pub fn committed_tail(&self) -> LogletOffset { + LogletOffset::new(self.global_committed_tail.load(Ordering::Acquire)) + } + + pub(crate) fn set_committed_tail(&self, tail: LogletOffset) { + self.global_committed_tail + .fetch_max(tail.into(), Ordering::Release); + } +} + +/// Sequencer inner state machine +/// +/// this holds for example, the replica set (log servers) +/// information about global tail, etc... +#[derive(Debug)] +struct SequencerInner +where + P: ReplicationPolicy, +{ + nodes: node::Nodes, + node_set: NodeSet, + replication_policy: P, + sealed: bool, + handle: SequencerHandle, + offset: Lsn, + inflight_tail: LogletOffset, + global: Arc, + batches: VecDeque>, +} + +pub struct Sequencer; +impl Sequencer { + pub fn start( + task_center: &TaskCenter, + node_id: GenerationalNodeId, + loglet_id: ReplicatedLogletId, + offset: Lsn, + nodes: Nodes, + node_set: Vec, + replication_policy: P, + ) -> Result + where + C: node::NodeClient + Send + Sync + 'static, + P: ReplicationPolicy + Send + Sync + 'static, + { + // - register for all potential response streams from the log-server(s). + + // create a command channel to be used by the sequencer handler. 
The handler then can be used + // to call and execute commands on the sequencer directly + let (handle, commands) = SequencerHandle::pair(); + + let sequencer = SequencerInner::new( + &task_center, + node_id, + loglet_id, + nodes, + node_set, + replication_policy, + handle.clone(), + offset, + )?; + + task_center.spawn_unmanaged( + TaskKind::SystemService, + "leader-sequencer", + None, + sequencer.run(commands), + )?; + + Ok(handle) + } +} + +impl SequencerInner +where + C: NodeClient + Send + Sync + 'static, + P: ReplicationPolicy, +{ + fn new( + task_center: &TaskCenter, + node_id: GenerationalNodeId, + loglet_id: ReplicatedLogletId, + nodes: Nodes, + node_set: Vec, + replication_policy: P, + handle: SequencerHandle, + offset: Lsn, + ) -> Result { + // shared state with appenders + let shared = Arc::new(SequencerGlobalState { + node_id, + loglet_id, + global_committed_tail: atomic::AtomicU32::new(LogletOffset::OLDEST.into()), + }); + + // build the node set + let node_set = NodeSet::start(task_center, node_set, &nodes, Arc::clone(&shared))?; + + // + Ok(Self { + nodes, + node_set, + replication_policy, + handle, + offset, + inflight_tail: LogletOffset::OLDEST, + global: shared, + sealed: false, + batches: VecDeque::default(), + }) + } + + async fn run(mut self, mut input: Receiver) { + let shutdown = cancellation_token(); + + let mut health = tokio::time::interval(NODE_HEALTH_CHECK); + + // enter main state machine loop + loop { + tokio::select! { + biased; + _ = shutdown.cancelled() => { + break; + }, + // we rely on the fact that the tick is resolved immediately + // on first call to send a ping to all nodes in the node set + _ = health.tick() => { + self.health_check().await; + } + Some(event) = input.events.recv() => { + self.process_event(event).await; + } + Some(command) = input.commands.recv() => { + self.process_command(command).await; + } + } + } + } + + /// health check fo the node set, and also try + async fn health_check(&self) { + for id in self.node_set.keys() { + if let Some(node) = self.nodes.get(id) { + if node.status().duration_since_last_response() < NODE_MAX_LAST_RESPONSE_DURATION { + continue; + } + // otherwise send a heartbeat and wait for response + if let Err(_) = node.client().enqueue_get_loglet_info().await { + tracing::warn!(node=%id, "failed to send get-loglet-info to node") + } + } + } + } + + /// process calls from the SequencerHandler. + async fn process_command(&mut self, command: Commands) { + match command { + Commands::ClusterState(command) => { + let Call { sender, .. } = command; + if let Err(_) = sender.send(self.cluster_state()) {} + } + Commands::EnqueueBatch(command) => { + let Call { request, sender } = command; + + let _ = sender.send(self.enqueue_batch(request).await); + } + } + } + + /// process calls from the SequencerHandler. 
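+    /// Handles the asynchronous `Stored` / `LogletInfo` events reported by log-server nodes,
+    /// refreshing the reporting peer's liveness timestamp before dispatching the event.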
+ async fn process_event(&mut self, event: Events) { + if let Some(node) = self.nodes.get(event.peer()) { + node.status().touch(); + } + + match event { + Events::Stored(event) => { + self.process_stored_event(event).await; + } + Events::LogletInfo(signal) => { + self.process_loglet_info_event(signal); + } + } + } + + fn cluster_state(&self) -> ClusterState { + ClusterState { + global_committed_tail: self.global.committed_tail(), + sequencer_id: self.global.node_id, + nodes: self + .node_set + .iter() + .map(|(id, node)| (id.clone(), node.loglet.clone())) + .collect(), + } + } + + async fn enqueue_batch(&mut self, records: Arc<[Record]>) -> Result { + if self.sealed { + // todo: (question) do we return a sealed loglet commit, or error. + return Ok(LogletCommit::sealed()); + } + + // - create a partial store + let payload = Arc::new(Payload { + first_offset: self.inflight_tail, + records, + }); + + // - compute the new inflight_tail + let new_inflight_tail = payload.inflight_tail().ok_or(Error::MalformedBatch)?; + + // - get the next spread of nodes from the node set that can satisfy this inflight_tail + let spread = self.replication_policy.select( + self.global.committed_tail(), + self.inflight_tail, + &self.node_set, + )?; + + let (commit, resolver) = LogletCommit::later(); + + let tracker = spread.enqueue(&payload); + + // we need to update the inflight tail of the spread + for id in tracker.nodes() { + if let Some(node) = self.node_set.get_mut(id) { + node.loglet.inflight_tail = new_inflight_tail; + } + } + + // - create a batch that will be eventually resolved after we + // receive all "Stored" commands + let batch = Batch { + payload, + tracker, + resolver, + }; + + self.inflight_tail = new_inflight_tail; + self.batches.push_back(batch); + + Ok(commit) + } + + async fn process_stored_event(&mut self, stored: Event) { + match stored.status { + Status::Sealed | Status::Sealing => { + self.sealed = true; + if let Some(node) = self.node_set.get_mut(&stored.peer) { + node.loglet.sealed = true; + } + // reject all batches + // todo: need revision, this might not be always correct + for batch in self.batches.drain(..) { + batch.resolver.sealed(); + } + return; + } + Status::Ok => { + // store succeeded + if let Some(node) = self.node_set.get_mut(&stored.peer) { + // update node local tail. + if stored.local_tail > node.loglet.local_tail { + node.loglet.local_tail = stored.local_tail; + } + + // make sure the worker knows about its progress + // so it can move on + node.worker + .notify(WorkerEvent::Stored(stored.event.clone())) + .await; + } + } + _ => { + todo!() + } + } + + // fence is the range end of first non resolvable + // batch in the queue. + // we start by assuming that all batches are resolvable + let mut fence = self.batches.len(); + for (index, batch) in self.batches.iter_mut().enumerate() { + // if the node local tail is smaller that the batch inflight tail then + // we can safely break from this loop, since every next batch will + // have a higher inflight tail. 
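+            // (e.g. a batch with first_offset 10 and 4 records has an inflight_tail of 14;
+            //  a reported local_tail of 12 means that batch is not yet fully stored on this node)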
+ // it's also safe to unwrap the inflight_tail here since it won't + // even be in the batches queue if the inflight tail was invalid + let batch_inflight_tail = batch.payload.inflight_tail().unwrap(); + if stored.local_tail < batch_inflight_tail { + break; + } + + batch.tracker.mark_resolved(&stored.peer); + if !batch.tracker.is_complete() { + // once fence is set to this index, it will not change + // until the loop breaks + fence = std::cmp::min(fence, index); + } + } + + for batch in self.batches.drain(..fence) { + let inflight_tail = batch.payload.inflight_tail().unwrap(); + self.global.set_committed_tail(inflight_tail); + // todo: (azmy) we probably need to do a release here + batch.resolver.offset(inflight_tail); + } + } + + fn process_loglet_info_event(&mut self, signal: Event) { + let Event { + peer, + event: loglet_info, + } = signal; + + if loglet_info.sealed { + self.sealed = true; + // todo: finish sealing ? + } + + // update last response time + if let Some(node) = self.nodes.get(&peer) { + node.status().touch(); + } + + self.node_set.entry(peer).and_modify(|node| { + node.loglet.update(&loglet_info); + }); + } +} + +/// todo: (azmy) build actual tests this is just experiments +/// over interactions with log-server +#[cfg(test)] +mod test { + + use restate_core::{network::NetworkError, TaskCenterBuilder, TaskKind}; + use restate_types::{ + logs::{LogletOffset, Lsn, Record, SequenceNumber}, + net::log_server::{LogServerResponseHeader, LogletInfo, Status, Store, Stored}, + GenerationalNodeId, + }; + use std::{collections::HashMap, sync::Arc, time::Duration}; + use tokio::sync::Mutex; + + use super::{ + node::{NodeClient, PolicyAll, PolicySimpleQuorum, ReplicationPolicy}, + SequencerHandle, SequencerInner, + }; + + struct MockNodeClient { + id: GenerationalNodeId, + handle: SequencerHandle, + local_tail: Arc>, + } + + #[async_trait::async_trait] + impl NodeClient for MockNodeClient { + async fn enqueue_store(&self, msg: Store) -> Result<(), NetworkError> { + // directly respond with stored answer + let local_tail = msg.last_offset().unwrap() + 1; + let mut tail = self.local_tail.lock().await; + *tail = local_tail; + self.handle + .event_stored( + self.id.clone(), + Stored { + header: LogServerResponseHeader { + local_tail: local_tail, + sealed: false, + status: Status::Ok, + }, + }, + ) + .await?; + + Ok(()) + } + + async fn enqueue_get_loglet_info(&self) -> Result<(), NetworkError> { + self.handle + .event_loglet_info( + self.id.clone(), + LogletInfo { + header: LogServerResponseHeader { + local_tail: self.local_tail.lock().await.clone(), + sealed: false, + status: Status::Ok, + }, + trim_point: LogletOffset::OLDEST, + }, + ) + .await?; + + Ok(()) + } + } + + async fn default_setup
<P>
(size: usize, policy: P) -> SequencerHandle + where + P: ReplicationPolicy + Send + Sync + 'static, + { + //crate::setup_panic_handler(); + let tc = TaskCenterBuilder::default_for_tests().build().unwrap(); + + let (handle, input) = SequencerHandle::pair(); + + let mut nodes = HashMap::with_capacity(size); + + let sequencer_id = GenerationalNodeId::new(1, 1); + for i in 0..size { + let id = GenerationalNodeId::new(i as u32 + 2, 1); + let node = MockNodeClient { + id: id, + handle: handle.clone(), + local_tail: Arc::new(Mutex::new(LogletOffset::OLDEST)), + }; + nodes.insert(id, node); + } + + let node_set = nodes.keys().copied().collect(); + + let sequencer = SequencerInner::new( + &tc, + sequencer_id, + 100.into(), + nodes.into(), + node_set, + policy, + handle.clone(), + Lsn::OLDEST, + ) + .unwrap(); + + tc.spawn_unmanaged( + TaskKind::SystemService, + "sequencer", + None, + sequencer.run(input), + ) + .unwrap(); + + handle + } + + #[tokio::test] + async fn test_simple_all_replication() { + let handle = default_setup(2, PolicyAll).await; + + let records = vec![Record::from("hello world"), Record::from("hello human")]; + let resolved = handle.enqueue_batch(Arc::from(records)).await.unwrap(); + + println!("waiting for resolved commit"); + let tail = tokio::time::timeout(Duration::from_secs(2), resolved) + .await + .unwrap() + .unwrap(); + + let expected = LogletOffset::new(3); + assert_eq!(tail, expected); + + let state = handle.cluster_state().await.unwrap(); + for state in state.nodes.values() { + assert_eq!(state.local_tail, expected); + } + } + + #[tokio::test] + async fn test_simple_quorum_replication() { + let handle = default_setup(3, PolicySimpleQuorum).await; + + let records = Arc::from(vec![ + Record::from("hello world"), + Record::from("hello human"), + ]); + + let resolved = handle.enqueue_batch(Arc::clone(&records)).await.unwrap(); + + let tail = tokio::time::timeout(Duration::from_secs(2), resolved) + .await + .unwrap() + .unwrap(); + + let expected = LogletOffset::new(3); + assert_eq!(tail, expected); + + let state_1 = handle.cluster_state().await.unwrap(); + println!("state: {:#?}", state_1); + // at this point we expect min of 2 nodes to have reached the expected tail + let mut at_tail = 0; + + assert_eq!(expected, state_1.global_committed_tail); + for state in state_1.nodes.values() { + if state.local_tail == expected { + at_tail += 1; + } + } + + assert!(at_tail >= 2); + + // push the next batch! 
+ // NOTE: since the all nodes is caught up to the global committed tail \ + // next time we do enqueue batch this can end up on a completely different set of nodes + let resolved = handle.enqueue_batch(Arc::clone(&records)).await.unwrap(); + + let expected = expected + records.len() as u32; + let tail = tokio::time::timeout(Duration::from_secs(2), resolved) + .await + .unwrap() + .unwrap(); + + assert_eq!(tail, expected); + + let state_2 = handle.cluster_state().await.unwrap(); + println!("state: {:#?}", state_2); + // at this point we expect min of 2 nodes to have reached the expected tail + let mut at_tail = 0; + assert_eq!(expected, state_2.global_committed_tail); + for state in state_2.nodes.values() { + if state.local_tail == expected { + at_tail += 1; + } + } + + assert!(at_tail >= 2); + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs new file mode 100644 index 000000000..f7b75859d --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/node.rs @@ -0,0 +1,344 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::{ + atomic::{self, Ordering}, + Arc, Weak, + }, + time::Duration, +}; + +use super::{ + worker::{NodeWorker, NodeWorkerHandle, Payload, SendPermit}, + Error, SequencerGlobalState, +}; +use restate_core::{network::NetworkError, TaskCenter}; +use restate_types::{ + logs::{LogletOffset, SequenceNumber}, + net::log_server::{LogletInfo, Store}, + replicated_loglet::ReplicationProperty, + time::MillisSinceEpoch, + GenerationalNodeId, +}; + +/// LogletHandler trait abstracts the log-server loglet interface. One of possible implementations +/// is a grpc client to running log server +#[async_trait::async_trait] +pub trait NodeClient { + async fn enqueue_store(&self, msg: Store) -> Result<(), NetworkError>; + async fn enqueue_get_loglet_info(&self) -> Result<(), NetworkError>; +} + +#[derive(Debug, Default)] +pub struct NodeStatus { + // todo: this should be monotonic + last_response_time: atomic::AtomicU64, +} + +impl NodeStatus { + pub(crate) fn touch(&self) { + // update value with latest timestamp + self.last_response_time + .store(MillisSinceEpoch::now().into(), Ordering::Relaxed); + } + + pub fn last_response_time(&self) -> MillisSinceEpoch { + self.last_response_time.load(Ordering::Relaxed).into() + } + + pub fn duration_since_last_response(&self) -> Duration { + // last_response_time should be monotonic + self.last_response_time().elapsed() + } +} + +struct NodeInner { + client: C, + state: NodeStatus, +} + +pub struct Node { + inner: Arc>, +} + +impl Clone for Node { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl Node { + fn new(client: C) -> Self { + Self { + inner: Arc::new(NodeInner { + client, + state: NodeStatus::default(), + }), + } + } + pub fn client(&self) -> &C { + &self.inner.client + } + + pub fn status(&self) -> &NodeStatus { + &self.inner.state + } +} + +#[derive(derive_more::Debug, derive_more::Deref, derive_more::DerefMut)] +#[debug("{:?}", inner.keys().collect::>())] +/// Nodes represents the entire cluster of nodes available. 
This can be shared with +/// multiple Sequencers +pub struct Nodes { + #[deref] + #[deref_mut] + inner: BTreeMap>, +} + +impl From for Nodes +where + I: IntoIterator, +{ + fn from(value: I) -> Self { + let inner: BTreeMap> = value + .into_iter() + .map(|(key, client)| (key, Node::new(client))) + .collect(); + + Self { inner } + } +} + +#[derive(Debug, Clone)] +pub struct LogletStatus { + pub sealed: bool, + pub local_tail: LogletOffset, + pub inflight_tail: LogletOffset, + pub trim_point: LogletOffset, +} + +impl Default for LogletStatus { + fn default() -> Self { + Self { + sealed: false, + local_tail: LogletOffset::INVALID, + inflight_tail: LogletOffset::INVALID, + trim_point: LogletOffset::INVALID, + } + } +} + +impl LogletStatus { + pub fn update(&mut self, info: &LogletInfo) { + self.sealed = info.sealed; + self.local_tail = info.local_tail; + self.trim_point = info.trim_point; + } +} + +#[derive(Debug)] +pub struct LogletNode { + pub loglet: LogletStatus, + pub worker: NodeWorkerHandle, +} + +/// NodeSet is a subset of Nodes that is maintained internally with each sequencer +#[derive(derive_more::Deref, derive_more::DerefMut, Debug)] +pub struct NodeSet { + inner: BTreeMap, +} + +impl NodeSet { + /// creates the node set and start the appenders + pub(crate) fn start( + tc: &TaskCenter, + node_set: impl IntoIterator, + nodes: &Nodes, + shared: Arc, + ) -> Result + where + C: NodeClient + Send + Sync + 'static, + { + let mut inner = BTreeMap::new(); + for id in node_set { + let node = match nodes.get(&id) { + Some(node) => node.clone(), + None => return Err(super::Error::InvalidNodeSet), + }; + + let worker = NodeWorker::start(tc, node, 10, Arc::clone(&shared))?; + + inner.insert( + id, + LogletNode { + loglet: LogletStatus::default(), + worker, + }, + ); + } + + Ok(Self { inner }) + } +} + +impl NodeSet { + pub fn sealed_nodes(&self) -> usize { + self.values() + .filter(|n| matches!(n.loglet, LogletStatus { sealed, .. } if sealed)) + .count() + } +} + +#[derive(Debug)] +pub struct SimpleTracker { + /// number of nodes to have resolved the write before + /// assuming this spread is committed + pub replication_factor: usize, + /// the nodes included in the spread + pub node_set: BTreeMap, +} + +impl Tracker for SimpleTracker { + fn mark_resolved(&mut self, node: &GenerationalNodeId) -> bool { + if let Some(value) = self.node_set.get_mut(node) { + *value = true; + return true; + } + false + } + + fn is_complete(&self) -> bool { + self.node_set.values().filter(|v| **v).count() >= self.replication_factor + } + + fn nodes(&self) -> impl Iterator { + self.node_set.keys() + } +} + +pub struct Spread<'a, T> { + tracker: T, + permits: Vec>, +} + +impl<'a, T> Spread<'a, T> +where + T: Tracker, +{ + pub fn enqueue(self, payload: &Arc) -> T { + let Spread { tracker, permits } = self; + + for permit in permits { + permit.send(Arc::downgrade(payload)); + } + + tracker + } +} + +pub trait Tracker { + // mark a node as "resolved" for that tracker. 
returns true + // if the node is actually resolved + fn mark_resolved(&mut self, node: &GenerationalNodeId) -> bool; + + // check if this tracker is resolved as complete + fn is_complete(&self) -> bool; + + fn nodes(&self) -> impl Iterator; +} + +pub trait ReplicationPolicy { + type Tracker: Tracker + Send + Sync + 'static; + + fn select<'a>( + &mut self, + global_committed_tail: LogletOffset, + first_offset: LogletOffset, + subset: &'a NodeSet, + ) -> Result, Error>; +} + +pub struct PolicyAll; + +impl ReplicationPolicy for PolicyAll { + type Tracker = SimpleTracker; + fn select<'a>( + &mut self, + _global_committed_tail: LogletOffset, + _first_offset: LogletOffset, + subset: &'a NodeSet, + ) -> Result, Error> { + let mut node_set = BTreeMap::default(); + let mut permits = Vec::default(); + + for (id, node) in subset.iter() { + let permit = node + .worker + .reserve() + .map_err(|_| Error::TemporaryUnavailable(*id))?; + + permits.push(permit); + node_set.insert(*id, false); + } + + Ok(Spread { + permits, + tracker: SimpleTracker { + replication_factor: node_set.len(), + node_set: node_set, + }, + }) + } +} + +pub struct PolicySimpleQuorum; + +impl ReplicationPolicy for PolicySimpleQuorum { + type Tracker = SimpleTracker; + fn select<'a>( + &mut self, + global_committed_tail: LogletOffset, + first_offset: LogletOffset, + subset: &'a NodeSet, + ) -> Result, Error> { + // a fixed write quorum of N/2+1 + let min = subset.len() / 2 + 1; + let mut node_set = BTreeMap::default(); + let mut permits = Vec::with_capacity(subset.len()); + + // pick nodes at random maybe! + use rand::seq::SliceRandom; + let mut all: Vec<&GenerationalNodeId> = subset.keys().collect(); + all.shuffle(&mut rand::thread_rng()); + + for id in all { + let node = subset.get(id).expect("node must exist in node-set"); + // nodes can't have gaps UNLESS the first offset in the batch + // is the global_committed_offset. 
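+            // (i.e. a node is eligible only if this batch extends its inflight tail exactly,
+            //  or the batch starts at the global committed tail, from which any node can be
+            //  safely brought back up to date)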
+ if node.loglet.inflight_tail != first_offset && first_offset != global_committed_tail { + continue; + } + + let Ok(permit) = node.worker.reserve() else { + // node is lagging or busy + continue; + }; + + permits.push(permit); + node_set.insert(id.clone(), false); + } + + if permits.len() < min { + return Err(Error::CannotSatisfySpread); + } + + Ok(Spread { + permits, + tracker: SimpleTracker { + replication_factor: min, + node_set: node_set, + }, + }) + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs new file mode 100644 index 000000000..cf2a8e9aa --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/worker.rs @@ -0,0 +1,252 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Weak}, + time::Duration, +}; + +use restate_core::{cancellation_token, ShutdownError, TaskCenter, TaskKind}; +use tokio::sync::mpsc; + +use restate_types::{ + logs::{LogletOffset, Record, SequenceNumber}, + net::log_server::{Store, StoreFlags, Stored}, +}; + +use crate::loglet::Resolver; + +use super::{ + node::{Node, NodeClient, SimpleTracker, Tracker}, + Event, SequencerGlobalState, +}; + +#[derive(Debug)] +pub(crate) struct Payload { + pub first_offset: LogletOffset, + pub records: Arc<[Record]>, +} + +impl Payload { + pub fn inflight_tail(&self) -> Option { + let len = u32::try_from(self.records.len()).ok()?; + self.first_offset.checked_add(len).map(Into::into) + } +} + +#[derive(derive_more::Debug)] +pub(crate) struct Batch +where + T: Tracker, +{ + pub payload: Arc, + pub tracker: T, + #[debug(ignore)] + pub resolver: Resolver, +} + +pub struct SendPermit<'a> { + inner: mpsc::Permit<'a, Weak>, +} + +impl<'a> SendPermit<'a> { + pub(crate) fn send(self, payload: Weak) { + self.inner.send(payload) + } +} + +pub(crate) enum WorkerEvent { + Stored(Stored), +} + +#[derive(Clone, Debug)] +pub struct NodeWorkerHandle { + batch_tx: mpsc::Sender>, + event_tx: mpsc::Sender, +} + +impl NodeWorkerHandle { + /// reserve a send slot on the worker queue + pub fn reserve(&self) -> Result> { + Ok(SendPermit { + inner: self.batch_tx.try_reserve()?, + }) + } + + pub(crate) async fn notify(&self, event: WorkerEvent) { + let _ = self.event_tx.send(event).await; + } +} + +pub(crate) struct NodeWorker { + batch_rx: mpsc::Receiver>, + event_rx: mpsc::Receiver, + node: Node, + global: Arc, + buffer: VecDeque>, +} + +impl NodeWorker +where + C: NodeClient + Send + Sync + 'static, +{ + pub fn start( + tc: &TaskCenter, + node: Node, + queue_size: usize, + global: Arc, + ) -> Result { + // we create the channel at a 10% capacity of the full buffer size + // since pending batches will be queued in a VecDequeue + let (batch_tx, batch_rx) = mpsc::channel(std::cmp::max(1, queue_size / 10)); + let (event_tx, event_rx) = mpsc::channel(1); + let handle = NodeWorkerHandle { batch_tx, event_tx }; + + let buffer = VecDeque::with_capacity(queue_size); + let worker = NodeWorker { + batch_rx, + event_rx, + node, + global, + buffer, + }; + + tc.spawn_unmanaged(TaskKind::Disposable, "appender", None, worker.run())?; + + Ok(handle) + } + + async fn run(mut self) { + let token = cancellation_token(); + + loop { + // the first loop, we try to process + // all batches as they arrive and at the same + // time handle events regarding batches + // being processed by the node. + loop { + tokio::select! 
{ + biased; + _ = token.cancelled() => { + return; + } + Some(event) = self.event_rx.recv() => { + self.process_event(event); + } + Some(batch) = self.batch_rx.recv() => { + self.process_batch(batch).await; + } + } + + // there is a chance here that buffer got filled up with pending batches + // that has never received a `stored` event. + // Hence we need to break out of this loop to stop accepting more batches + // to write! + // + // note: this should be == comparison but just in case + if self.buffer.len() >= self.buffer.capacity() { + break; + } + } + + // we can only reach here if we stopped receiving `stored` events + // in that case we will stop receiving more batches and only wait + // for the stored events or retry + let mut timer = tokio::time::interval(Duration::from_millis(250)); + loop { + // in this loop we only handle events (in case we can drain finally) + // but we don't accept any more batches. This will put back pressure + // since the replication policy will not be able to reserve this + // node anymore! + tokio::select! { + _ = token.cancelled() => { + return; + } + Some(event) = self.event_rx.recv() => { + self.process_event(event); + } + _ = timer.tick() => { + self.retry().await; + } + } + + if self.buffer.len() < self.buffer.capacity() { + // we made progress and we can break out of this inner + // loop. + break; + } + } + } + } + + async fn retry(&self) { + // retry to send all items in the batch + for batch in self.buffer.iter() { + self.process_once(batch).await; + } + } + + fn process_event(&mut self, event: WorkerEvent) { + match event { + WorkerEvent::Stored(stored) => { + self.drain(stored); + } + } + } + + fn drain(&mut self, event: Stored) { + let mut trim = 0; + for (i, batch) in self.buffer.iter().enumerate() { + let Some(batch) = batch.upgrade() else { + // batch has been resolved externally and we can ignore it + trim = i + 1; + continue; + }; + + if batch.inflight_tail().unwrap() > event.local_tail { + // no confirmation for this batch yet. + break; + } + trim = i + 1; + } + + self.buffer.drain(..trim); + } + + async fn process_batch(&mut self, batch: Weak) { + if self.process_once(&batch).await { + self.buffer.push_back(batch); + } + } + + async fn process_once(&self, batch: &Weak) -> bool { + let batch = match batch.upgrade() { + Some(batch) => batch, + None => return false, + }; + + let inflight_tail = batch.inflight_tail().expect("valid inflight tail"); + if inflight_tail <= self.global.committed_tail() { + // todo: (question) batch is already committed and we can safely ignore it? + return false; + } + + let store = Store { + first_offset: batch.first_offset, + flags: StoreFlags::empty(), + known_archived: LogletOffset::INVALID, + known_global_tail: self.global.committed_tail(), + loglet_id: self.global.loglet_id, + sequencer: self.global.node_id, + timeout_at: None, + // todo: (question) better way to do this? 
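+            // (each Record is cloned out of the shared Arc<[Record]> into the owned payload vector)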
+            payloads: Vec::from_iter(batch.records.iter().map(|r| r.clone())),
+        };
+
+        if let Err(err) = self.node.client().enqueue_store(store).await {
+            //todo: retry
+            tracing::error!(error = %err, "failed to send store to node");
+        }
+
+        // batch is sent but there is a chance that we need to retry
+        true
+    }
+}
diff --git a/crates/types/src/time.rs b/crates/types/src/time.rs
index 487425f68..45546f468 100644
--- a/crates/types/src/time.rs
+++ b/crates/types/src/time.rs
@@ -15,7 +15,17 @@ use std::time::{Duration, SystemTime};
 /// Milliseconds since the unix epoch
 #[derive(
-    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    Hash,
+    serde::Serialize,
+    serde::Deserialize,
+    derive_more::Into,
 )]
 #[serde(transparent)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
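
Usage sketch for the sequencer API introduced above: it starts a sequencer and appends one batch, assuming `Sequencer`, `PolicyAll`, `Nodes` and `NodeClient` from the new sequencer module are in scope; `C` stands for whatever `NodeClient` implementation the caller supplies, and the boxed error type is only an illustration.

use std::sync::Arc;

use restate_core::TaskCenter;
use restate_types::{
    logs::{Lsn, Record},
    GenerationalNodeId,
};

// Sketch only: `Sequencer`, `PolicyAll`, `Nodes` and `NodeClient` are assumed to be importable
// from the sequencer module added in this patch.
async fn append_once<C>(
    tc: &TaskCenter,
    clients: Vec<(GenerationalNodeId, C)>,
) -> Result<(), Box<dyn std::error::Error>>
where
    C: NodeClient + Send + Sync + 'static,
{
    // the node set is simply the ids of the provided log-server clients
    let node_set: Vec<_> = clients.iter().map(|(id, _)| *id).collect();

    let handle = Sequencer::start(
        tc,
        GenerationalNodeId::new(1, 1), // this sequencer's generational id
        100.into(),                    // ReplicatedLogletId
        Lsn::OLDEST,                   // starting offset
        clients.into(),                // Nodes<C>
        node_set,
        PolicyAll,                     // replicate every batch to every node in the set
    )?;

    // enqueue a batch; the returned LogletCommit resolves once the spread is fully stored
    let records: Arc<[Record]> = Arc::from(vec![Record::from("hello world")]);
    let commit = handle.enqueue_batch(records).await?;
    let tail = commit.await?;
    println!("batch committed, tail is now {tail:?}");
    Ok(())
}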