diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index baffe6563d..760adea0f1 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -10,15 +10,15 @@ mod nodeset_selection;
+use futures::never::Never;
+use rand::prelude::IteratorRandom;
+use rand::{thread_rng, RngCore};
 use std::collections::HashMap;
 use std::iter;
 use std::num::NonZeroU8;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
-
-use rand::prelude::IteratorRandom;
-use rand::{thread_rng, RngCore};
 use tokio::sync::Semaphore;
 use tokio::task::JoinSet;
 use tracing::{debug, trace, trace_span, Instrument};
@@ -1210,7 +1210,7 @@ impl LogsController {
         });
     }
 
-    pub async fn run_async_operations(&mut self) -> Result<()> {
+    pub async fn run_async_operations(&mut self) -> Result<Never> {
         loop {
             if self.async_operations.is_empty() {
                 futures::future::pending().await
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 11330de902..1c07b4ca80 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -272,7 +272,8 @@ impl Service {
                     state.reconfigure(configuration);
                 }
                 result = state.run() => {
-                    result?
+                    let leader_event = result?;
+                    state.on_leader_event(leader_event).await?;
                 }
                 _ = &mut shutdown => {
                     self.health_status.update(AdminStatus::Unknown);
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 5e2c4eaa53..b0647fecb2 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -87,18 +87,19 @@ where
 
         Ok(())
     }
-}
 
-impl<T> ClusterControllerState<T>
-where
-    T: TransportConnect,
-{
-    pub async fn run(&mut self) -> anyhow::Result<()> {
+    pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
         match self {
-            Self::Follower => {
-                futures::future::pending::<()>().await;
-                Ok(())
-            }
+            ClusterControllerState::Follower => Ok(()),
+            ClusterControllerState::Leader(leader) => leader.on_leader_event(leader_event).await,
+        }
+    }
+
+    /// Runs the cluster controller state related tasks. It returns a [`LeaderEvent`] which needs to
+    /// be processed by calling [`Self::on_leader_event`].
+    pub async fn run(&mut self) -> anyhow::Result<LeaderEvent> {
+        match self {
+            Self::Follower => futures::future::pending::<anyhow::Result<LeaderEvent>>().await,
             Self::Leader(leader) => leader.run().await,
         }
     }
@@ -125,6 +126,15 @@ where
     }
 }
 
+/// Events that are emitted by a leading cluster controller that need to be processed explicitly
+/// because their operations are not cancellation safe.
+#[derive(Debug)]
+pub enum LeaderEvent {
+    TrimLogs,
+    LogsUpdate,
+    PartitionTableUpdate,
+}
+
 pub struct Leader<T> {
     metadata: Metadata,
     bifrost: Bifrost,
@@ -224,48 +234,82 @@ where
             create_log_trim_interval(&configuration.admin);
     }
 
-    async fn run(&mut self) -> anyhow::Result<()> {
-        let bifrost_admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
+    async fn run(&mut self) -> anyhow::Result<LeaderEvent> {
         loop {
             tokio::select! {
                 _ = self.find_logs_tail_interval.tick() => {
                     self.logs_controller.find_logs_tail();
                 }
                 _ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => {
-                    let result = self.trim_logs(bifrost_admin).await;
-
-                    if let Err(err) = result {
-                        warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
-                    }
+                    return Ok(LeaderEvent::TrimLogs);
                 }
                 result = self.logs_controller.run_async_operations() => {
                     result?;
                 }
                 Ok(_) = self.logs_watcher.changed() => {
-                    self.logs_controller.on_logs_update(self.metadata.logs_ref())?;
-                    // tell the scheduler about potentially newly provisioned logs
-                    self.scheduler.on_logs_update(self.logs.live_load(), self.partition_table.live_load()).await?
-                }
+                    return Ok(LeaderEvent::LogsUpdate);
+                }
                 Ok(_) = self.partition_table_watcher.changed() => {
-                    let partition_table = self.partition_table.live_load();
-                    let logs = self.logs.live_load();
-
-                    self.logs_controller.on_partition_table_update(partition_table);
-                    self.scheduler.on_logs_update(logs, partition_table).await?;
+                    return Ok(LeaderEvent::PartitionTableUpdate);
                 }
             }
         }
     }
 
-    async fn trim_logs(
-        &self,
-        bifrost_admin: BifrostAdmin<'_>,
-    ) -> Result<(), restate_bifrost::Error> {
+    pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
+        match leader_event {
+            LeaderEvent::TrimLogs => {
+                self.trim_logs().await;
+            }
+            LeaderEvent::LogsUpdate => {
+                self.on_logs_update().await?;
+            }
+            LeaderEvent::PartitionTableUpdate => {
+                self.on_partition_table_update().await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn on_logs_update(&mut self) -> anyhow::Result<()> {
+        self.logs_controller
+            .on_logs_update(self.metadata.logs_ref())?;
+        // tell the scheduler about potentially newly provisioned logs
+        self.scheduler
+            .on_logs_update(self.logs.live_load(), self.partition_table.live_load())
+            .await?;
+
+        Ok(())
+    }
+
+    async fn on_partition_table_update(&mut self) -> anyhow::Result<()> {
+        let partition_table = self.partition_table.live_load();
+        let logs = self.logs.live_load();
+
+        self.logs_controller
+            .on_partition_table_update(partition_table);
+        self.scheduler.on_logs_update(logs, partition_table).await?;
+
+        Ok(())
+    }
+
+    async fn trim_logs(&self) {
+        let result = self.trim_logs_inner().await;
+
+        if let Err(err) = result {
+            warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
+        }
+    }
+
+    async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> {
+        let bifrost_admin = BifrostAdmin::new(
+            &self.bifrost,
+            &self.metadata_writer,
+            &self.metadata_store_client,
+        );
+
         let cluster_state = self.cluster_state_watcher.current();
 
         let mut persisted_lsns_per_partition: BTreeMap<