Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid blocking in the PartitionProcessorManager when being in the control loop #2179

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.paritions_processor_state.unwrap_or_default(),
partitions: msg.partition_processor_state.unwrap_or_default(),
}),
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ mod tests {

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
paritions_processor_state: Some(state),
partition_processor_state: Some(state),
});

// We are not really sending something back to target, we just need to provide a known
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, OnceLock};
use arc_swap::{ArcSwap, AsRaw};
use enum_map::EnumMap;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::instrument;

use restate_types::live::{Live, Pinned};
use restate_types::logs::metadata::Logs;
Expand Down Expand Up @@ -175,6 +176,7 @@ impl Metadata {
}

/// Returns when the metadata kind is at the provided version (or newer)
#[instrument(level = "debug", skip(self))]
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum PartitionProcessorRpcClientError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

impl PartitionProcessorRpcClientError {
Expand All @@ -60,7 +62,8 @@ impl PartitionProcessorRpcClientError {
)
| PartitionProcessorRpcClientError::UnknownPartition(_)
| PartitionProcessorRpcClientError::UnknownNode(_)
| PartitionProcessorRpcClientError::NotLeader(_) => {
| PartitionProcessorRpcClientError::NotLeader(_)
| PartitionProcessorRpcClientError::Starting => {
// These are pre-flight errors that we can distinguish,
// and for which we know for certain that no message was proposed yet to the log.
true
Expand All @@ -83,6 +86,7 @@ impl From<PartitionProcessorRpcError> for PartitionProcessorRpcClientError {
PartitionProcessorRpcError::Internal(msg) => {
PartitionProcessorRpcClientError::Internal(msg)
}
PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting,
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,23 @@ impl TaskCenter {
Ok(result)
}

/// Drives `future` to completion via `spawn_blocking` on the default runtime handle.
///
/// A clone of this task center is moved into the blocking closure, which calls
/// `block_on(name, partition_id, future)` on it. The returned
/// [`tokio::task::JoinHandle`] yields the future's output.
///
/// NOTE(review): "unmanaged" presumably means the task center does not track the
/// spawned work itself and the caller owns the join handle — confirm.
pub fn spawn_blocking_unmanaged<F, O>(
    &self,
    name: &'static str,
    partition_id: Option<PartitionId>,
    future: F,
) -> tokio::task::JoinHandle<O>
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    // The closure must be 'static, so it owns its own handle to the task center.
    let task_center = self.clone();
    let run_to_completion = move || task_center.block_on(name, partition_id, future);

    self.inner
        .default_runtime_handle
        .spawn_blocking(run_to_completion)
}

/// Cancelling the child will not cancel the parent. Note that parent task will not
/// wait for children tasks. The parent task is allowed to finish before children.
#[track_caller]
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BaseRole {
&self,
msg: Incoming<GetNodeState>,
) -> Result<(), ShutdownError> {
let parition_state = if let Some(ref handle) = self.processor_manager_handle {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
} else {
None
Expand All @@ -82,7 +82,7 @@ impl BaseRole {
// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
paritions_processor_state: parition_state,
partition_processor_state: partition_state,
})
.try_send()
.map_err(|err| err.source)
Expand Down
14 changes: 11 additions & 3 deletions crates/service-client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,26 @@ use hyper::body::Body;
use hyper::http::uri::PathAndQuery;
use hyper::http::HeaderValue;
use hyper::{HeaderMap, Method, Request, Response, Uri};
use hyper_rustls::HttpsConnector;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::client::legacy::connect::HttpConnector;
use restate_types::config::HttpOptions;
use rustls::ClientConfig;
use std::error::Error;
use std::fmt::Debug;
use std::future;
use std::future::Future;
use std::sync::LazyLock;

type ProxiedHttpsConnector = ProxyConnector<HttpsConnector<HttpConnector>>;
type ProxiedHttpConnector = ProxyConnector<HttpConnector>;

/// Lazily-initialized rustls [`ClientConfig`], built on first access.
///
/// The config is built with `with_native_roots()` and no client authentication.
///
/// NOTE(review): presumably kept in a static so the native certificates are loaded
/// only once and each client can clone the ready-made config — confirm.
///
/// # Panics
///
/// The first access panics if `with_native_roots()` returns an error.
static TLS_CLIENT_CONFIG: LazyLock<ClientConfig> = LazyLock::new(|| {
ClientConfig::builder()
.with_native_roots()
.expect("Can load native certificates")
.with_no_client_auth()
});

// TODO
// for the time being we use BoxBody here to simplify the migration to hyper 1.0.
// We should consider replacing this with some concrete type that makes sense.
Expand Down Expand Up @@ -70,8 +79,7 @@ impl HttpClient {
http_connector.set_connect_timeout(Some(options.connect_timeout.into()));

let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("Can build native roots")
.with_tls_config(TLS_CLIENT_CONFIG.clone())
.https_or_http()
.enable_http1()
.enable_http2()
Expand Down
4 changes: 3 additions & 1 deletion crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ pub struct DeadNode {
pub last_seen_alive: Option<MillisSinceEpoch>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto)]
#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ pub struct GetNodeState {}
pub struct NodeStateResponse {
/// State of partition processors per partition. Is set to None if this node is not a `Worker` node
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
pub paritions_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,
}
2 changes: 2 additions & 0 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum PartitionProcessorRpcError {
Busy,
#[error("internal error: {0}")]
Internal(String),
#[error("partition processor starting")]
Starting,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ControlProcessor {
pub command: ProcessorCommand,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
pub enum ProcessorCommand {
Stop,
Follower,
Expand Down
Loading
Loading