Skip to content

Commit

Permalink
Avoid blocking in the PartitionProcessorManager when being in the con…
Browse files Browse the repository at this point in the history
…trol loop

Summary:
- Improve tracing of ControlProcessors commands
- Run the spawn of partition processors concurrently
- Avoid blocking partition processor manager main loop

While testing, I noticed that creating an InvokerService with
`InvokerService::from_options` takes around 25ms (on my machine).

I traced the delay to a single line of code here
https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73

My first thought was to cache the `HttpsConnectorBuilder`, but this won't work because `HttpsConnectorBuilder` is not
cloneable.

Fixes #2146
  • Loading branch information
muhamadazmy committed Nov 6, 2024
1 parent 206e3ee commit 0b91fdf
Show file tree
Hide file tree
Showing 11 changed files with 429 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.paritions_processor_state.unwrap_or_default(),
partitions: msg.partition_processor_state.unwrap_or_default(),
}),
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ mod tests {

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
paritions_processor_state: Some(state),
partition_processor_state: Some(state),
});

// We are not really sending something back to target, we just need to provide a known
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
use arc_swap::{ArcSwap, AsRaw};
use enum_map::EnumMap;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::instrument;

use restate_types::live::{Live, Pinned};
use restate_types::logs::metadata::Logs;
Expand Down Expand Up @@ -175,6 +176,7 @@ impl Metadata {
}

/// Returns when the metadata kind is at the provided version (or newer)
#[instrument(level = "debug", skip(self))]
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum PartitionProcessorRpcClientError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

impl PartitionProcessorRpcClientError {
Expand All @@ -60,7 +62,8 @@ impl PartitionProcessorRpcClientError {
)
| PartitionProcessorRpcClientError::UnknownPartition(_)
| PartitionProcessorRpcClientError::UnknownNode(_)
| PartitionProcessorRpcClientError::NotLeader(_) => {
| PartitionProcessorRpcClientError::NotLeader(_)
| PartitionProcessorRpcClientError::Starting => {
// These are pre-flight error that we can distinguish,
// and for which we know for certain that no message was proposed yet to the log.
true
Expand All @@ -83,6 +86,7 @@ impl From<PartitionProcessorRpcError> for PartitionProcessorRpcClientError {
PartitionProcessorRpcError::Internal(msg) => {
PartitionProcessorRpcClientError::Internal(msg)
}
PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting,
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,23 @@ impl TaskCenter {
Ok(result)
}

/// Runs `future` to completion on a blocking thread and returns a handle to its output.
///
/// A clone of this task center is moved onto that thread, where it drives the future via
/// `block_on` under the given `name` and optional `partition_id`.
///
/// NOTE(review): "unmanaged" presumably means the task center does not track this task and
/// the caller owns the returned join handle — confirm against the other spawn helpers.
pub fn spawn_blocking_unmanaged<F, O>(
    &self,
    name: &'static str,
    partition_id: Option<PartitionId>,
    future: F,
) -> tokio::task::JoinHandle<O>
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    // The closure must be `'static`, so it owns its own clone of the task center.
    let task_center = self.clone();
    let drive_to_completion = move || task_center.block_on(name, partition_id, future);

    let runtime = &self.inner.default_runtime_handle;
    runtime.spawn_blocking(drive_to_completion)
}

/// Cancelling the child will not cancel the parent. Note that parent task will not
/// wait for children tasks. The parent task is allowed to finish before children.
#[track_caller]
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BaseRole {
&self,
msg: Incoming<GetNodeState>,
) -> Result<(), ShutdownError> {
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
} else {
None
Expand All @@ -82,7 +82,7 @@ impl BaseRole {
// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
paritions_processor_state: parition_state,
partition_processor_state: partition_state,
})
.try_send()
.map_err(|err| err.source)
Expand Down
4 changes: 3 additions & 1 deletion crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ pub struct DeadNode {
pub last_seen_alive: Option<MillisSinceEpoch>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ pub struct GetNodeState {}
pub struct NodeStateResponse {
/// State of the partition processor per partition. Is set to None if this node is not a `Worker` node
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
}
2 changes: 2 additions & 0 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum PartitionProcessorRpcError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ControlProcessor {
pub command: ProcessorCommand,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
pub enum ProcessorCommand {
Stop,
Follower,
Expand Down
Loading

0 comments on commit 0b91fdf

Please sign in to comment.