Commit 7c7b16b

Run non-cancellation safe cluster controller leader tasks in select expression

To avoid losing progress, this commit runs all cluster controller leader tasks that
are not cancellation safe as part of the top-level select expression instead of in a
select arm. A select arm can be cancelled when another arm completes first, and any
progress the arm's task had made would be lost. This applies to trimming the logs and
to updating the scheduler when there is a logs or partition table update.
tillrohrmann committed Nov 22, 2024
1 parent f846f1c · commit 7c7b16b
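The hazard described in the commit message can be illustrated with a minimal, self-contained sketch (hypothetical code, not part of this commit; assumes the tokio crate): when one tokio::select! arm completes, the futures of the other arms are dropped, so a multi-step operation awaited inside an arm can be cancelled halfway through. Returning an event from the select expression and doing the work afterwards moves the operation out of cancellation's reach.

use std::time::Duration;
use tokio::time::sleep;

// Hypothetical event type, mirroring the LeaderEvent pattern introduced here.
#[derive(Debug)]
enum Event {
    Tick,
}

// A multi-step operation that is NOT cancellation safe: if its future is
// dropped after step 1, step 2 never runs and the partial work is lost.
async fn not_cancellation_safe() {
    sleep(Duration::from_millis(10)).await; // step 1
    sleep(Duration::from_millis(10)).await; // step 2
}

// Awaits only cancellation-safe futures inside the select! arms and returns
// an event instead of doing non-cancellation-safe work inside an arm.
async fn run() -> Event {
    loop {
        tokio::select! {
            // If the real work were performed inside this arm, completion of
            // the other arm would drop it mid-await and lose its progress.
            _ = sleep(Duration::from_millis(5)) => return Event::Tick,
            _ = sleep(Duration::from_secs(1)) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    // The event is processed after the select expression has completed,
    // so no competing arm can cancel the operation.
    match run().await {
        Event::Tick => not_cancellation_safe().await,
    }
}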
Showing 3 changed files with 84 additions and 39 deletions.
8 changes: 4 additions & 4 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -10,15 +10,15 @@

 mod nodeset_selection;
 
+use futures::never::Never;
+use rand::prelude::IteratorRandom;
+use rand::{thread_rng, RngCore};
 use std::collections::HashMap;
 use std::iter;
 use std::num::NonZeroU8;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
 
-use rand::prelude::IteratorRandom;
-use rand::{thread_rng, RngCore};
 use tokio::sync::Semaphore;
 use tokio::task::JoinSet;
 use tracing::{debug, trace, trace_span, Instrument};
@@ -1210,7 +1210,7 @@ impl LogsController {
         });
     }
 
-    pub async fn run_async_operations(&mut self) -> Result<()> {
+    pub async fn run_async_operations(&mut self) -> Result<Never> {
         loop {
             if self.async_operations.is_empty() {
                 futures::future::pending().await
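For context on the Result<Never> signature above: futures::never::Never is an uninhabited type, so a value of type Result<Never, E> can only ever be the Err variant. A function with this return type visibly never completes successfully; the only way it returns is by propagating an error. A minimal illustration (hypothetical code, not from the repository; assumes the futures and anyhow crates):

use futures::never::Never;

// Never is uninhabited, so Ok(_) cannot be constructed: the signature
// guarantees that returning at all means an error occurred.
async fn run_forever() -> Result<Never, anyhow::Error> {
    loop {
        do_work().await?;
    }
}

async fn do_work() -> Result<(), anyhow::Error> {
    Ok(())
}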
3 changes: 2 additions & 1 deletion crates/admin/src/cluster_controller/service.rs
@@ -272,7 +272,8 @@ impl<T: TransportConnect> Service<T> {
                 state.reconfigure(configuration);
             }
             result = state.run() => {
-                result?
+                let leader_event = result?;
+                state.on_leader_event(leader_event).await?;
             }
             _ = &mut shutdown => {
                 self.health_status.update(AdminStatus::Unknown);
112 changes: 78 additions & 34 deletions crates/admin/src/cluster_controller/service/state.rs
@@ -87,18 +87,19 @@
         Ok(())
     }
 }
 
 impl<T> ClusterControllerState<T>
 where
     T: TransportConnect,
 {
-    pub async fn run(&mut self) -> anyhow::Result<()> {
+    pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
         match self {
-            Self::Follower => {
-                futures::future::pending::<()>().await;
-                Ok(())
-            }
+            ClusterControllerState::Follower => Ok(()),
+            ClusterControllerState::Leader(leader) => leader.on_leader_event(leader_event).await,
+        }
+    }
+
+    /// Runs the cluster controller state related tasks. It returns [`LeaderEvent`] which need to
+    /// be processed by calling [`Self::on_leader_event`].
+    pub async fn run(&mut self) -> anyhow::Result<LeaderEvent> {
+        match self {
+            Self::Follower => futures::future::pending::<anyhow::Result<_>>().await,
             Self::Leader(leader) => leader.run().await,
         }
     }
@@ -125,6 +126,15 @@
     }
 }
 
+/// Events that are emitted by a leading cluster controller that need to be processed explicitly
+/// because their operations are not cancellation safe.
+#[derive(Debug)]
+pub enum LeaderEvent {
+    TrimLogs,
+    LogsUpdate,
+    PartitionTableUpdate,
+}
+
 pub struct Leader<T> {
     metadata: Metadata,
     bifrost: Bifrost,
@@ -224,48 +234,82 @@
             create_log_trim_interval(&configuration.admin);
     }
 
-    async fn run(&mut self) -> anyhow::Result<()> {
-        let bifrost_admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
+    async fn run(&mut self) -> anyhow::Result<LeaderEvent> {
         loop {
             tokio::select! {
                 _ = self.find_logs_tail_interval.tick() => {
                     self.logs_controller.find_logs_tail();
                 }
                 _ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => {
-                    let result = self.trim_logs(bifrost_admin).await;
-
-                    if let Err(err) = result {
-                        warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
-                    }
+                    return Ok(LeaderEvent::TrimLogs);
                 }
                 result = self.logs_controller.run_async_operations() => {
                     result?;
                 }
                 Ok(_) = self.logs_watcher.changed() => {
-                    self.logs_controller.on_logs_update(self.metadata.logs_ref())?;
-                    // tell the scheduler about potentially newly provisioned logs
-                    self.scheduler.on_logs_update(self.logs.live_load(), self.partition_table.live_load()).await?
+                    return Ok(LeaderEvent::LogsUpdate);
                 }
                 Ok(_) = self.partition_table_watcher.changed() => {
-                    let partition_table = self.partition_table.live_load();
-                    let logs = self.logs.live_load();
-
-                    self.logs_controller.on_partition_table_update(partition_table);
-                    self.scheduler.on_logs_update(logs, partition_table).await?;
+                    return Ok(LeaderEvent::PartitionTableUpdate);
                 }
             }
         }
     }
 
-    async fn trim_logs(
-        &self,
-        bifrost_admin: BifrostAdmin<'_>,
-    ) -> Result<(), restate_bifrost::Error> {
+    pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
+        match leader_event {
+            LeaderEvent::TrimLogs => {
+                self.trim_logs().await;
+            }
+            LeaderEvent::LogsUpdate => {
+                self.on_logs_update().await?;
+            }
+            LeaderEvent::PartitionTableUpdate => {
+                self.on_partition_table_update().await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn on_logs_update(&mut self) -> anyhow::Result<()> {
+        self.logs_controller
+            .on_logs_update(self.metadata.logs_ref())?;
+        // tell the scheduler about potentially newly provisioned logs
+        self.scheduler
+            .on_logs_update(self.logs.live_load(), self.partition_table.live_load())
+            .await?;
+
+        Ok(())
+    }
+
+    async fn on_partition_table_update(&mut self) -> anyhow::Result<()> {
+        let partition_table = self.partition_table.live_load();
+        let logs = self.logs.live_load();
+
+        self.logs_controller
+            .on_partition_table_update(partition_table);
+        self.scheduler.on_logs_update(logs, partition_table).await?;
+
+        Ok(())
+    }
+
+    async fn trim_logs(&self) {
+        let result = self.trim_logs_inner().await;
+
+        if let Err(err) = result {
+            warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
+        }
+    }
+
+    async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> {
+        let bifrost_admin = BifrostAdmin::new(
+            &self.bifrost,
+            &self.metadata_writer,
+            &self.metadata_store_client,
+        );
+
         let cluster_state = self.cluster_state_watcher.current();
 
         let mut persisted_lsns_per_partition: BTreeMap<
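Put together, the resulting control flow looks roughly like the following condensed sketch (hypothetical stand-ins for the actual service code; assumes the tokio and anyhow crates): the select loop awaits only cancellation-safe futures and reports non-cancellation-safe work as a LeaderEvent, which the caller then processes to completion outside the select expression.

#[derive(Debug)]
enum LeaderEvent {
    TrimLogs,
    LogsUpdate,
    PartitionTableUpdate,
}

// Stand-in for Leader::run(): awaits only cancellation-safe futures and
// reports non-cancellation-safe work back to the caller as an event.
async fn run() -> anyhow::Result<LeaderEvent> {
    Ok(LeaderEvent::TrimLogs)
}

// Stand-in for Leader::on_leader_event(): performs the reported work.
async fn on_leader_event(event: LeaderEvent) -> anyhow::Result<()> {
    match event {
        LeaderEvent::TrimLogs => { /* trim the logs to completion */ }
        LeaderEvent::LogsUpdate => { /* propagate the logs update */ }
        LeaderEvent::PartitionTableUpdate => { /* update the scheduler */ }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The returned event is handled outside any select!, so the handler
    // runs to completion and no progress can be lost to cancellation.
    let event = run().await?;
    on_leader_event(event).await
}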
