Skip to content

Commit 384fd8d

Browse files
committed
Avoid blocking in the PartitionProcessorManager when being in the control loop
Summary: - Improve tracing of ControlProcessors commands - Run the spawn of partition processors concurrently - Avoid blocking partition processor manager main loop While testing I noticed that InvokerService creations takes around 25ms (on my machine) to be created with `InvokerService::from_options` I traced the delay to a single line of code here https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73 My first thought was to cache the `HttpsConnectorBuilder` but this won't work because `HttpsConnectorBuilder` is not clonable. Fixes #2146
1 parent c490b19 commit 384fd8d

File tree

12 files changed

+442
-196
lines changed

12 files changed

+442
-196
lines changed

crates/admin/src/cluster_controller/cluster_state_refresher.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
170170
NodeState::Alive(AliveNode {
171171
last_heartbeat_at: MillisSinceEpoch::now(),
172172
generational_node_id: peer,
173-
partitions: msg.paritions_processor_state.unwrap_or_default(),
173+
partitions: msg.partition_processor_state.unwrap_or_default(),
174174
}),
175175
);
176176
}

crates/admin/src/cluster_controller/service.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ mod tests {
746746

747747
let state = [(PartitionId::MIN, partition_processor_status)].into();
748748
let response = msg.to_rpc_response(NodeStateResponse {
749-
paritions_processor_state: Some(state),
749+
partition_processor_state: Some(state),
750750
});
751751

752752
// We are not really sending something back to target, we just need to provide a known

crates/core/src/metadata/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
1818
use arc_swap::{ArcSwap, AsRaw};
1919
use enum_map::EnumMap;
2020
use tokio::sync::{mpsc, oneshot, watch};
21+
use tracing::instrument;
2122

2223
use restate_types::live::{Live, Pinned};
2324
use restate_types::logs::metadata::Logs;
@@ -175,6 +176,7 @@ impl Metadata {
175176
}
176177

177178
/// Returns when the metadata kind is at the provided version (or newer)
179+
#[instrument(level = "debug", skip(self))]
178180
pub async fn wait_for_version(
179181
&self,
180182
metadata_kind: MetadataKind,

crates/core/src/network/partition_processor_rpc_client.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub enum PartitionProcessorRpcClientError {
4646
Busy,
4747
#[error("internal error: {0}")]
4848
Internal(String),
49+
#[error("partition processor starting")]
50+
Starting,
4951
}
5052

5153
impl PartitionProcessorRpcClientError {
@@ -60,7 +62,8 @@ impl PartitionProcessorRpcClientError {
6062
)
6163
| PartitionProcessorRpcClientError::UnknownPartition(_)
6264
| PartitionProcessorRpcClientError::UnknownNode(_)
63-
| PartitionProcessorRpcClientError::NotLeader(_) => {
65+
| PartitionProcessorRpcClientError::NotLeader(_)
66+
| PartitionProcessorRpcClientError::Starting => {
6467
// These are pre-flight error that we can distinguish,
6568
// and for which we know for certain that no message was proposed yet to the log.
6669
true
@@ -83,6 +86,7 @@ impl From<PartitionProcessorRpcError> for PartitionProcessorRpcClientError {
8386
PartitionProcessorRpcError::Internal(msg) => {
8487
PartitionProcessorRpcClientError::Internal(msg)
8588
}
89+
PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting,
8690
}
8791
}
8892
}

crates/core/src/task_center.rs

+17
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,23 @@ impl TaskCenter {
634634
Ok(result)
635635
}
636636

637+
// Spawn a future in its own thread
638+
pub fn spawn_blocking_unmanaged<F, O>(
639+
&self,
640+
name: &'static str,
641+
partition_id: Option<PartitionId>,
642+
future: F,
643+
) -> tokio::task::JoinHandle<O>
644+
where
645+
F: Future<Output = O> + Send + 'static,
646+
O: Send + 'static,
647+
{
648+
let tc = self.clone();
649+
self.inner
650+
.default_runtime_handle
651+
.spawn_blocking(move || tc.block_on(name, partition_id, future))
652+
}
653+
637654
/// Cancelling the child will not cancel the parent. Note that parent task will not
638655
/// wait for children tasks. The parent task is allowed to finish before children.
639656
#[track_caller]

crates/node/src/roles/base.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl BaseRole {
7373
&self,
7474
msg: Incoming<GetNodeState>,
7575
) -> Result<(), ShutdownError> {
76-
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
76+
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
7777
Some(handle.get_state().await?)
7878
} else {
7979
None
@@ -82,7 +82,7 @@ impl BaseRole {
8282
// only return error if Shutdown
8383
if let Err(NetworkError::Shutdown(err)) = msg
8484
.to_rpc_response(NodeStateResponse {
85-
paritions_processor_state: parition_state,
85+
partition_processor_state: partition_state,
8686
})
8787
.try_send()
8888
.map_err(|err| err.source)

crates/service-client/src/http.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,26 @@ use hyper::body::Body;
2222
use hyper::http::uri::PathAndQuery;
2323
use hyper::http::HeaderValue;
2424
use hyper::{HeaderMap, Method, Request, Response, Uri};
25-
use hyper_rustls::HttpsConnector;
25+
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
2626
use hyper_util::client::legacy::connect::HttpConnector;
2727
use restate_types::config::HttpOptions;
28+
use rustls::ClientConfig;
2829
use std::error::Error;
2930
use std::fmt::Debug;
3031
use std::future;
3132
use std::future::Future;
33+
use std::sync::LazyLock;
3234

3335
type ProxiedHttpsConnector = ProxyConnector<HttpsConnector<HttpConnector>>;
3436
type ProxiedHttpConnector = ProxyConnector<HttpConnector>;
3537

38+
static TLS_CLIENT_CONFIG: LazyLock<ClientConfig> = LazyLock::new(|| {
39+
ClientConfig::builder()
40+
.with_native_roots()
41+
.expect("Can load native certificates")
42+
.with_no_client_auth()
43+
});
44+
3645
// TODO
3746
// for the time being we use BoxBody here to simplify the migration to hyper 1.0.
3847
// We should consider replacing this with some concrete type that makes sense.
@@ -70,8 +79,7 @@ impl HttpClient {
7079
http_connector.set_connect_timeout(Some(options.connect_timeout.into()));
7180

7281
let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
73-
.with_native_roots()
74-
.expect("Can build native roots")
82+
.with_tls_config(TLS_CLIENT_CONFIG.clone())
7583
.https_or_http()
7684
.enable_http1()
7785
.enable_http2()

crates/types/src/cluster/cluster_state.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ pub struct DeadNode {
9797
pub last_seen_alive: Option<MillisSinceEpoch>,
9898
}
9999

100-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
100+
#[derive(
101+
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
102+
)]
101103
#[proto(target = "crate::protobuf::cluster::RunMode")]
102104
pub enum RunMode {
103105
Leader,

crates/types/src/net/node.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,5 @@ pub struct GetNodeState {}
3131
pub struct NodeStateResponse {
3232
/// State of paritions processor per parition. Is set to None if this node is not a `Worker` node
3333
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
34-
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
34+
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
3535
}

crates/types/src/net/partition_processor.rs

+2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub enum PartitionProcessorRpcError {
7979
Busy,
8080
#[error("internal error: {0}")]
8181
Internal(String),
82+
#[error("partition processor starting")]
83+
Starting,
8284
}
8385

8486
#[derive(Debug, Clone, Serialize, Deserialize)]

crates/types/src/net/partition_processor_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct ControlProcessor {
3434
pub command: ProcessorCommand,
3535
}
3636

37-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
37+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
3838
pub enum ProcessorCommand {
3939
Stop,
4040
Follower,

0 commit comments

Comments
 (0)