Skip to content

Commit

Permalink
Check for newer versioned on get's and cas failures
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 22, 2024
1 parent 2dd67ca commit cb9f837
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 89 deletions.
19 changes: 6 additions & 13 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::cancellation_watcher;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};
use std::pin;

use bytestring::ByteString;
use restate_types::Version;
use std::pin;
use std::time::Duration;
use tokio::select;
use tokio::sync::oneshot::Sender;
use tracing::warn;

use crate::cancellation_watcher;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
Expand Down Expand Up @@ -71,20 +72,12 @@ impl Server {
}
};

let mut refresh = tokio::time::interval(Duration::from_secs(2));

loop {
select! {
_ = &mut shutdown => {
// stop accepting messages
return Ok(());
}
_ = refresh.tick() => {
// periodically check for a newer version
if let Err(e) = delegate.load_latest_version().await {
warn!(error = ?e, "error while loading latest metastore version, will retry in the next interval.");
}
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metadata_store::{Precondition, VersionedValue, WriteError};
use std::collections::BTreeMap;

use bytes::Bytes;
use bytestring::ByteString;

use crate::metadata_store::{Precondition, VersionedValue, WriteError};
use restate_types::errors::GenericError;
use restate_types::Version;
use std::collections::BTreeMap;

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub(crate) struct ImmutableStore {
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/metadata_store/providers/objstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use crate::metadata_store::providers::objstore::object_store_version_repository::ObjectStoreVersionRepository;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::providers::objstore::version_repository::VersionRepository;
use crate::metadata_store::MetadataStore;
use crate::{task_center, TaskKind};
use restate_types::config::MetadataStoreClient;
use restate_types::errors::GenericError;
use std::sync::Arc;

mod glue;
mod immutable_store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@

use std::sync::Arc;

use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::aws::AmazonS3Builder;
use object_store::aws::S3ConditionalPut::ETagMatch;
use object_store::path::Path;
use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload};

use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};
use restate_types::config::MetadataStoreClient;
use restate_types::Version;

Expand Down
218 changes: 148 additions & 70 deletions crates/core/src/metadata_store/providers/objstore/optimistic_store.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use rand::Rng;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use bytestring::ByteString;
use tokio::time::Instant;

use crate::metadata_store::providers::objstore::immutable_store::ImmutableStore;
use crate::metadata_store::providers::objstore::version_repository::{
VersionRepository, VersionRepositoryError,
};
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};
use anyhow::anyhow;
use arc_swap::ArcSwap;
use bytestring::ByteString;
use rand::Rng;
use restate_types::config::MetadataStoreClient;
use restate_types::errors::GenericError;
use restate_types::Version;
use std::cmp::Ordering;
use std::sync::Arc;
use tokio::time::Instant;
use tracing::log::warn;

pub(crate) struct OptimisticLockingMetadataStoreBuilder {
pub(crate) version_repository: Arc<dyn VersionRepository>,
Expand All @@ -22,85 +32,135 @@ pub(crate) struct OptimisticLockingMetadataStoreBuilder {

impl OptimisticLockingMetadataStoreBuilder {
pub(crate) async fn build(&self) -> anyhow::Result<OptimisticLockingMetadataStore> {
let store = OptimisticLockingMetadataStore::new(self.version_repository.clone());
for _ in 0..30 {
match store.load_latest_version().await {
Ok(_) => return Ok(store),
Err(e) => {
warn!("Error loading latest version, retrying in 1 second {}", e);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
}
let refresh_duration = Duration::from_secs(5);
Ok(OptimisticLockingMetadataStore::new(
refresh_duration,
self.version_repository.clone(),
))
}
}

struct CachedStore {
store: ImmutableStore,
created_at: Instant,
}

impl CachedStore {
fn new(store: ImmutableStore) -> Self {
Self {
store,
created_at: Instant::now(),
}
}

fn is_stall(&self, duration: Duration) -> bool {
self.created_at.elapsed() > duration
}
}

impl Default for CachedStore {
fn default() -> Self {
Self {
store: Default::default(),
created_at: Instant::now(),
}
Err(anyhow!("Max retries reached. unable to create a metastore client. please check your configurations"))
}
}

pub struct OptimisticLockingMetadataStore {
version_repository: Arc<dyn VersionRepository>,
latest_store_cache: ArcSwap<ImmutableStore>,
latest_store_cache: ArcSwap<Option<CachedStore>>,
refresh_interval: Duration,
}

impl OptimisticLockingMetadataStore {
fn new(version_repository: Arc<dyn VersionRepository>) -> Self {
fn new(refresh_interval: Duration, version_repository: Arc<dyn VersionRepository>) -> Self {
Self {
version_repository,
refresh_interval,
latest_store_cache: Default::default(),
}
}

pub(crate) async fn load_latest_version(&self) -> Result<(), GenericError> {
let latest_version = self.version_repository.get_latest_version().await?;
async fn maybe_update_cached_store(&self, force_update: bool) -> anyhow::Result<()> {
let needs_refresh = force_update
|| self
.latest_store_cache
.load()
.as_ref()
.as_ref()
.map(|store| store.is_stall(self.refresh_interval))
.unwrap_or(true);

if !needs_refresh {
return Ok(());
}

let maybe_global_latest_version = self
.version_repository
.get_latest_version()
.await
.map_err(|e| anyhow!(e))?;

let Some(latest) = latest_version else {
let maybe_cached_version = self
.latest_store_cache
.load()
.as_ref()
.as_ref()
.map(|cache| cache.store.current_version());

let Some(global_latest_version) = maybe_global_latest_version else {
// nothing to do, currently there is no known (or visible) latest version.
// we will use our local cached version.
// we will optimistically create an empty store, assuming that there is no
// previous metadata stored.
if maybe_cached_version.is_none() {
self.latest_store_cache
.store(Arc::new(Some(CachedStore::default())));
}
return Ok(());
};

let cached_version = self.latest_store_cache.load().current_version();
match cached_version.cmp(&latest) {
Ordering::Equal => {
// cache hit
tracing::trace!(
"No new version detected (current version: {})",
cached_version
);
Ok(())
}
Ordering::Less => {
// cache miss
tracing::info!(
"New metadata version discovered {} (previously known was {})",
latest,
cached_version
);
let latest_store_buf = self.version_repository.get_version(latest).await?;
let last_store = ImmutableStore::deserialize(latest_store_buf)?;
self.latest_store_cache.store(Arc::new(last_store));

Ok(())
}
Ordering::Greater => {
// TODO(igal): this is very suspicious, theoretically this should not happen,
// but i don't think we should panic here, because there could be an object store
// that has an eventual consistent listing? so momentarily it might appear that our cached version
// is larger than the latest cached version.
// this should not affect correctness however.
Ok(())
}
if maybe_cached_version.is_none() || maybe_cached_version.unwrap() < global_latest_version {
tracing::info!(
"New metadata version discovered {} (previously known was {:?})",
global_latest_version,
maybe_cached_version
);
let latest_store_buf = self
.version_repository
.get_version(global_latest_version)
.await
.map_err(|e| anyhow!(e))?;

let last_store =
ImmutableStore::deserialize(latest_store_buf).map_err(WriteError::Network)?;

let cache = CachedStore::new(last_store);
self.latest_store_cache.store(Arc::new(Some(cache)));
return Ok(());
}
Ok(())
}

async fn get_or_update_store(&self) -> anyhow::Result<Arc<Option<CachedStore>>> {
self.maybe_update_cached_store(false).await?;
let maybe_current_store = self.latest_store_cache.load_full();
Ok(maybe_current_store)
}

async fn compare_and_swap(
&self,
mut op: impl FnMut(&ImmutableStore) -> Result<ImmutableStore, WriteError>,
) -> Result<(), WriteError> {
for _retry_count in 0..25 {
// TODO: make retry count+strategy configurable
let current_store = self.latest_store_cache.load();
let res = op(&current_store);
let current_store = self
.get_or_update_store()
.await
.map_err(|e| WriteError::Network(e.into()))?;
let res = match current_store.as_ref() {
Some(cached_store) => op(&cached_store.store),
None => unreachable!(),
};
let Ok(new_store) = res else {
// a precondition failure. the actual failure is reflected in the Err.
return res.map(|_| ());
Expand All @@ -119,20 +179,20 @@ impl OptimisticLockingMetadataStore {
"successfully published a new metadata store version {}",
new_store.current_version()
);
self.latest_store_cache.store(Arc::new(new_store));
self.latest_store_cache
.store(Arc::new(Some(CachedStore::new(new_store))));
return Ok(());
}
Err(VersionRepositoryError::AlreadyExists(_)) => {
// TODO: use a more sophisticated and configurable jitter
// refresh and try again
let delay = {
let mut rng = rand::thread_rng();
rng.gen_range(50..300)
};
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
// refresh and try again
self.load_latest_version()
tokio::time::sleep(Duration::from_millis(delay)).await;
self.maybe_update_cached_store(true)
.await
.map_err(WriteError::Store)?;
.map_err(|e| WriteError::Network(e.into()))?;
}
Err(e) => {
return Err(WriteError::Store(e.into()));
Expand All @@ -146,13 +206,31 @@ impl OptimisticLockingMetadataStore {
#[async_trait::async_trait]
impl MetadataStore for OptimisticLockingMetadataStore {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let guard = self.latest_store_cache.load();
Ok(guard.get(key))
let cached_store = self
.get_or_update_store()
.await
.map_err(|e| ReadError::Network(e.into()))?;

let store = cached_store
.as_ref()
.as_ref()
.expect("BUG: a cached value must be stored after a call to a get_or_update_store()");

Ok(store.store.get(key))
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let guard = self.latest_store_cache.load();
Ok(guard.get_version(key))
let cached_store = self
.get_or_update_store()
.await
.map_err(|e| ReadError::Network(e.into()))?;

let store = cached_store
.as_ref()
.as_ref()
.expect("BUG: a cached value must be stored after a call to a get_or_update_store()");

Ok(store.store.get_version(key))
}

async fn put(
Expand Down

0 comments on commit cb9f837

Please sign in to comment.