Skip to content

Commit

Permalink
[TaskCenter] Stage 1 of task-center clean-up
Browse files Browse the repository at this point in the history
TaskCenter has been an incredibly useful tool but it has grown organically and is spiraling out of control. This PR is the first step to break it down into smaller modules in preparation for a more ergonomic API for its users.

Changes in this PR are mechanical and does cause any behavioural changes.
  • Loading branch information
AhmedSoliman committed Nov 20, 2024
1 parent 43dab46 commit 18b7c23
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 296 deletions.
2 changes: 0 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ mod metric_definitions;
pub mod network;
pub mod partitions;
mod task_center;
mod task_center_types;
pub mod worker_api;
pub use error::*;

Expand All @@ -24,7 +23,6 @@ pub use metadata::{
MetadataWriter, SyncError, TargetVersion,
};
pub use task_center::*;
pub use task_center_types::*;

#[cfg(any(test, feature = "test-util"))]
mod test_env;
Expand Down
130 changes: 130 additions & 0 deletions crates/core/src/task_center/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::atomic::{AtomicUsize, Ordering};

use tracing::error;

use restate_types::config::CommonOptions;

use super::TaskCenter;

static WORKER_ID: AtomicUsize = const { AtomicUsize::new(0) };

#[derive(Debug, thiserror::Error)]
pub enum TaskCenterBuildError {
#[error(transparent)]
Tokio(#[from] tokio::io::Error),
}

/// Used to create a new task center. In practice, there should be a single task center for the
/// entire process but we might need to create more than one in integration test scenarios.
#[derive(Default)]
pub struct TaskCenterBuilder {
default_runtime_handle: Option<tokio::runtime::Handle>,
default_runtime: Option<tokio::runtime::Runtime>,
ingress_runtime_handle: Option<tokio::runtime::Handle>,
ingress_runtime: Option<tokio::runtime::Runtime>,
options: Option<CommonOptions>,
#[cfg(any(test, feature = "test-util"))]
pause_time: bool,
}

impl TaskCenterBuilder {
pub fn default_runtime_handle(mut self, handle: tokio::runtime::Handle) -> Self {
self.default_runtime_handle = Some(handle);
self.default_runtime = None;
self
}

pub fn ingress_runtime_handle(mut self, handle: tokio::runtime::Handle) -> Self {
self.ingress_runtime_handle = Some(handle);
self.ingress_runtime = None;
self
}

pub fn options(mut self, options: CommonOptions) -> Self {
self.options = Some(options);
self
}

pub fn default_runtime(mut self, runtime: tokio::runtime::Runtime) -> Self {
self.default_runtime_handle = Some(runtime.handle().clone());
self.default_runtime = Some(runtime);
self
}

pub fn ingress_runtime(mut self, runtime: tokio::runtime::Runtime) -> Self {
self.ingress_runtime_handle = Some(runtime.handle().clone());
self.ingress_runtime = Some(runtime);
self
}

#[cfg(any(test, feature = "test-util"))]
pub fn pause_time(mut self, pause_time: bool) -> Self {
self.pause_time = pause_time;
self
}

#[cfg(any(test, feature = "test-util"))]
pub fn default_for_tests() -> Self {
Self::default()
.ingress_runtime_handle(tokio::runtime::Handle::current())
.default_runtime_handle(tokio::runtime::Handle::current())
.pause_time(true)
}

pub fn build(mut self) -> Result<TaskCenter, TaskCenterBuildError> {
let options = self.options.unwrap_or_default();
if self.default_runtime_handle.is_none() {
let mut default_runtime_builder = tokio_builder("worker", &options);
#[cfg(any(test, feature = "test-util"))]
if self.pause_time {
default_runtime_builder.start_paused(self.pause_time);
}
let default_runtime = default_runtime_builder.build()?;
self.default_runtime_handle = Some(default_runtime.handle().clone());
self.default_runtime = Some(default_runtime);
}

if self.ingress_runtime_handle.is_none() {
let mut ingress_runtime_builder = tokio_builder("ingress", &options);
#[cfg(any(test, feature = "test-util"))]
if self.pause_time {
ingress_runtime_builder.start_paused(self.pause_time);
}
let ingress_runtime = ingress_runtime_builder.build()?;
self.ingress_runtime_handle = Some(ingress_runtime.handle().clone());
self.ingress_runtime = Some(ingress_runtime);
}

if cfg!(any(test, feature = "test-util")) {
eprintln!("!!!! Runnning with test-util enabled !!!!");
}
Ok(TaskCenter::new(
self.default_runtime_handle.unwrap(),
self.ingress_runtime_handle.unwrap(),
self.default_runtime,
self.ingress_runtime,
))
}
}

fn tokio_builder(prefix: &'static str, common_opts: &CommonOptions) -> tokio::runtime::Builder {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.enable_all().thread_name_fn(move || {
let id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
format!("rs:{}-{}", prefix, id)
});

builder.worker_threads(common_opts.default_thread_pool_size());

builder
}
Loading

0 comments on commit 18b7c23

Please sign in to comment.