Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 19, 2024
1 parent ad240ac commit 2ca7fd8
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 30 deletions.
36 changes: 30 additions & 6 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metadata_store::providers::objstore::metadata_impl::ObjectStoreMetastoreImpl;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};
use std::time::Duration;

use bytestring::ByteString;
use restate_types::Version;
use tokio::select;
use tokio::sync::oneshot::Sender;

use restate_types::Version;

use crate::metadata_store::providers::objstore::metadata_impl::ObjectStoreMetastoreImpl;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
Expand Down Expand Up @@ -61,12 +65,30 @@ impl Server {
delegate,
} = self;

// an optimization to pre-fetch the latest version
// TODO: log failures
let _ = delegate.load_last_version().await;
// pre-fetch the latest version
// TODO: should there be a limit before we give up? figure out how to exit gracefully?
loop {
match delegate.load_last_version().await {
Ok(_) => {
break;
}
Err(e) => {
tracing::warn!(error = ?e, "error while loading latest metastore version, retrying in 1 sec...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

let mut refresh = tokio::time::interval(Duration::from_secs(5));

loop {
select! {
_ = refresh.tick() => {
// periodically check for a newer version
if let Err(e) = delegate.load_last_version().await {
tracing::warn!(error = ?e, "error while loading latest metastore version, will retry in the next interval.");
}
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
Expand All @@ -83,11 +105,13 @@ impl Server {
let _ = tx.send(delegate.delete(key, precondition).await);
}
Commands::Shutdown => {
tracing::info!("received a shutdown request, exiting the polling metadata loop");
break;
}
}
}
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
}
Expand Down
42 changes: 31 additions & 11 deletions crates/core/src/metadata_store/providers/objstore/metadata_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::metadata_store::providers::objstore::version_repository::{
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};
use arc_swap::ArcSwap;
use bytestring::ByteString;
use rand::Rng;
use restate_types::errors::GenericError;
use restate_types::Version;
use std::cmp::Ordering;
use std::sync::Arc;

pub struct ObjectStoreMetastoreImpl {
Expand All @@ -26,19 +28,34 @@ impl ObjectStoreMetastoreImpl {
let latest_version = self.version_repository.get_latest_version().await?;

let Some(latest) = latest_version else {
// no currently known version, let's initialize a default one.
let store = Default::default();
self.latest_store_cache.store(Arc::new(store));
// nothing to do, currently there is no known (or visible) latest version.
// we will use our local cached version.
return Ok(());
};

let latest_store_buf = self.version_repository.get_version(latest).await?;

let last_store = ImmutableStore::deserialize(latest_store_buf)?;

self.latest_store_cache.store(Arc::new(last_store));
let cached_version = self.latest_store_cache.load().current_version();
match cached_version.cmp(&latest) {
Ordering::Equal => {
// cache hit
Ok(())
}
Ordering::Less => {
// cache miss
let latest_store_buf = self.version_repository.get_version(latest).await?;
let last_store = ImmutableStore::deserialize(latest_store_buf)?;
self.latest_store_cache.store(Arc::new(last_store));

Ok(())
Ok(())
}
Ordering::Greater => {
// TODO(igal): this is very suspicious, theoretically this should not happen,
// but i don't think we should panic here, because there could be an object store
// that has an eventual consistent listing? so momentarily it might appear that our cached version
// is larger than the latest cached version.
// this should not affect correctness however.
Ok(())
}
}
}

async fn compare_and_swap(
Expand Down Expand Up @@ -68,8 +85,11 @@ impl ObjectStoreMetastoreImpl {
}
Err(VersionRepositoryError::AlreadyExists(_)) => {
// TODO: use a more sophisticated and configurable jitter
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;

let delay = {
let mut rng = rand::thread_rng();
rng.gen_range(50..300)
};
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
// refresh and try again
self.load_last_version().await.map_err(WriteError::Store)?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};
use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload};

use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};
use restate_types::Version;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct ObjectStoreVersionRepository {
Expand Down Expand Up @@ -48,15 +50,15 @@ impl VersionRepository for ObjectStoreVersionRepository {
.await
.map_err(|e| VersionRepositoryError::Network(e.into()))?
{
let name = version.location.filename().unwrap();
if !name.starts_with("v") {
continue;
}
let v = name[1..]
.parse::<u32>()
.map_err(|e| VersionRepositoryError::Network(e.into()))?;
if v > current {
current = v;
if let Some(name) = version.location.filename() {
if !name.starts_with("v") {
continue;
}
if let Ok(v) = name[1..].parse::<u32>() {
if v > current {
current = v;
}
}
}
}

Expand Down

0 comments on commit 2ca7fd8

Please sign in to comment.