Skip to content

Commit f58af72

Browse files
committed
Avoid blocking in the PartitionProcessorManager when being in the control loop
Summary: - Improve tracing of ControlProcessors commands - Run the spawn of partition processors concurrently - Avoid blocking partition processor manager main loop While testing I noticed that InvokerService creations takes around 25ms (on my machine) to be created with `InvokerService::from_options` I traced the delay to a single line of code here https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73 My first thought was to cache the `HttpsConnectorBuilder` but this won't work because `HttpsConnectorBuilder` is not clonable. Fixes #2146
1 parent 3a293a4 commit f58af72

File tree

8 files changed

+398
-189
lines changed

8 files changed

+398
-189
lines changed

crates/admin/src/cluster_controller/cluster_state_refresher.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
170170
NodeState::Alive(AliveNode {
171171
last_heartbeat_at: MillisSinceEpoch::now(),
172172
generational_node_id: peer,
173-
partitions: msg.paritions_processor_state.unwrap_or_default(),
173+
partitions: msg.partition_processor_state.unwrap_or_default(),
174174
}),
175175
);
176176
}

crates/admin/src/cluster_controller/service.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ mod tests {
809809

810810
let state = [(PartitionId::MIN, partition_processor_status)].into();
811811
let response = msg.to_rpc_response(NodeStateResponse {
812-
paritions_processor_state: Some(state),
812+
partition_processor_state: Some(state),
813813
});
814814

815815
// We are not really sending something back to target, we just need to provide a known

crates/core/src/metadata/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
1818
use arc_swap::{ArcSwap, AsRaw};
1919
use enum_map::EnumMap;
2020
use tokio::sync::{mpsc, oneshot, watch};
21+
use tracing::instrument;
2122

2223
use restate_types::live::{Live, Pinned};
2324
use restate_types::logs::metadata::Logs;
@@ -175,6 +176,7 @@ impl Metadata {
175176
}
176177

177178
/// Returns when the metadata kind is at the provided version (or newer)
179+
#[instrument(level = "debug", skip(self))]
178180
pub async fn wait_for_version(
179181
&self,
180182
metadata_kind: MetadataKind,

crates/node/src/roles/base.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl BaseRole {
7373
&self,
7474
msg: Incoming<GetNodeState>,
7575
) -> Result<(), ShutdownError> {
76-
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
76+
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
7777
Some(handle.get_state().await?)
7878
} else {
7979
None
@@ -82,7 +82,7 @@ impl BaseRole {
8282
// only return error if Shutdown
8383
if let Err(NetworkError::Shutdown(err)) = msg
8484
.to_rpc_response(NodeStateResponse {
85-
paritions_processor_state: parition_state,
85+
partition_processor_state: partition_state,
8686
})
8787
.try_send()
8888
.map_err(|err| err.source)

crates/types/src/cluster/cluster_state.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ pub struct DeadNode {
9797
pub last_seen_alive: Option<MillisSinceEpoch>,
9898
}
9999

100-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
100+
#[derive(
101+
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
102+
)]
101103
#[proto(target = "crate::protobuf::cluster::RunMode")]
102104
pub enum RunMode {
103105
Leader,

crates/types/src/net/node.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,5 @@ pub struct GetNodeState {}
3131
pub struct NodeStateResponse {
3232
/// State of paritions processor per parition. Is set to None if this node is not a `Worker` node
3333
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
34-
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
34+
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
3535
}

crates/types/src/net/partition_processor_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct ControlProcessor {
3434
pub command: ProcessorCommand,
3535
}
3636

37-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
37+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
3838
pub enum ProcessorCommand {
3939
Stop,
4040
Follower,

0 commit comments

Comments
 (0)