Skip to content

Commit

Permalink
Debug logging around Ingress PP node routing and partition refreshes
Browse files: browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 6, 2024
1 parent c490b19 commit a17f33a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
6 changes: 6 additions & 0 deletions crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use restate_types::net::partition_processor::{
PartitionProcessorRpcResponse, SubmittedInvocationNotification,
};
use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTableError};
use tracing::info;

use crate::network::rpc_router::{ConnectionAwareRpcError, ConnectionAwareRpcRouter, RpcError};
use crate::network::{HasConnection, Networking, Outgoing, TransportConnect};
Expand Down Expand Up @@ -324,6 +325,11 @@ where
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;

info!(
"Sending RPC request for partition {} to node: {}",
partition_id, node_id
);

let response = self
.rpc_router
.call(
Expand Down
41 changes: 30 additions & 11 deletions crates/core/src/partitions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_types::cluster_controller::SchedulingPlan;
Expand Down Expand Up @@ -149,7 +149,9 @@ impl PartitionRoutingRefresher {
}

async fn run(mut self) -> anyhow::Result<()> {
debug!("Routing information refresher started");
debug!("Routing information refresher started - sleeping");
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
debug!("Routing information refresher started - actually refreshing now");

let update_interval = Configuration::pinned()
.common
Expand All @@ -165,15 +167,12 @@ impl PartitionRoutingRefresher {
debug!("Routing information refresher stopped");
break;
}
Some(cmd) = self.receiver.recv() => {
match cmd {
Command::SyncRoutingInformation => {
self.spawn_sync_routing_information_task();
}
}
Some(_) = self.receiver.recv() => {
trace!("Refreshing routing information (on-demand)...");
self.spawn_sync_routing_information_task();
}
_ = update_interval.tick() => {
trace!("Refreshing routing information...");
trace!("Refreshing routing information (tick)...");
self.spawn_sync_routing_information_task();
}
}
Expand All @@ -190,14 +189,17 @@ impl PartitionRoutingRefresher {
let partition_to_node_mappings = self.inner.clone();
let metadata_store_client = self.metadata_store_client.clone();

trace!("Starting partition information refresh task");
let task = task_center().spawn_unmanaged(
TaskKind::Disposable,
"refresh-routing-information",
None,
{
async move {
trace!("refresh-routing-information task started...");
sync_routing_information(partition_to_node_mappings, metadata_store_client)
.await;
trace!("refresh-routing-information completed.");
}
},
);
Expand All @@ -222,11 +224,12 @@ async fn sync_routing_information(
partition_to_node_mappings: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
metadata_store_client: MetadataStoreClient,
) {
trace!("Fetching scheduling plan from metadata store");
let result: Result<Option<SchedulingPlan>, _> =
metadata_store_client.get(SCHEDULING_PLAN_KEY.clone()).await;

let Ok(scheduling_plan) = result else {
debug!(
warn!(
"Failed to fetch scheduling plan from metadata store: {:?}",
result
);
Expand All @@ -236,13 +239,18 @@ async fn sync_routing_information(
let scheduling_plan = match scheduling_plan {
Some(plan) => plan,
None => {
debug!("No scheduling plan found in metadata store, unable to refresh partition routing information");
warn!("No scheduling plan found in metadata store, unable to refresh partition routing information");
return;
}
};

let current_mappings = partition_to_node_mappings.load();
if scheduling_plan.version() <= current_mappings.version {
trace!(
"No need to update partition routing information - current version: {}, scheduling plan version: {}",
current_mappings.version,
scheduling_plan.version()
);
return; // No need for update
}

Expand All @@ -252,6 +260,12 @@ async fn sync_routing_information(
partition_nodes.insert(*partition_id, leader.into());
}
}
trace!(
"Updating partition routing information from scheduling plan {} (previous: {}): {:?}",
scheduling_plan.version(),
current_mappings.version,
partition_nodes
);

let _ = partition_to_node_mappings.compare_and_swap(
current_mappings,
Expand All @@ -260,6 +274,11 @@ async fn sync_routing_information(
inner: partition_nodes,
}),
);

trace!(
"Partition routing information updated to version {}",
partition_to_node_mappings.load().version
);
}

#[cfg(any(test, feature = "test-util"))]
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl Node {
"local-metadata-store",
None,
async move {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
metadata_store.run().await?;
Ok(())
},
Expand Down

0 comments on commit a17f33a

Please sign in to comment.