Skip to content

Commit

Permalink
Avoid blocking in the PartitionProcessorManager when being in the con…
Browse files Browse the repository at this point in the history
…trol loop

Summary:
- Improve tracing of ControlProcessors commands
- Run the spawn of partition processors concurrently
- Avoid blocking partition processor manager main loop

While testing, I noticed that creating an InvokerService with
`InvokerService::from_options` takes around 25ms (on my machine).

I traced the delay to a single line of code here:
https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73

My first thought was to cache the `HttpsConnectorBuilder`, but this won't work because `HttpsConnectorBuilder` is not
cloneable.

Fixes #2146
  • Loading branch information
muhamadazmy committed Nov 5, 2024
1 parent 206e3ee commit 660b455
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.paritions_processor_state.unwrap_or_default(),
partitions: msg.partition_processor_state.unwrap_or_default(),
}),
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ mod tests {

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
paritions_processor_state: Some(state),
partition_processor_state: Some(state),
});

// We are not really sending something back to target, we just need to provide a known
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
use arc_swap::{ArcSwap, AsRaw};
use enum_map::EnumMap;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::instrument;

use restate_types::live::{Live, Pinned};
use restate_types::logs::metadata::Logs;
Expand Down Expand Up @@ -175,6 +176,7 @@ impl Metadata {
}

/// Returns when the metadata kind is at the provided version (or newer)
#[instrument(level = "debug", skip(self))]
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum PartitionProcessorRpcClientError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

impl PartitionProcessorRpcClientError {
Expand All @@ -60,7 +62,8 @@ impl PartitionProcessorRpcClientError {
)
| PartitionProcessorRpcClientError::UnknownPartition(_)
| PartitionProcessorRpcClientError::UnknownNode(_)
| PartitionProcessorRpcClientError::NotLeader(_) => {
| PartitionProcessorRpcClientError::NotLeader(_)
| PartitionProcessorRpcClientError::Starting => {
// These are pre-flight errors that we can distinguish,
// and for which we know for certain that no message was proposed yet to the log.
true
Expand All @@ -83,6 +86,7 @@ impl From<PartitionProcessorRpcError> for PartitionProcessorRpcClientError {
PartitionProcessorRpcError::Internal(msg) => {
PartitionProcessorRpcClientError::Internal(msg)
}
PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BaseRole {
&self,
msg: Incoming<GetNodeState>,
) -> Result<(), ShutdownError> {
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
} else {
None
Expand All @@ -82,7 +82,7 @@ impl BaseRole {
// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
paritions_processor_state: parition_state,
partition_processor_state: partition_state,
})
.try_send()
.map_err(|err| err.source)
Expand Down
4 changes: 3 additions & 1 deletion crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ pub struct DeadNode {
pub last_seen_alive: Option<MillisSinceEpoch>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ pub struct GetNodeState {}
pub struct NodeStateResponse {
/// State of the partition processors, per partition. Is set to None if this node is not a `Worker` node
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
}
2 changes: 2 additions & 0 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum PartitionProcessorRpcError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ControlProcessor {
pub command: ProcessorCommand,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
pub enum ProcessorCommand {
Stop,
Follower,
Expand Down
Loading

0 comments on commit 660b455

Please sign in to comment.