Introduce retry-policy to shuffle #2228

Open · wants to merge 1 commit into base: main
6 changes: 6 additions & 0 deletions crates/types/src/config/worker.rs
@@ -63,6 +63,11 @@ pub struct WorkerOptions {

#[serde(flatten)]
pub snapshots: SnapshotsOptions,

/// # Append retry policy
///
/// Retry policy for appending records to virtual log (bifrost)
pub append_retry_policy: RetryPolicy,
Contributor:
Any reason why we shouldn't use bifrost's append_retry_policy?

Contributor Author:
I honestly wasn't sure whether the bifrost append retry policy should be reused here. In my mind the two policies are not related, since one of them is internal to bifrost's operation, while this one is from the perspective of a bifrost user (the shuffler in this case).

That being said, I am totally fine with dropping this one and reusing the one from bifrost.

Contributor:
Technically, we are already using it once we call into the Bifrost::append method, right?

I've introduced the infinite retry policy in the shuffle for the demo. Since we can now handle partition processor errors in the partition processor manager (PPM), we could also say that we don't retry outside of the bifrost retries and instead fail the shuffle, and thereby the PP, if it fails to append entries. Then it would be the responsibility of the PPM and the CC to decide whether to restart the PP or not. Or we retry a few times in the shuffle and only then give up.
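For illustration, here is a minimal sketch of the "retry a few times in the shuffle and only then give up" variant, built on the RetryPolicy/RetryIter surface visible in this PR (`into_iter()` yielding delays); the `append_with_retries` helper and its signature are made up for the sketch and are not part of the actual shuffle code:

```rust
use std::future::Future;

use anyhow::Context;
use restate_types::retries::RetryPolicy;

/// Hypothetical helper: retry a fallible append according to a (finite)
/// retry policy and surface the error to the caller once the policy is
/// exhausted, so the owner (PP/PPM) can decide whether to restart.
async fn append_with_retries<F, Fut>(policy: RetryPolicy, mut append: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    let mut delays = policy.into_iter();
    loop {
        match append().await {
            Ok(()) => return Ok(()),
            Err(err) => match delays.next() {
                // The policy still yields a delay: back off and try again.
                Some(delay) => tokio::time::sleep(delay).await,
                // Retries exhausted: give up and propagate the failure.
                None => return Err(err).context("maximum number of retries exhausted"),
            },
        }
    }
}
```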

}

impl WorkerOptions {
@@ -98,6 +103,7 @@ impl Default for WorkerOptions {
invoker: Default::default(),
max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"),
snapshots: SnapshotsOptions::default(),
append_retry_policy: RetryPolicy::fixed_delay(Duration::from_secs(1), None),
}
}
}
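To make the default above concrete: `RetryPolicy::fixed_delay(Duration::from_secs(1), None)` yields an unbounded sequence of one-second delays, so with this default the shuffle never stops retrying. A minimal sketch of iterating such a policy, using only `fixed_delay` and `into_iter` as they appear in this diff (the surrounding `main` and the `take(3)` bound are just for illustration):

```rust
use std::time::Duration;

use restate_types::retries::RetryPolicy;

fn main() {
    // The default introduced in this PR: a fixed one-second delay with no
    // attempt cap (`None`), i.e. the iterator never runs out of delays.
    let policy = RetryPolicy::fixed_delay(Duration::from_secs(1), None);

    // Peek at the first few delays the policy would hand to the shuffle;
    // without the take(3) this loop would never terminate.
    for delay in policy.into_iter().take(3) {
        println!("next attempt in {delay:?}");
    }
}
```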
51 changes: 43 additions & 8 deletions crates/worker/src/partition/shuffle.rs
@@ -261,17 +261,20 @@
}

mod state_machine {
use pin_project::pin_project;
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use pin_project::pin_project;
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, trace};

use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::config::Configuration;
use restate_types::message::MessageIndex;
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_wal_protocol::Envelope;

use crate::partition::shuffle;
@@ -291,7 +294,12 @@
enum State<SendFuture> {
Idle,
ReadingOutbox,
Sending(#[pin] SendFuture, Arc<Envelope>),
Sending {
#[pin]
send_future: SendFuture,
envelope: Arc<Envelope>,
retry: Option<RetryIter<'static>>,
},
}

#[pin_project]
@@ -302,6 +310,7 @@
read_future: ReadFuture<OutboxReader>,
send_operation: SendOp,
hint_rx: &'a mut async_channel::Receiver<NewOutboxMessage>,
retry_policy: RetryPolicy,
#[pin]
state: State<SendFuture>,
}
Expand Down Expand Up @@ -342,6 +351,7 @@ mod state_machine {
read_future: ReusableBoxFuture::new(reading_future),
send_operation,
hint_rx,
retry_policy: Configuration::pinned().worker.append_retry_policy.clone(),
state: State::ReadingOutbox,
}
}
@@ -371,7 +381,11 @@
this.metadata,
));
let send_future = (this.send_operation)(Arc::clone(&envelope));
this.state.set(State::Sending(send_future, envelope));
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(this.retry_policy.clone().into_iter()),
});
break;
}
Ordering::Greater => {
@@ -410,20 +424,41 @@
));
let send_future = (this.send_operation)(Arc::clone(&envelope));

this.state.set(State::Sending(send_future, envelope));
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(this.retry_policy.clone().into_iter()),
});
} else {
this.state.set(State::Idle);
}
}
StateProj::Sending(send_future, envelope) => {
StateProj::Sending {
send_future,
envelope,
retry,
} => {
if let Err(err) = send_future.await {
debug!("Retrying failed shuffle attempt: {err}");

let send_future = (this.send_operation)(Arc::clone(envelope));
let envelope = Arc::clone(envelope);
this.state.set(State::Sending(send_future, envelope));
let mut retry = retry.take().expect("retry policy is set");

tokio::time::sleep(Duration::from_secs(1)).await;
match retry.next() {
Some(delay) => {
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(retry),
});

tokio::time::sleep(delay).await;
}
None => {
return Err(err).context("Maximum number of retries exhausted");
Contributor:

If we want to support finite restart strategies, then we need to change two things: make sure that we are not running in a TaskKind that panics on errors, and manage the task so that the owner (partition processor) learns about the failed task and can react to it (e.g., in the simplest case, by propagating the error).
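As a generic illustration of the second point (plain tokio here, not restate's actual task-center API; ShuffleError and the wiring are made up for the sketch): the owner keeps the task's handle and reacts to a failed shuffle instead of letting the task kind panic the node.

```rust
use tokio::task::JoinHandle;

// Hypothetical error type, only for the sketch.
#[derive(Debug)]
struct ShuffleError(String);

/// Owner side (think: partition processor). The shuffle runs as a managed
/// task whose handle the owner awaits, so a finite retry policy that gives
/// up simply resolves the task with an error the owner can act upon.
async fn run_partition_processor() -> Result<(), ShuffleError> {
    let shuffle: JoinHandle<Result<(), ShuffleError>> = tokio::spawn(async {
        // ... drive the shuffle state machine; with a finite retry policy
        // this resolves to Err once the retries are exhausted ...
        Err(ShuffleError("maximum number of retries exhausted".to_owned()))
    });

    match shuffle.await {
        Ok(Ok(())) => Ok(()),
        // Propagate the failure so the PPM / CC can decide whether to
        // restart the partition processor.
        Ok(Err(err)) => Err(err),
        Err(join_error) => Err(ShuffleError(format!("shuffle task panicked: {join_error}"))),
    }
}
```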

Contributor Author:

Hmm, the only reason I thought this was okay is that this function can return a Result. But indeed, as you said, this will eventually cause a shutdown of the node.

Having said that, I am now wondering, given your comment and the fact that bifrost will retry forever, whether the shuffler should implement a retry at all.

}
}
} else {
let successfully_shuffled_sequence_number =
*this.current_sequence_number;