diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index c9205cf48..539f54927 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -63,6 +63,11 @@ pub struct WorkerOptions { #[serde(flatten)] pub snapshots: SnapshotsOptions, + + /// # Append retry policy + /// + /// Retry policy for appending records to virtual log (bifrost) + pub append_retry_policy: RetryPolicy, } impl WorkerOptions { @@ -98,6 +103,7 @@ impl Default for WorkerOptions { invoker: Default::default(), max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"), snapshots: SnapshotsOptions::default(), + append_retry_policy: RetryPolicy::fixed_delay(Duration::from_secs(1), None), } } } diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index 6f29ac0f9..73f4905fd 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -261,17 +261,20 @@ where } mod state_machine { - use pin_project::pin_project; use std::cmp::Ordering; use std::future::Future; use std::pin::Pin; use std::sync::Arc; - use std::time::Duration; + + use anyhow::Context; + use pin_project::pin_project; use tokio_util::sync::ReusableBoxFuture; use tracing::{debug, trace}; use restate_storage_api::outbox_table::OutboxMessage; + use restate_types::config::Configuration; use restate_types::message::MessageIndex; + use restate_types::retries::{RetryIter, RetryPolicy}; use restate_wal_protocol::Envelope; use crate::partition::shuffle; @@ -291,7 +294,12 @@ mod state_machine { enum State { Idle, ReadingOutbox, - Sending(#[pin] SendFuture, Arc), + Sending { + #[pin] + send_future: SendFuture, + envelope: Arc, + retry: Option>, + }, } #[pin_project] @@ -302,6 +310,7 @@ mod state_machine { read_future: ReadFuture, send_operation: SendOp, hint_rx: &'a mut async_channel::Receiver, + retry_policy: RetryPolicy, #[pin] state: State, } @@ -342,6 +351,7 @@ mod state_machine { read_future: ReusableBoxFuture::new(reading_future), send_operation, hint_rx, + retry_policy: Configuration::pinned().worker.append_retry_policy.clone(), state: State::ReadingOutbox, } } @@ -371,7 +381,11 @@ mod state_machine { this.metadata, )); let send_future = (this.send_operation)(Arc::clone(&envelope)); - this.state.set(State::Sending(send_future, envelope)); + this.state.set(State::Sending { + send_future, + envelope, + retry: Some(this.retry_policy.clone().into_iter()), + }); break; } Ordering::Greater => { @@ -410,20 +424,41 @@ mod state_machine { )); let send_future = (this.send_operation)(Arc::clone(&envelope)); - this.state.set(State::Sending(send_future, envelope)); + this.state.set(State::Sending { + send_future, + envelope, + retry: Some(this.retry_policy.clone().into_iter()), + }); } else { this.state.set(State::Idle); } } - StateProj::Sending(send_future, envelope) => { + StateProj::Sending { + send_future, + envelope, + retry, + } => { if let Err(err) = send_future.await { debug!("Retrying failed shuffle attempt: {err}"); let send_future = (this.send_operation)(Arc::clone(envelope)); let envelope = Arc::clone(envelope); - this.state.set(State::Sending(send_future, envelope)); + let mut retry = retry.take().expect("retry policy is set"); - tokio::time::sleep(Duration::from_secs(1)).await; + match retry.next() { + Some(delay) => { + this.state.set(State::Sending { + send_future, + envelope, + retry: Some(retry), + }); + + tokio::time::sleep(delay).await; + } + None => { + return Err(err).context("Maximum number of retries exhausted"); + } + } } else { let successfully_shuffled_sequence_number = *this.current_sequence_number;