From 9a1b4cbb1f7f69eed87f167fbc22858a4c6839aa Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 15 Nov 2024 19:50:47 +0200 Subject: [PATCH] Improved logging for partition routing updates and decisions --- .../network/partition_processor_rpc_client.rs | 6 ++++ crates/core/src/partitions/mod.rs | 34 +++++++++++++------ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/crates/core/src/network/partition_processor_rpc_client.rs b/crates/core/src/network/partition_processor_rpc_client.rs index a0d26fece..e6e9e30df 100644 --- a/crates/core/src/network/partition_processor_rpc_client.rs +++ b/crates/core/src/network/partition_processor_rpc_client.rs @@ -334,6 +334,12 @@ where .get_node_by_partition(partition_id) .ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?; + trace!( + %partition_id, + "Sending PartitionProcessor RPC request to node: {}", + node_id + ); + let rpc_result = self .rpc_router .call( diff --git a/crates/core/src/partitions/mod.rs b/crates/core/src/partitions/mod.rs index d8c84b19c..d1964e05b 100644 --- a/crates/core/src/partitions/mod.rs +++ b/crates/core/src/partitions/mod.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use tokio::sync::mpsc; use tokio::time::MissedTickBehavior; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; use xxhash_rust::xxh3::Xxh3Builder; use restate_types::cluster_controller::SchedulingPlan; @@ -165,15 +165,12 @@ impl PartitionRoutingRefresher { debug!("Routing information refresher stopped"); break; } - Some(cmd) = self.receiver.recv() => { - match cmd { - Command::SyncRoutingInformation => { - self.spawn_sync_routing_information_task(); - } - } + Some(_) = self.receiver.recv() => { + trace!("Refreshing routing information (on-demand)"); + self.spawn_sync_routing_information_task(); } _ = update_interval.tick() => { - trace!("Refreshing routing information..."); + trace!("Refreshing routing information (periodic refresh)"); self.spawn_sync_routing_information_task(); } } @@ -224,11 +221,12 @@ async fn sync_routing_information( partition_to_node_mappings: Arc>, metadata_store_client: MetadataStoreClient, ) { + trace!("Fetching scheduling plan from metadata store"); let result: Result, _> = metadata_store_client.get(SCHEDULING_PLAN_KEY.clone()).await; let Ok(scheduling_plan) = result else { - debug!( + warn!( "Failed to fetch scheduling plan from metadata store: {:?}", result ); @@ -238,13 +236,18 @@ async fn sync_routing_information( let scheduling_plan = match scheduling_plan { Some(plan) => plan, None => { - debug!("No scheduling plan found in metadata store, unable to refresh partition routing information"); + warn!("No scheduling plan found in metadata store, unable to refresh partition routing information"); return; } }; let current_mappings = partition_to_node_mappings.load(); if scheduling_plan.version() <= current_mappings.version { + trace!( + "No need to update partition routing information - current version: {}, scheduling plan version: {}", + current_mappings.version, + scheduling_plan.version() + ); return; // No need for update } @@ -254,6 +257,12 @@ async fn sync_routing_information( partition_nodes.insert(*partition_id, leader.into()); } } + trace!( + "Updating partition routing information from scheduling plan {} (previous: {}): {:?}", + scheduling_plan.version(), + current_mappings.version, + partition_nodes + ); let _ = partition_to_node_mappings.compare_and_swap( current_mappings, @@ -262,6 +271,11 @@ async fn sync_routing_information( inner: partition_nodes, }), ); + + trace!( + "Partition routing information updated to version {}", + partition_to_node_mappings.load().version + ); } #[cfg(any(test, feature = "test-util"))]