Skip to content

Commit

Permalink
Avoid blocking in the PartitionProcessorManager when being in the con…
Browse files Browse the repository at this point in the history
…trol loop

Summary:
- Improve tracing of ControlProcessors commands
- Run the spawn of partition processors concurrently
- Avoid blocking partition processor manager main loop

While testing I noticed that InvokerService creations takes
around 25ms (on my machine) to be created with `InvokerService::from_options`

I traced the delay to a single line of code here
https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73

My first thought was to cache the `HttpsConnectorBuilder` but this won't work because `HttpsConnectorBuilder` is not
clonable.

Fixes #2146
  • Loading branch information
muhamadazmy committed Nov 5, 2024
1 parent fdb95c5 commit 7b1d08b
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.paritions_processor_state.unwrap_or_default(),
partitions: msg.partition_processor_state.unwrap_or_default(),
}),
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ mod tests {

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
paritions_processor_state: Some(state),
partition_processor_state: Some(state),
});

// We are not really sending something back to target, we just need to provide a known
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
use arc_swap::{ArcSwap, AsRaw};
use enum_map::EnumMap;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::instrument;

use restate_types::live::{Live, Pinned};
use restate_types::logs::metadata::Logs;
Expand Down Expand Up @@ -175,6 +176,7 @@ impl Metadata {
}

/// Returns when the metadata kind is at the provided version (or newer)
#[instrument(level = "debug", skip(self))]
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BaseRole {
&self,
msg: Incoming<GetNodeState>,
) -> Result<(), ShutdownError> {
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
} else {
None
Expand All @@ -82,7 +82,7 @@ impl BaseRole {
// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
paritions_processor_state: parition_state,
partition_processor_state: partition_state,
})
.try_send()
.map_err(|err| err.source)
Expand Down
4 changes: 3 additions & 1 deletion crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ pub struct DeadNode {
pub last_seen_alive: Option<MillisSinceEpoch>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ pub struct GetNodeState {}
pub struct NodeStateResponse {
/// State of paritions processor per parition. Is set to None if this node is not a `Worker` node
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
}
2 changes: 1 addition & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ControlProcessor {
pub command: ProcessorCommand,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
pub enum ProcessorCommand {
Stop,
Follower,
Expand Down
Loading

0 comments on commit 7b1d08b

Please sign in to comment.