Skip to content

Commit

Permalink
[TaskCenter] Stage 2 of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Nov 22, 2024
1 parent 55d0d18 commit 95a9837
Show file tree
Hide file tree
Showing 27 changed files with 402 additions and 173 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ opentelemetry_sdk = { version = "0.24.0" }
parking_lot = { version = "0.12" }
paste = "1.0"
pin-project = "1.0"
pin-project-lite = { version = "0.2" }
prost = { version = "0.13.1" }
prost-build = { version = "0.13.1" }
priority-queue = "2.0.3"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub fn spawn_restate(config: Configuration) -> TaskCenter {
restate_types::config::set_current_config(config.clone());
let updateable_config = Configuration::updateable();

tc.block_on("benchmark", None, async {
tc.block_on(async {
RocksDbManager::init(Constant::new(config.common));

tc.spawn(TaskKind::SystemBoot, "restate", None, async move {
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/benches/append_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn write_throughput_local_loglet(c: &mut Criterion) {
provider,
));

let bifrost = tc.block_on("bifrost-init", None, async {
let bifrost = tc.block_on(async {
let metadata = metadata();
let bifrost_svc = BifrostService::new(restate_core::task_center(), metadata)
.enable_local_loglet(&Live::from_value(config));
Expand Down
4 changes: 1 addition & 3 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ pub async fn spawn_environment(
let metadata_writer = metadata_manager.writer();
tc.try_set_global_metadata(metadata.clone());

tc.run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(config.common))
});
tc.run_in_scope_sync(|| RocksDbManager::init(Constant::new(config.common)));

let logs = restate_types::logs::metadata::bootstrap_logs_metadata(provider, None, num_logs);

Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ metrics = { workspace = true }
opentelemetry = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
pin-project-lite = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ mod tests {
S: Fn(&mut T, Version),
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
tc.block_on(async move {
let metadata_builder = MetadataBuilder::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata = metadata_builder.to_metadata();
Expand Down Expand Up @@ -689,7 +689,7 @@ mod tests {
I: Fn(&mut T),
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
tc.block_on(async move {
let metadata_builder = MetadataBuilder::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();

Expand Down
24 changes: 24 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ pub struct Metadata {
}

impl Metadata {
pub fn try_with_current<F, R>(f: F) -> Option<R>
where
F: Fn(&Metadata) -> R,
{
TaskCenter::with_metadata(|m| f(m))
}

pub fn try_current() -> Option<Metadata> {
TaskCenter::with_current(|tc| tc.metadata())
}

#[track_caller]
pub fn with_current<F, R>(f: F) -> R
where
F: FnOnce(&Metadata) -> R,
{
TaskCenter::with_metadata(|m| f(m)).expect("called outside task-center scope")
}

#[track_caller]
pub fn current() -> Metadata {
TaskCenter::with_current(|tc| tc.metadata()).expect("called outside task-center scope")
}

#[inline(always)]
pub fn nodes_config_snapshot(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full()
Expand Down
171 changes: 171 additions & 0 deletions crates/core/src/task_center/extensions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::future::Future;
use std::pin::Pin;

use pin_project_lite::pin_project;
use tokio::task::futures::TaskLocalFuture;
use tokio_util::sync::CancellationToken;

use crate::task_center::TaskContext;
use crate::Metadata;

use super::{
GlobalOverrides, TaskCenter, TaskId, TaskKind, CURRENT_TASK_CENTER, OVERRIDES, TASK_CONTEXT,
};

type TaskCenterFuture<F> =
TaskLocalFuture<TaskCenter, TaskLocalFuture<GlobalOverrides, TaskLocalFuture<TaskContext, F>>>;

/// Adds the ability to override task-center for a future and all its children
pub trait TaskCenterFutureExt: Sized {
/// Ensures that a future will run within a task-center context. This will inherit the current
/// task context (if there is one). Otherwise, it'll run in the context of the root task (task-id=0).
fn in_tc(self, task_center: &TaskCenter) -> WithTaskCenter<Self>;

/// Lets task-center treat this future as a psuedo-task. It gets its own TaskId and an
/// independent cancellation token. However, task-center will not spawn this as a task nor
/// manage its lifecycle.
fn in_tc_as_task(
self,
task_center: &TaskCenter,
kind: TaskKind,
name: &'static str,
) -> WithTaskCenter<Self>;

/// Ensures that a future will run within the task-center in current scope. This will inherit the current
/// task context (if there is one). Otherwise, it'll run in the context of the root task (task-id=0).
///
/// This is useful when running dispatching a future as a task on an external runtime/thread,
/// or when running a future on tokio's JoinSet without representing those tokio tasks as
/// task-center tasks. However, in the latter case, it's preferred to use
/// [`Self::in_current_ts_as_task`] instead.
fn in_current_tc(self) -> WithTaskCenter<Self>;

/// Attaches current task-center and lets it treat the future as a psuedo-task. It gets its own TaskId and an
/// independent cancellation token. However, task-center will not spawn this as a task nor
/// manage its lifecycle.
fn in_current_tc_as_task(self, kind: TaskKind, name: &'static str) -> WithTaskCenter<Self>;
}

pin_project! {
pub struct WithTaskCenter<F> {
#[pin]
inner_fut: TaskCenterFuture<F>,
}
}

impl<F, O> TaskCenterFutureExt for F
where
F: Future<Output = O>,
{
fn in_tc(self, task_center: &TaskCenter) -> WithTaskCenter<Self> {
let ctx = task_center.with_task_context(Clone::clone);

let inner = CURRENT_TASK_CENTER.scope(
task_center.clone(),
OVERRIDES.scope(
OVERRIDES.try_with(Clone::clone).unwrap_or_default(),
TASK_CONTEXT.scope(ctx, self),
),
);
WithTaskCenter { inner_fut: inner }
}

fn in_tc_as_task(
self,
task_center: &TaskCenter,
kind: TaskKind,
name: &'static str,
) -> WithTaskCenter<Self> {
let ctx = task_center.with_task_context(move |parent| TaskContext {
id: TaskId::default(),
name,
kind,
cancellation_token: CancellationToken::new(),
partition_id: parent.partition_id,
});

let inner = CURRENT_TASK_CENTER.scope(
task_center.clone(),
OVERRIDES.scope(
OVERRIDES.try_with(Clone::clone).unwrap_or_default(),
TASK_CONTEXT.scope(ctx, self),
),
);
WithTaskCenter { inner_fut: inner }
}

/// Ensures that a future will run within a task-center context. This will inherit the current
/// task context (if there is one). Otherwise, it'll run in the context of the root task (task-id=0).
fn in_current_tc(self) -> WithTaskCenter<Self> {
TaskCenter::with_current(|tc| self.in_tc(tc))
}

fn in_current_tc_as_task(self, kind: TaskKind, name: &'static str) -> WithTaskCenter<Self> {
TaskCenter::with_current(|tc| self.in_tc_as_task(tc, kind, name))
}
}

impl<T: Future> Future for WithTaskCenter<T> {
type Output = T::Output;

fn poll(
self: Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.inner_fut.poll(ctx)
}
}

/// Adds the ability to override Metadata for a future and all its children
pub trait MetadataFutureExt: Sized {
/// Attaches restate's Metadata as an override on a future and all children futures or
/// task-center tasks spawned from it.
fn with_metadata(self, metadata: &Metadata) -> WithMetadata<Self>;
}

pin_project! {
pub struct WithMetadata<F> {
#[pin]
inner_fut: TaskLocalFuture<GlobalOverrides, F>,
}
}

impl<F, O> MetadataFutureExt for F
where
F: Future<Output = O>,
{
fn with_metadata(self, metadata: &Metadata) -> WithMetadata<Self> {
let current_overrides = OVERRIDES.try_with(Clone::clone).unwrap_or_default();
// temporary mute until overrides include more fields
#[allow(clippy::needless_update)]
let overrides = GlobalOverrides {
metadata: Some(metadata.clone()),
..current_overrides
};
let inner = OVERRIDES.scope(overrides, self);
WithMetadata { inner_fut: inner }
}
}

impl<T: Future> Future for WithMetadata<T> {
type Output = T::Output;

fn poll(
self: Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.inner_fut.poll(ctx)
}
}
Loading

0 comments on commit 95a9837

Please sign in to comment.