Skip to content

Commit

Permalink
Improved logging for partition routing updates and decisions
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 15, 2024
1 parent 19d1aee commit 9a1b4cb
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
6 changes: 6 additions & 0 deletions crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ where
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;

trace!(
%partition_id,
"Sending PartitionProcessor RPC request to node: {}",
node_id
);

let rpc_result = self
.rpc_router
.call(
Expand Down
34 changes: 24 additions & 10 deletions crates/core/src/partitions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_types::cluster_controller::SchedulingPlan;
Expand Down Expand Up @@ -165,15 +165,12 @@ impl PartitionRoutingRefresher {
debug!("Routing information refresher stopped");
break;
}
Some(cmd) = self.receiver.recv() => {
match cmd {
Command::SyncRoutingInformation => {
self.spawn_sync_routing_information_task();
}
}
Some(_) = self.receiver.recv() => {
trace!("Refreshing routing information (on-demand)");
self.spawn_sync_routing_information_task();
}
_ = update_interval.tick() => {
trace!("Refreshing routing information...");
trace!("Refreshing routing information (periodic refresh)");
self.spawn_sync_routing_information_task();
}
}
Expand Down Expand Up @@ -224,11 +221,12 @@ async fn sync_routing_information(
partition_to_node_mappings: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
metadata_store_client: MetadataStoreClient,
) {
trace!("Fetching scheduling plan from metadata store");
let result: Result<Option<SchedulingPlan>, _> =
metadata_store_client.get(SCHEDULING_PLAN_KEY.clone()).await;

let Ok(scheduling_plan) = result else {
debug!(
warn!(
"Failed to fetch scheduling plan from metadata store: {:?}",
result
);
Expand All @@ -238,13 +236,18 @@ async fn sync_routing_information(
let scheduling_plan = match scheduling_plan {
Some(plan) => plan,
None => {
debug!("No scheduling plan found in metadata store, unable to refresh partition routing information");
warn!("No scheduling plan found in metadata store, unable to refresh partition routing information");
return;
}
};

let current_mappings = partition_to_node_mappings.load();
if scheduling_plan.version() <= current_mappings.version {
trace!(
"No need to update partition routing information - current version: {}, scheduling plan version: {}",
current_mappings.version,
scheduling_plan.version()
);
return; // No need for update
}

Expand All @@ -254,6 +257,12 @@ async fn sync_routing_information(
partition_nodes.insert(*partition_id, leader.into());
}
}
trace!(
"Updating partition routing information from scheduling plan {} (previous: {}): {:?}",
scheduling_plan.version(),
current_mappings.version,
partition_nodes
);

let _ = partition_to_node_mappings.compare_and_swap(
current_mappings,
Expand All @@ -262,6 +271,11 @@ async fn sync_routing_information(
inner: partition_nodes,
}),
);

trace!(
"Partition routing information updated to version {}",
partition_to_node_mappings.load().version
);
}

#[cfg(any(test, feature = "test-util"))]
Expand Down

0 comments on commit 9a1b4cb

Please sign in to comment.