Skip to content

Commit

Permalink
Add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 19, 2024
1 parent 2ca7fd8 commit be3e74c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Server {
// pre-fetch the latest version
// TODO: should there be a limit before we give up? figure out how to exit gracefully?
loop {
match delegate.load_last_version().await {
match delegate.load_latest_version().await {
Ok(_) => {
break;
}
Expand All @@ -85,7 +85,7 @@ impl Server {
select! {
_ = refresh.tick() => {
// periodically check for a newer version
if let Err(e) = delegate.load_last_version().await {
if let Err(e) = delegate.load_latest_version().await {
tracing::warn!(error = ?e, "error while loading latest metastore version, will retry in the next interval.");
}
}
Expand Down
19 changes: 17 additions & 2 deletions crates/core/src/metadata_store/providers/objstore/metadata_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl ObjectStoreMetastoreImpl {
}
}

pub(crate) async fn load_last_version(&self) -> Result<(), GenericError> {
pub(crate) async fn load_latest_version(&self) -> Result<(), GenericError> {
let latest_version = self.version_repository.get_latest_version().await?;

let Some(latest) = latest_version else {
Expand All @@ -37,10 +37,19 @@ impl ObjectStoreMetastoreImpl {
match cached_version.cmp(&latest) {
Ordering::Equal => {
// cache hit
tracing::trace!(
"No new version detected (current version: {})",
cached_version
);
Ok(())
}
Ordering::Less => {
// cache miss
tracing::info!(
"New metadata version discovered {} (previously known was {})",
latest,
cached_version
);
let latest_store_buf = self.version_repository.get_version(latest).await?;
let last_store = ImmutableStore::deserialize(latest_store_buf)?;
self.latest_store_cache.store(Arc::new(last_store));
Expand Down Expand Up @@ -80,6 +89,10 @@ impl ObjectStoreMetastoreImpl {
{
Ok(_) => {
// All good!
tracing::info!(
"successfully published a new metadata store version {}",
new_store.current_version()
);
self.latest_store_cache.store(Arc::new(new_store));
return Ok(());
}
Expand All @@ -91,7 +104,9 @@ impl ObjectStoreMetastoreImpl {
};
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
// refresh and try again
self.load_last_version().await.map_err(WriteError::Store)?;
self.load_latest_version()
.await
.map_err(WriteError::Store)?;
}
Err(e) => {
return Err(WriteError::Store(e.into()));
Expand Down

0 comments on commit be3e74c

Please sign in to comment.