Skip to content

Commit

Permalink
Make OptimisticStore mut
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 22, 2024
1 parent 2f76ae8 commit f20aa93
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 46 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Server {

let mut shutdown = pin::pin!(cancellation_watcher());

let delegate = match builder.build().await {
let mut delegate = match builder.build().await {
Ok(delegate) => delegate,
Err(err) => {
warn!(error = ?err, "error while loading latest metastore version.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use bytestring::ByteString;
use tokio::time::Instant;

use restate_types::config::MetadataStoreClient;
use crate::metadata_store::providers::objstore::immutable_store::ImmutableStore;
use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};

Check warning on line 22 in crates/core/src/metadata_store/providers/objstore/optimistic_store.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};
use restate_types::config::MetadataStoreClient;
use crate::metadata_store::{Precondition, ReadError, VersionedValue, WriteError};
use restate_types::Version;

pub(crate) struct OptimisticLockingMetadataStoreBuilder {
Expand All @@ -32,7 +30,7 @@ pub(crate) struct OptimisticLockingMetadataStoreBuilder {

impl OptimisticLockingMetadataStoreBuilder {
pub(crate) async fn build(&self) -> anyhow::Result<OptimisticLockingMetadataStore> {
let refresh_duration = Duration::from_secs(5);
let refresh_duration = Duration::from_secs(1);
Ok(OptimisticLockingMetadataStore::new(
refresh_duration,
self.version_repository.clone(),
Expand All @@ -53,7 +51,7 @@ impl CachedStore {
}
}

fn is_stall(&self, duration: Duration) -> bool {
fn is_stale(&self, duration: Duration) -> bool {
self.created_at.elapsed() > duration
}
}
Expand All @@ -69,7 +67,7 @@ impl Default for CachedStore {

pub struct OptimisticLockingMetadataStore {
version_repository: Arc<dyn VersionRepository>,
latest_store_cache: ArcSwap<Option<CachedStore>>,
latest_store_cache: Option<CachedStore>,
refresh_interval: Duration,
}

Expand All @@ -82,14 +80,12 @@ impl OptimisticLockingMetadataStore {
}
}

async fn maybe_update_cached_store(&self, force_update: bool) -> anyhow::Result<()> {
async fn maybe_update_cached_store(&mut self, force_update: bool) -> anyhow::Result<()> {
let needs_refresh = force_update
|| self
.latest_store_cache
.load()
.as_ref()
.as_ref()
.map(|store| store.is_stall(self.refresh_interval))
.map(|cache| cache.is_stale(self.refresh_interval))
.unwrap_or(true);

if !needs_refresh {
Expand All @@ -104,8 +100,6 @@ impl OptimisticLockingMetadataStore {

let maybe_cached_version = self
.latest_store_cache
.load()
.as_ref()
.as_ref()
.map(|cache| cache.store.current_version());

Expand All @@ -114,8 +108,7 @@ impl OptimisticLockingMetadataStore {
// we will optimistically create an empty store, assuming that there is no
// previous metadata stored.
if maybe_cached_version.is_none() {
self.latest_store_cache
.store(Arc::new(Some(CachedStore::default())));
self.latest_store_cache = Some(CachedStore::default());
}
return Ok(());
};
Expand All @@ -136,31 +129,31 @@ impl OptimisticLockingMetadataStore {
ImmutableStore::deserialize(latest_store_buf).map_err(WriteError::Network)?;

let cache = CachedStore::new(last_store);
self.latest_store_cache.store(Arc::new(Some(cache)));
self.latest_store_cache = Some(cache);
return Ok(());
}
Ok(())
}

async fn get_or_update_store(&self) -> anyhow::Result<Arc<Option<CachedStore>>> {
async fn get_or_update_store(&mut self) -> anyhow::Result<&CachedStore> {
self.maybe_update_cached_store(false).await?;
let maybe_current_store = self.latest_store_cache.load_full();
let maybe_current_store = self.latest_store_cache.as_ref().expect("bug");
Ok(maybe_current_store)
}

async fn compare_and_swap(
&self,
&mut self,
mut op: impl FnMut(&ImmutableStore) -> Result<ImmutableStore, WriteError>,
) -> Result<(), WriteError> {
for _retry_count in 0..25 {
let current_store = self
.get_or_update_store()
.await

Check warning on line 151 in crates/core/src/metadata_store/providers/objstore/optimistic_store.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs
.map_err(|e| WriteError::Network(e.into()))?;
let res = match current_store.as_ref() {
Some(cached_store) => op(&cached_store.store),
None => unreachable!(),
};



let res = op(&current_store.store);
let Ok(new_store) = res else {
// a precondition failure. the actual failure is reflected in the Err.
return res.map(|_| ());
Expand All @@ -179,8 +172,7 @@ impl OptimisticLockingMetadataStore {
"successfully published a new metadata store version {}",
new_store.current_version()
);
self.latest_store_cache
.store(Arc::new(Some(CachedStore::new(new_store))));
self.latest_store_cache = Some(CachedStore::new(new_store));
return Ok(());
}
Err(VersionRepositoryError::AlreadyExists(_)) => {
Expand All @@ -201,40 +193,30 @@ impl OptimisticLockingMetadataStore {
}
Err(WriteError::Internal("retry count exhausted".to_string()))

Check warning on line 194 in crates/core/src/metadata_store/providers/objstore/optimistic_store.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs
}
}

#[async_trait::async_trait]
impl MetadataStore for OptimisticLockingMetadataStore {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
pub(crate) async fn get(&mut self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let cached_store = self
.get_or_update_store()
.await
.map_err(|e| ReadError::Network(e.into()))?;

let store = cached_store
.as_ref()
.as_ref()
.expect("BUG: a cached value must be stored after a call to a get_or_update_store()");

Ok(store.store.get(key))
Ok(cached_store.store.get(key))
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let cached_store = self
pub(crate) async fn get_version(
&mut self,
key: ByteString,
) -> Result<Option<Version>, ReadError> {
let store = self
.get_or_update_store()
.await
.map_err(|e| ReadError::Network(e.into()))?;

let store = cached_store
.as_ref()
.as_ref()
.expect("BUG: a cached value must be stored after a call to a get_or_update_store()");

Ok(store.store.get_version(key))
}

async fn put(
&self,
pub(crate) async fn put(
&mut self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
Expand All @@ -245,7 +227,11 @@ impl MetadataStore for OptimisticLockingMetadataStore {
.await
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
pub(crate) async fn delete(
&mut self,
key: ByteString,
precondition: Precondition,
) -> Result<(), WriteError> {
self.compare_and_swap(move |current_store| {
current_store.delete(key.clone(), precondition.clone())
})
Expand Down

0 comments on commit f20aa93

Please sign in to comment.