Skip to content

Commit

Permalink
[TaskCenter] Stage 3
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Nov 22, 2024
1 parent 0f5cd85 commit edf7f31
Show file tree
Hide file tree
Showing 41 changed files with 1,361 additions and 1,453 deletions.
23 changes: 9 additions & 14 deletions crates/admin/src/cluster_controller/cluster_state_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{
MessageRouterBuilder, NetworkError, Networking, Outgoing, TransportConnect,
};
use restate_core::{Metadata, ShutdownError, TaskCenter, TaskHandle};
use restate_core::{
Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind,
};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, SuspectNode,
};
Expand All @@ -28,7 +30,6 @@ use restate_types::time::MillisSinceEpoch;
use restate_types::Version;

pub struct ClusterStateRefresher<T> {
task_center: TaskCenter,
metadata: Metadata,
network_sender: Networking<T>,
get_state_router: RpcRouter<GetNodeState>,
Expand All @@ -39,7 +40,6 @@ pub struct ClusterStateRefresher<T> {

impl<T: TransportConnect> ClusterStateRefresher<T> {
pub fn new(
task_center: TaskCenter,
metadata: Metadata,
network_sender: Networking<T>,
router_builder: &mut MessageRouterBuilder,
Expand All @@ -57,7 +57,6 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
watch::channel(Arc::from(initial_state));

Self {
task_center,
metadata,
network_sender,
get_state_router,
Expand Down Expand Up @@ -97,7 +96,6 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
}

self.in_flight_refresh = Self::start_refresh_task(
self.task_center.clone(),
self.get_state_router.clone(),
self.network_sender.clone(),
Arc::clone(&self.cluster_state_update_tx),
Expand All @@ -108,13 +106,11 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
}

fn start_refresh_task(
tc: TaskCenter,
get_state_router: RpcRouter<GetNodeState>,
network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
metadata: Metadata,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
let task_center = tc.clone();
let refresh = async move {
let last_state = Arc::clone(&cluster_state_tx.borrow());
// make sure we have a partition table that equals or newer than last refresh
Expand All @@ -137,13 +133,12 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
for (_, node_config) in nodes_config.iter() {
let node_id = node_config.current_generation;
let rpc_router = get_state_router.clone();
let tc = tc.clone();
let network_sender = network_sender.clone();
join_set
.build_task()
.name("get-nodes-state")
.spawn(async move {
tc.run_in_scope("get-node-state", None, async move {
.spawn(
async move {
match network_sender.node_connection(node_id).await {
Ok(connection) => {
let outgoing = Outgoing::new(node_id, GetNodeState::default())
Expand All @@ -161,9 +156,9 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
}
Err(network_error) => (node_id, Err(network_error)),
}
})
.await
})
}
.in_current_tc_as_task(TaskKind::InPlace, "get-nodes-state"),
)
.expect("to spawn task");
}
while let Some(Ok((node_id, result))) = join_set.join_next().await {
Expand Down Expand Up @@ -233,7 +228,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
Ok(())
};

let handle = task_center.spawn_unmanaged(
let handle = TaskCenter::current().spawn_unmanaged(
restate_core::TaskKind::Disposable,
"cluster-state-refresh",
None,
Expand Down
200 changes: 96 additions & 104 deletions crates/admin/src/cluster_controller/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ impl<T: PartitionProcessorPlacementHints> PartitionProcessorPlacementHints for &
pub struct Scheduler<T> {
scheduling_plan: SchedulingPlan,
last_updated_scheduling_plan: Instant,

task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
}
Expand All @@ -104,7 +102,6 @@ pub struct Scheduler<T> {
impl<T: TransportConnect> Scheduler<T> {
pub async fn init(
configuration: &Configuration,
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
) -> Result<Self, BuildError> {
Expand All @@ -120,7 +117,6 @@ impl<T: TransportConnect> Scheduler<T> {
Ok(Self {
scheduling_plan,
last_updated_scheduling_plan: Instant::now(),
task_center,
metadata_store_client,
networking,
})
Expand Down Expand Up @@ -478,10 +474,9 @@ impl<T: TransportConnect> Scheduler<T> {
commands,
};

self.task_center.spawn_child(
TaskCenter::spawn_child(
TaskKind::Disposable,
"send-control-processors-to-node",
None,
{
let networking = self.networking.clone();
async move {
Expand Down Expand Up @@ -584,7 +579,9 @@ mod tests {
HashSet, PartitionProcessorPlacementHints, Scheduler,
};
use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector};
use restate_core::{metadata, TaskCenterBuilder, TestCoreEnv, TestCoreEnvBuilder};
use restate_core::{
metadata, TaskCenterBuilder, TaskCenterFutureExt, TestCoreEnv, TestCoreEnvBuilder,
};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
Expand Down Expand Up @@ -618,44 +615,41 @@ mod tests {
#[test(tokio::test)]
async fn empty_leadership_changes_dont_modify_plan() -> googletest::Result<()> {
let test_env = TestCoreEnv::create_with_single_node(0, 0).await;
let tc = test_env.tc.clone();
let metadata_store_client = test_env.metadata_store_client.clone();
let networking = test_env.networking.clone();

test_env
.tc
.run_in_scope("test", None, async {
let initial_scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
async {
let initial_scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
metadata_store_client.clone(),
networking,
)
.await?;
let observed_cluster_state = ObservedClusterState::default();

scheduler
.on_observed_cluster_state(
&observed_cluster_state,
&metadata().nodes_config_ref(),
NoPlacementHints,
)
.await?;
let observed_cluster_state = ObservedClusterState::default();

scheduler
.on_observed_cluster_state(
&observed_cluster_state,
&metadata().nodes_config_ref(),
NoPlacementHints,
)
.await?;

let scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");
let scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");

assert_eq!(initial_scheduling_plan, scheduling_plan);
assert_eq!(initial_scheduling_plan, scheduling_plan);

Ok(())
})
.await
Ok(())
}
.in_tc(&test_env.tc)
.await
}

#[test(tokio::test(start_paused = true))]
Expand Down Expand Up @@ -742,78 +736,76 @@ mod tests {
.set_scheduling_plan(initial_scheduling_plan)
.build()
.await;
let tc = env.tc.clone();
env.tc
.run_in_scope("test", None, async move {
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
)
.await?;
let mut observed_cluster_state = ObservedClusterState::default();
async move {
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
metadata_store_client.clone(),
networking,
)
.await?;
let mut observed_cluster_state = ObservedClusterState::default();

for _ in 0..num_scheduling_rounds {
let cluster_state = random_cluster_state(&node_ids, num_partitions);
for _ in 0..num_scheduling_rounds {
let cluster_state = random_cluster_state(&node_ids, num_partitions);

observed_cluster_state.update(&cluster_state);
scheduler
.on_observed_cluster_state(
&observed_cluster_state,
&metadata().nodes_config_ref(),
NoPlacementHints,
)
.await?;
// collect all control messages from the network to build up the effective scheduling plan
let control_messages = control_recv
.as_mut()
.take_until(tokio::time::sleep(Duration::from_secs(10)))
.collect::<Vec<_>>()
.await;

let observed_cluster_state =
derive_observed_cluster_state(&cluster_state, control_messages);
let target_scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await?
.expect("the scheduler should have created a scheduling plan");

// assert that the effective scheduling plan aligns with the target scheduling plan
assert_that!(
observed_cluster_state,
matches_scheduling_plan(&target_scheduling_plan)
);

let alive_nodes: HashSet<_> = cluster_state
.alive_nodes()
.map(|node| node.generational_node_id.as_plain())
.collect();

for (_, target_state) in target_scheduling_plan.iter() {
// assert that every partition has a leader which is part of the alive nodes set
assert!(target_state
.leader
.is_some_and(|leader| alive_nodes.contains(&leader)));

// assert that the replication strategy was respected
match replication_strategy {
ReplicationStrategy::OnAllNodes => {
assert_eq!(target_state.node_set, alive_nodes)
}
ReplicationStrategy::Factor(replication_factor) => assert_eq!(
target_state.node_set.len(),
alive_nodes.len().min(
usize::try_from(replication_factor.get())
.expect("u32 fits into usize")
)
),
observed_cluster_state.update(&cluster_state);
scheduler
.on_observed_cluster_state(
&observed_cluster_state,
&metadata().nodes_config_ref(),
NoPlacementHints,
)
.await?;
// collect all control messages from the network to build up the effective scheduling plan
let control_messages = control_recv
.as_mut()
.take_until(tokio::time::sleep(Duration::from_secs(10)))
.collect::<Vec<_>>()
.await;

let observed_cluster_state =
derive_observed_cluster_state(&cluster_state, control_messages);
let target_scheduling_plan = metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await?
.expect("the scheduler should have created a scheduling plan");

// assert that the effective scheduling plan aligns with the target scheduling plan
assert_that!(
observed_cluster_state,
matches_scheduling_plan(&target_scheduling_plan)
);

let alive_nodes: HashSet<_> = cluster_state
.alive_nodes()
.map(|node| node.generational_node_id.as_plain())
.collect();

for (_, target_state) in target_scheduling_plan.iter() {
// assert that every partition has a leader which is part of the alive nodes set
assert!(target_state
.leader
.is_some_and(|leader| alive_nodes.contains(&leader)));

// assert that the replication strategy was respected
match replication_strategy {
ReplicationStrategy::OnAllNodes => {
assert_eq!(target_state.node_set, alive_nodes)
}
ReplicationStrategy::Factor(replication_factor) => assert_eq!(
target_state.node_set.len(),
alive_nodes.len().min(
usize::try_from(replication_factor.get())
.expect("u32 fits into usize")
)
),
}
}
googletest::Result::Ok(())
})
.await?;
}
googletest::Result::Ok(())
}
.in_tc(&env.tc)
.await?;

Ok(())
}
Expand Down
Loading

0 comments on commit edf7f31

Please sign in to comment.