diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs
index 6619b6fd4..fa838c92c 100644
--- a/crates/bifrost/src/background_appender.rs
+++ b/crates/bifrost/src/background_appender.rs
@@ -56,8 +56,8 @@ where
     }
 
     /// Start the background appender as a TaskCenter background task. Note that the task will not
-    /// autmatically react to TaskCenter's shutdown signal, it gives control over the shutdown
-    /// behaviour to the the owner of [`AppenderHandle`] to drain or drop when appropriate.
+    /// automatically react to TaskCenter's shutdown signal, it gives control over the shutdown
+    /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
     pub fn start(
         self,
         task_center: TaskCenter,
@@ -213,6 +213,11 @@ impl AppenderHandle {
     pub fn sender(&self) -> &LogSender {
         self.sender.as_ref().unwrap()
     }
+
+    /// Polls the underlying appender task to check whether it has finished or not.
+    pub async fn poll_appender_task(&mut self) -> Result<()> {
+        self.inner_handle.as_mut().expect("must be present").await?
+    }
 }
 
 #[derive(Clone)]
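Reviewer note (not part of the patch): `poll_appender_task` gives the owner of an `AppenderHandle` a way to observe the background appender's termination instead of only draining or dropping it. A minimal sketch of that usage pattern, with a simplified stand-in handle (a `tokio::task::JoinHandle` and string errors) rather than the real bifrost types:

```rust
use std::time::Duration;

use tokio::task::JoinHandle;

// Simplified stand-in for bifrost's AppenderHandle: it owns the join handle of the
// background appender task and exposes a way to await its termination.
struct AppenderHandle {
    inner_handle: Option<JoinHandle<Result<(), String>>>,
}

impl AppenderHandle {
    /// Resolves once the background appender task finishes, cleanly or with an error.
    async fn poll_appender_task(&mut self) -> Result<(), String> {
        self.inner_handle
            .as_mut()
            .expect("must be present")
            .await
            .map_err(|join_err| join_err.to_string())?
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical appender task that fails after a while.
    let task = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Err::<(), _>("log append failed".to_owned())
    });
    let mut handle = AppenderHandle {
        inner_handle: Some(task),
    };

    let mut ticker = tokio::time::interval(Duration::from_millis(10));
    loop {
        tokio::select! {
            // The owner's regular work goes here.
            _ = ticker.tick() => { /* propose records, etc. */ }
            // The owner learns immediately when the appender stops instead of
            // silently losing the background task.
            result = handle.poll_appender_task() => {
                eprintln!("appender terminated: {result:?}");
                break;
            }
        }
    }
}
```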
self proposing")] SelfProposer, + #[error("task '{name}' failed: {cause}")] + TaskFailed { + name: &'static str, + cause: TaskTermination, + }, +} + +impl Error { + fn task_terminated_unexpectedly(name: &'static str) -> Self { + Error::TaskFailed { + name, + cause: TaskTermination::Unexpected, + } + } + + fn task_failed( + name: &'static str, + err: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Error::TaskFailed { + name, + cause: TaskTermination::Failure(err.into()), + } + } +} + +#[derive(Debug, derive_more::Display)] +pub(crate) enum TaskTermination { + #[display("unexpected termination")] + Unexpected, + #[display("{}", _0)] + Failure(GenericError), } #[derive(Debug)] @@ -96,6 +130,10 @@ pub(crate) enum ActionEffect { pub(crate) struct LeaderState { leader_epoch: LeaderEpoch, + // only needed for proposing TruncateOutbox to ourselves + own_partition_key: PartitionKey, + action_effects_counter: Counter, + shuffle_hint_tx: HintSender, shuffle_task_id: TaskId, timer_service: Pin>, @@ -113,11 +151,115 @@ pub(crate) struct LeaderState { cleaner_task_id: TaskId, } -pub enum State { +impl LeaderState { + /// Runs the leader specific task which is the processing of action effects and the monitoring + /// of unmanaged tasks. + async fn run(&mut self) -> Result { + let timer_stream = std::pin::pin!(stream::unfold( + &mut self.timer_service, + |timer_service| async { + let timer_value = timer_service.as_mut().next_timer().await; + Some((ActionEffect::Timer(timer_value), timer_service)) + } + )); + + let invoker_stream = (&mut self.invoker_stream).map(ActionEffect::Invoker); + let shuffle_stream = (&mut self.shuffle_stream).map(ActionEffect::Shuffle); + let action_effects_stream = stream::unfold( + &mut self.pending_cleanup_timers_to_schedule, + |pending_cleanup_timers_to_schedule| { + let result = pending_cleanup_timers_to_schedule.pop_front(); + future::ready(result.map(|(invocation_id, duration)| { + ( + ActionEffect::ScheduleCleanupTimer(invocation_id, duration), + pending_cleanup_timers_to_schedule, + ) + })) + }, + ) + .fuse(); + let awaiting_rpc_self_propose_stream = + (&mut self.awaiting_rpc_self_propose).map(|_| ActionEffect::AwaitingRpcSelfProposeDone); + + let all_streams = futures::stream_select!( + invoker_stream, + shuffle_stream, + timer_stream, + action_effects_stream, + awaiting_rpc_self_propose_stream + ); + let mut all_streams = all_streams.ready_chunks(BATCH_READY_UP_TO); + + loop { + tokio::select! 
@@ -96,6 +130,10 @@ pub(crate) enum ActionEffect {
 
 pub(crate) struct LeaderState {
     leader_epoch: LeaderEpoch,
+    // only needed for proposing TruncateOutbox to ourselves
+    own_partition_key: PartitionKey,
+    action_effects_counter: Counter,
+
     shuffle_hint_tx: HintSender,
     shuffle_task_id: TaskId,
     timer_service: Pin<Box<TimerService>>,
@@ -113,11 +151,115 @@ pub(crate) struct LeaderState {
     cleaner_task_id: TaskId,
 }
 
-pub enum State {
+impl LeaderState {
+    /// Runs the leader specific task which is the processing of action effects and the monitoring
+    /// of unmanaged tasks.
+    async fn run(&mut self) -> Result<Never, Error> {
+        let timer_stream = std::pin::pin!(stream::unfold(
+            &mut self.timer_service,
+            |timer_service| async {
+                let timer_value = timer_service.as_mut().next_timer().await;
+                Some((ActionEffect::Timer(timer_value), timer_service))
+            }
+        ));
+
+        let invoker_stream = (&mut self.invoker_stream).map(ActionEffect::Invoker);
+        let shuffle_stream = (&mut self.shuffle_stream).map(ActionEffect::Shuffle);
+        let action_effects_stream = stream::unfold(
+            &mut self.pending_cleanup_timers_to_schedule,
+            |pending_cleanup_timers_to_schedule| {
+                let result = pending_cleanup_timers_to_schedule.pop_front();
+                future::ready(result.map(|(invocation_id, duration)| {
+                    (
+                        ActionEffect::ScheduleCleanupTimer(invocation_id, duration),
+                        pending_cleanup_timers_to_schedule,
+                    )
+                }))
+            },
+        )
+        .fuse();
+        let awaiting_rpc_self_propose_stream =
+            (&mut self.awaiting_rpc_self_propose).map(|_| ActionEffect::AwaitingRpcSelfProposeDone);
+
+        let all_streams = futures::stream_select!(
+            invoker_stream,
+            shuffle_stream,
+            timer_stream,
+            action_effects_stream,
+            awaiting_rpc_self_propose_stream
+        );
+        let mut all_streams = all_streams.ready_chunks(BATCH_READY_UP_TO);
+
+        loop {
+            tokio::select! {
+                Some(action_effects) = all_streams.next() => {
+                    self.action_effects_counter.increment(u64::try_from(action_effects.len()).expect("usize fits into u64"));
+                    LeaderState::handle_action_effects(&mut self.self_proposer, self.own_partition_key, action_effects).await?;
+                },
+                result = self.self_proposer.poll_appender_task() => {
+                    return result;
+                }
+            }
+        }
+    }
+
+    async fn handle_action_effects(
+        self_proposer: &mut SelfProposer,
+        own_partition_key: PartitionKey,
+        action_effects: impl IntoIterator<Item = ActionEffect>,
+    ) -> Result<(), Error> {
+        for effect in action_effects {
+            match effect {
+                ActionEffect::Invoker(invoker_effect) => {
+                    self_proposer
+                        .propose(
+                            invoker_effect.invocation_id.partition_key(),
+                            Command::InvokerEffect(invoker_effect),
+                        )
+                        .await?;
+                }
+                ActionEffect::Shuffle(outbox_truncation) => {
+                    // todo: Until we support partition splits we need to get rid of outboxes or introduce partition
+                    // specific destination messages that are identified by a partition_id
+                    self_proposer
+                        .propose(
+                            own_partition_key,
+                            Command::TruncateOutbox(outbox_truncation.index()),
+                        )
+                        .await?;
+                }
+                ActionEffect::Timer(timer) => {
+                    self_proposer
+                        .propose(timer.invocation_id().partition_key(), Command::Timer(timer))
+                        .await?;
+                }
+                ActionEffect::ScheduleCleanupTimer(invocation_id, duration) => {
+                    self_proposer
+                        .propose(
+                            invocation_id.partition_key(),
+                            Command::ScheduleTimer(TimerKeyValue::clean_invocation_status(
+                                MillisSinceEpoch::from(SystemTime::now() + duration),
+                                invocation_id,
+                            )),
+                        )
+                        .await?;
+                }
+                ActionEffect::AwaitingRpcSelfProposeDone => {
+                    // Nothing to do here
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+enum State {
     Follower,
     Candidate {
         leader_epoch: LeaderEpoch,
-        appender_task: TaskHandle<Result<(), ShutdownError>>,
+        // to be able to move out of it
+        self_proposer: Option<SelfProposer>,
     },
     Leader(LeaderState),
 }
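Reviewer note (not part of the patch): `LeaderState::run` keeps the existing `futures::stream_select!` + `ready_chunks` merging of the effect sources, but now drives it in its own loop and races each batch against `poll_appender_task`, so an appender failure surfaces as an error instead of going unnoticed. A self-contained sketch of that shape, with toy streams and a toy failure future standing in for the invoker/shuffle/timer streams and the self proposer:

```rust
use std::time::Duration;

use futures::{stream, StreamExt};

#[derive(Debug)]
enum Effect {
    Invoker(u64),
    Shuffle(u64),
}

async fn run() -> Result<std::convert::Infallible, String> {
    // Toy effect sources standing in for the invoker and shuffle streams.
    let invoker_stream = stream::iter(0..5).map(Effect::Invoker);
    let shuffle_stream = stream::iter(0..5).map(Effect::Shuffle);

    // Merge the sources and batch whatever is already ready, up to 10 items at a time.
    let all_streams = futures::stream_select!(invoker_stream, shuffle_stream);
    let mut all_streams = all_streams.ready_chunks(10);

    // Toy stand-in for SelfProposer::poll_appender_task: it resolves only when the
    // background appender dies, which is the only way out of the loop.
    let appender_failure = async {
        tokio::time::sleep(Duration::from_millis(20)).await;
        "bifrost appender terminated".to_owned()
    };
    tokio::pin!(appender_failure);

    loop {
        tokio::select! {
            Some(batch) = all_streams.next() => {
                println!("handling a batch of {} effects: {batch:?}", batch.len());
            }
            err = &mut appender_failure => {
                return Err(err);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let err = run().await.unwrap_err();
    eprintln!("leader loop stopped: {err}");
}
```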
@@ -218,88 +360,39 @@ where
         Ok(())
     }
 
-    async fn announce_leadership(
-        &mut self,
-        leader_epoch: LeaderEpoch,
-    ) -> Result<(), ShutdownError> {
-        let header = Header {
-            dest: Destination::Processor {
-                partition_key: *self
-                    .partition_processor_metadata
+    async fn announce_leadership(&mut self, leader_epoch: LeaderEpoch) -> Result<(), Error> {
+        let announce_leader = Command::AnnounceLeader(AnnounceLeader {
+            // todo: Still need to write generational id for supporting rolling back, can be removed
+            // with the next release.
+            node_id: Some(self.partition_processor_metadata.node_id),
+            leader_epoch,
+            partition_key_range: Some(
+                self.partition_processor_metadata
                     .partition_key_range
-                    .start(),
-                dedup: Some(DedupInformation::self_proposal(EpochSequenceNumber::new(
-                    leader_epoch,
-                ))),
-            },
-            source: Source::Processor {
-                partition_id: self.partition_processor_metadata.partition_id,
-                partition_key: Some(
-                    *self
-                        .partition_processor_metadata
-                        .partition_key_range
-                        .start(),
-                ),
-                leader_epoch,
-                // Kept for backward compatibility.
-                node_id: self.partition_processor_metadata.node_id.as_plain(),
-                generational_node_id: Some(self.partition_processor_metadata.node_id),
-            },
-        };
+                    .clone(),
+            ),
+        });
 
-        let envelope = Envelope::new(
-            header,
-            Command::AnnounceLeader(AnnounceLeader {
-                // todo: Still need to write generational id for supporting rolling back, can be removed
-                // with the next release.
-                node_id: Some(self.partition_processor_metadata.node_id),
-                leader_epoch,
-                partition_key_range: Some(
-                    self.partition_processor_metadata
-                        .partition_key_range
-                        .clone(),
-                ),
-            }),
-        );
+        let mut self_proposer = SelfProposer::new(
+            self.partition_processor_metadata.partition_id,
+            EpochSequenceNumber::new(leader_epoch),
+            &self.bifrost,
+            metadata(),
+        )?;
 
-        let envelope = Arc::new(envelope);
-        let log_id = LogId::from(self.partition_processor_metadata.partition_id);
-        let bifrost = self.bifrost.clone();
-
-        // todo replace with background appender and allowing PP to gracefully fail w/o stopping the process
-        let appender_task = task_center().spawn_unmanaged(TaskKind::Background, "announce-leadership", Some(self.partition_processor_metadata.partition_id), async move {
-            loop {
-                // further instructions/commands from PP manager.
-                match bifrost.append(log_id, Arc::clone(&envelope)).await {
-                    // only stop on shutdown
-                    Err(restate_bifrost::Error::Shutdown(_)) => return Err(ShutdownError),
-                    Err(e) => {
-                        info!(
-                            %log_id,
-                            %leader_epoch,
-                            ?e,
-                            "Failed to write the announce leadership message to bifrost. Retrying."
-                        );
-                        // todo: retry with backoff. At the moment, this is very aggressive (intentionally)
-                        // to avoid blocking for too long.
-                        tokio::time::sleep(Duration::from_millis(250)).await;
-                    }
-                    Ok(lsn) => {
-                        debug!(
-                            %log_id,
-                            %leader_epoch,
-                            %lsn,
-                            "Written announce leadership message to bifrost."
-                        );
-                        return Ok(());
-                    }
-                }
-            }
-        })?;
+        self_proposer
+            .propose(
+                *self
+                    .partition_processor_metadata
+                    .partition_key_range
+                    .start(),
+                announce_leader,
+            )
+            .await?;
 
         self.state = State::Candidate {
             leader_epoch,
-            appender_task,
+            self_proposer: Some(self_proposer),
        };
 
         Ok(())
@@ -361,10 +454,17 @@ where
     }
 
     async fn become_leader(&mut self, partition_store: &mut PartitionStore) -> Result<(), Error> {
-        if let State::Candidate { leader_epoch, .. } = self.state {
+        if let State::Candidate {
+            leader_epoch,
+            self_proposer,
+        } = &mut self.state
+        {
             let invoker_rx = Self::resume_invoked_invocations(
                 &mut self.invoker_tx,
-                (self.partition_processor_metadata.partition_id, leader_epoch),
+                (
+                    self.partition_processor_metadata.partition_id,
+                    *leader_epoch,
+                ),
                 self.partition_processor_metadata
                     .partition_key_range
                     .clone(),
@@ -384,7 +484,7 @@ where
             let shuffle = Shuffle::new(
                 ShuffleMetadata::new(
                     self.partition_processor_metadata.partition_id,
-                    leader_epoch,
+                    *leader_epoch,
                     self.partition_processor_metadata.node_id,
                 ),
                 OutboxReader::from(partition_store.clone()),
@@ -402,16 +502,9 @@ where
                 shuffle.run(),
             )?;
 
-            let self_proposer = SelfProposer::new(
-                self.partition_processor_metadata.partition_id,
-                EpochSequenceNumber::new(leader_epoch),
-                &self.bifrost,
-                metadata(),
-            )?;
-
             let cleaner = Cleaner::new(
                 self.partition_processor_metadata.partition_id,
-                leader_epoch,
+                *leader_epoch,
                 self.partition_processor_metadata.node_id,
                 partition_store.clone(),
                 self.bifrost.clone(),
@@ -429,12 +522,17 @@ where
             )?;
 
             self.state = State::Leader(LeaderState {
-                leader_epoch,
+                leader_epoch: *leader_epoch,
+                own_partition_key: *self
+                    .partition_processor_metadata
+                    .partition_key_range
+                    .start(),
+                action_effects_counter: counter!(PARTITION_ACTUATOR_HANDLED),
                 shuffle_task_id,
                 cleaner_task_id,
                 shuffle_hint_tx,
                 timer_service,
-                self_proposer,
+                self_proposer: self_proposer.take().expect("must be present"),
                 awaiting_rpc_actions: Default::default(),
                 awaiting_rpc_self_propose: Default::default(),
                 invoker_stream: ReceiverStream::new(invoker_rx),
@@ -496,8 +594,8 @@ where
             State::Follower => {
                 // nothing to do :-)
             }
-            State::Candidate { appender_task, .. } => {
-                appender_task.abort();
+            State::Candidate { .. } => {
+                // nothing to do :-)
             }
             State::Leader(LeaderState {
                 leader_epoch,
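Reviewer note (not part of the patch): `Candidate` now carries `Option<SelfProposer>` only so that `become_leader` can move the proposer into `LeaderState` while holding just `&mut self.state` (the `// to be able to move out of it` comment). A minimal sketch of that `Option::take()` hand-over between variants, with placeholder types in place of the real `SelfProposer`/`LeaderState`:

```rust
// Placeholder for the real SelfProposer; the point is that it is not Clone,
// so it must be moved, not duplicated, when the candidate is promoted.
#[derive(Debug)]
struct SelfProposer {
    leader_epoch: u64,
}

#[derive(Debug)]
enum State {
    Follower,
    Candidate {
        leader_epoch: u64,
        // Wrapped in Option so it can be taken out through a &mut borrow.
        self_proposer: Option<SelfProposer>,
    },
    Leader {
        leader_epoch: u64,
        self_proposer: SelfProposer,
    },
}

fn become_leader(state: &mut State) {
    if let State::Candidate {
        leader_epoch,
        self_proposer,
    } = state
    {
        // take() moves the proposer out and leaves None behind, which is fine
        // because the whole Candidate state is replaced right afterwards.
        let self_proposer = self_proposer.take().expect("must be present");
        *state = State::Leader {
            leader_epoch: *leader_epoch,
            self_proposer,
        };
    }
}

fn main() {
    let mut state = State::Candidate {
        leader_epoch: 7,
        self_proposer: Some(SelfProposer { leader_epoch: 7 }),
    };
    become_leader(&mut state);
    // Now a Leader carrying the proposer that was created at candidacy time.
    println!("{state:?}");
}
```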
@@ -694,116 +792,25 @@ where
         Ok(())
     }
 
-    pub async fn next_action_effects(&mut self) -> Option<Vec<ActionEffect>> {
+    /// Runs the leadership state tasks. This depends on the current state value:
+    ///
+    /// * Follower: Nothing to do
+    /// * Candidate: Monitor appender task
+    /// * Leader: Process action effects and monitor appender task
+    pub async fn run(&mut self) -> Result<Never, Error> {
         match &mut self.state {
-            State::Follower | State::Candidate { .. } => None,
-            State::Leader(leader_state) => {
-                let timer_stream = std::pin::pin!(stream::unfold(
-                    &mut leader_state.timer_service,
-                    |timer_service| async {
-                        let timer_value = timer_service.as_mut().next_timer().await;
-                        Some((ActionEffect::Timer(timer_value), timer_service))
-                    }
-                ));
-
-                let invoker_stream = (&mut leader_state.invoker_stream).map(ActionEffect::Invoker);
-                let shuffle_stream = (&mut leader_state.shuffle_stream).map(ActionEffect::Shuffle);
-                let action_effects_stream = stream::unfold(
-                    &mut leader_state.pending_cleanup_timers_to_schedule,
-                    |pending_cleanup_timers_to_schedule| {
-                        let result = pending_cleanup_timers_to_schedule.pop_front();
-                        future::ready(result.map(|(invocation_id, duration)| {
-                            (
-                                ActionEffect::ScheduleCleanupTimer(invocation_id, duration),
-                                pending_cleanup_timers_to_schedule,
-                            )
-                        }))
-                    },
-                )
-                .fuse();
-                let awaiting_rpc_self_propose_stream = (&mut leader_state
-                    .awaiting_rpc_self_propose)
-                    .map(|_| ActionEffect::AwaitingRpcSelfProposeDone);
-
-                let all_streams = futures::stream_select!(
-                    invoker_stream,
-                    shuffle_stream,
-                    timer_stream,
-                    action_effects_stream,
-                    awaiting_rpc_self_propose_stream
-                );
-                let mut all_streams = all_streams.ready_chunks(BATCH_READY_UP_TO);
-                all_streams.next().await
+            State::Follower => Ok(futures::future::pending::<Never>().await),
+            State::Candidate { self_proposer, .. } => {
+                self_proposer
+                    .as_mut()
+                    .expect("must be present")
+                    .poll_appender_task()
+                    .await
             }
+            State::Leader(leader_state) => leader_state.run().await,
         }
     }
 
-    pub async fn handle_action_effect(
-        &mut self,
-        action_effects: impl IntoIterator<Item = ActionEffect>,
-    ) -> anyhow::Result<()> {
-        match &mut self.state {
-            State::Follower | State::Candidate { .. } => {
-                // nothing to do :-)
-            }
-            State::Leader(leader_state) => {
-                for effect in action_effects {
-                    match effect {
-                        ActionEffect::Invoker(invoker_effect) => {
-                            leader_state
-                                .self_proposer
-                                .propose(
-                                    invoker_effect.invocation_id.partition_key(),
-                                    Command::InvokerEffect(invoker_effect),
-                                )
-                                .await?;
-                        }
-                        ActionEffect::Shuffle(outbox_truncation) => {
-                            // todo: Until we support partition splits we need to get rid of outboxes or introduce partition
-                            // specific destination messages that are identified by a partition_id
-                            leader_state
-                                .self_proposer
-                                .propose(
-                                    *self
-                                        .partition_processor_metadata
-                                        .partition_key_range
-                                        .start(),
-                                    Command::TruncateOutbox(outbox_truncation.index()),
-                                )
-                                .await?;
-                        }
-                        ActionEffect::Timer(timer) => {
-                            leader_state
-                                .self_proposer
-                                .propose(
-                                    timer.invocation_id().partition_key(),
-                                    Command::Timer(timer),
-                                )
-                                .await?;
-                        }
-                        ActionEffect::ScheduleCleanupTimer(invocation_id, duration) => {
-                            leader_state
-                                .self_proposer
-                                .propose(
-                                    invocation_id.partition_key(),
-                                    Command::ScheduleTimer(TimerKeyValue::clean_invocation_status(
-                                        MillisSinceEpoch::from(SystemTime::now() + duration),
-                                        invocation_id,
-                                    )),
-                                )
-                                .await?;
-                        }
-                        ActionEffect::AwaitingRpcSelfProposeDone => {
-                            // Nothing to do here
-                        }
-                    }
-                }
-            }
-        };
-
-        Ok(())
-    }
-
     pub async fn handle_rpc_proposal_command(
         &mut self,
         request_id: PartitionProcessorRpcRequestId,
@@ -1035,8 +1042,8 @@ impl SelfProposer {
     }
 
     fn create_header(&mut self, partition_key: PartitionKey) -> Header {
-        let esn = self.epoch_sequence_number.next();
-        self.epoch_sequence_number = esn;
+        let esn = self.epoch_sequence_number;
+        self.epoch_sequence_number = self.epoch_sequence_number.next();
 
         let my_node_id = self.metadata.my_node_id();
         Header {
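Reviewer note (not part of the patch): the `create_header` change flips the ordering from "advance the epoch sequence number, then use it" to "use the current value, then advance", so the first header written by a fresh `SelfProposer` now carries the initial sequence number rather than its successor. A tiny sketch of the difference with a stand-in sequence-number type (the real `EpochSequenceNumber` lives in restate_types):

```rust
// Stand-in for EpochSequenceNumber: Copy, with a next() that returns the successor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Esn(u64);

impl Esn {
    fn next(self) -> Self {
        Esn(self.0 + 1)
    }
}

fn main() {
    // Old behaviour: advance first, then use. The initial value 0 is never emitted.
    let mut seq = Esn(0);
    let esn = seq.next();
    seq = esn;
    assert_eq!(esn, Esn(1));
    assert_eq!(seq, Esn(1));

    // New behaviour: use the current value, then advance. The first header gets 0.
    let mut seq = Esn(0);
    let esn = seq;
    seq = seq.next();
    assert_eq!(esn, Esn(0));
    assert_eq!(seq, Esn(1));
}
```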
@@ -1048,11 +1055,21 @@ impl SelfProposer {
                 partition_id: self.partition_id,
                 partition_key: Some(partition_key),
                 leader_epoch: self.epoch_sequence_number.leader_epoch,
+                // Kept for backward compatibility.
                 node_id: my_node_id.as_plain(),
                 generational_node_id: Some(my_node_id),
             },
         }
     }
+
+    async fn poll_appender_task(&mut self) -> Result<Never, Error> {
+        let result = self.bifrost_appender.poll_appender_task().await;
+
+        Err(match result {
+            Ok(()) => Error::task_terminated_unexpectedly(BIFROST_APPENDER_TASK),
+            Err(err) => Error::task_failed(BIFROST_APPENDER_TASK, err),
+        })
+    }
 }
 
 #[derive(Debug, derive_more::From)]
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 9e7e00099..329bb1a81 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -16,7 +16,7 @@ use std::time::{Duration, Instant};
 use anyhow::Context;
 use assert2::let_assert;
 use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _};
-use metrics::{counter, histogram};
+use metrics::histogram;
 use tokio::sync::{mpsc, watch};
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, instrument, trace, warn, Span};
@@ -63,8 +63,8 @@ use restate_wal_protocol::control::AnnounceLeader;
 use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
 
 use crate::metric_definitions::{
-    PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION,
-    PP_APPLY_COMMAND_BATCH_SIZE, PP_APPLY_COMMAND_DURATION,
+    PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PP_APPLY_COMMAND_BATCH_SIZE,
+    PP_APPLY_COMMAND_DURATION,
 };
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
@@ -382,7 +382,6 @@ where
         let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION);
         let command_batch_size =
             histogram!(PP_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str);
-        let actuator_effects_handled = counter!(PARTITION_ACTUATOR_HANDLED);
 
         let mut action_collector = ActionCollector::default();
         let mut command_buffer = Vec::with_capacity(self.max_command_batch_size);
@@ -468,10 +467,9 @@ where
                    self.leadership_state.handle_actions(action_collector.drain(..)).await?;
                    record_actions_latency.record(actions_start.elapsed());
                },
-                Some(action_effects) = self.leadership_state.next_action_effects() => {
-                    actuator_effects_handled.increment(action_effects.len() as u64);
-                    self.leadership_state.handle_action_effect(action_effects).await?;
-                },
+                result = self.leadership_state.run() => {
+                    result?;
+                }
             }
             // Allow other tasks on this thread to run, but only if we have exhausted the coop
             // budget.
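Reviewer note (not part of the patch): with this change the partition processor's main loop no longer pulls action effects itself; it just selects on `leadership_state.run()`, which returns `Result<Never, Error>` (and `pending::<Never>()` in the Follower arm), so that branch can only ever fire with an error. A condensed, self-contained sketch of that contract, using `std::convert::Infallible` in place of `futures::never::Never` and toy types for the leadership state:

```rust
use std::convert::Infallible;
use std::time::Duration;

struct LeadershipState {
    is_leader: bool,
    // The failure condition lives in the state, not in the future, so the run() future
    // can be dropped and re-created by the outer select! without losing progress.
    appender_dies_at: tokio::time::Instant,
}

impl LeadershipState {
    // Never resolves with Ok: either it runs "forever" or it surfaces an error.
    async fn run(&mut self) -> Result<Infallible, String> {
        if !self.is_leader {
            // Followers have nothing to monitor, so this branch never completes.
            return Ok(std::future::pending::<Infallible>().await);
        }
        // Pretend the leader's appender task dies at a fixed point in time.
        tokio::time::sleep_until(self.appender_dies_at).await;
        Err("bifrost appender terminated unexpectedly".to_owned())
    }
}

#[tokio::main]
async fn main() {
    let mut leadership_state = LeadershipState {
        is_leader: true,
        appender_dies_at: tokio::time::Instant::now() + Duration::from_millis(30),
    };
    let mut ticker = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                // Regular command processing would happen here.
            }
            result = leadership_state.run() => {
                // The Ok arm is unreachable by construction; only failures get here.
                if let Err(err) = result {
                    eprintln!("failing the partition processor: {err}");
                    break;
                }
            }
        }
    }
}
```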