Skip to content

Commit

Permalink
Add an object_store backed metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 19, 2024
1 parent 73fa9df commit 638607a
Show file tree
Hide file tree
Showing 12 changed files with 707 additions and 2 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ datafusion = { version = "42.0.0", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
object_store = { version = "0.11.1"}
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ hyper-util = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
once_cell = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true}
serde_with = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata_store/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod etcd;
mod objstore;

pub use etcd::EtcdMetadataStore;
pub use objstore::create_object_store_based_meta_store;
202 changes: 202 additions & 0 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::time::Duration;

use bytestring::ByteString;
use tokio::select;
use tokio::sync::oneshot::Sender;

use restate_types::Version;

use crate::metadata_store::providers::objstore::metadata_impl::ObjectStoreMetastoreImpl;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
key: ByteString,
tx: Sender<Result<Option<VersionedValue>, ReadError>>,
},
GetVersion {
key: ByteString,
tx: Sender<Result<Option<Version>, ReadError>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
Delete {
key: ByteString,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
Shutdown,
}

pub(crate) struct Server {
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
delegate: ObjectStoreMetastoreImpl,
}

impl Server {
pub(crate) fn new(
store_implementation: ObjectStoreMetastoreImpl,
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
) -> Self {
Self {
delegate: store_implementation,
receiver,
}
}

pub(crate) async fn run(self) {
let Server {
mut receiver,
delegate,
} = self;

// pre-fetch the latest version
// TODO: should there be a limit before we give up? figure out how to exit gracefully?
loop {
match delegate.load_latest_version().await {
Ok(_) => {
break;
}
Err(e) => {
tracing::warn!(error = ?e, "error while loading latest metastore version, retrying in 1 sec...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

let mut refresh = tokio::time::interval(Duration::from_secs(5));

loop {
select! {
_ = refresh.tick() => {
// periodically check for a newer version
if let Err(e) = delegate.load_latest_version().await {
tracing::warn!(error = ?e, "error while loading latest metastore version, will retry in the next interval.");
}
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
let res = delegate.get(key).await;
let _ = tx.send(res);
}
Commands::GetVersion{key,tx } => {
let _ = tx.send(delegate.get_version(key).await);
}
Commands::Put{key,value,precondition,tx } => {
let _ = tx.send(delegate.put(key, value, precondition).await);
}
Commands::Delete{key,precondition,tx } => {
let _ = tx.send(delegate.delete(key, precondition).await);
}
Commands::Shutdown => {
tracing::info!("received a shutdown request, exiting the polling metadata loop");
break;
}
}
}
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
}
}
}
}

#[derive(Debug, Clone)]
pub struct Client {
sender: tokio::sync::mpsc::UnboundedSender<Commands>,
}

impl Client {
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Commands>) -> Self {
Self { sender }
}
}

impl Drop for Client {
fn drop(&mut self) {
let _ = self.sender.send(Commands::Shutdown);
}
}

#[async_trait::async_trait]
impl MetadataStore for Client {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Get { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::GetVersion { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn put(
&self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Put {
key,
value,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Delete {
key,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
})?
}
}
Loading

0 comments on commit 638607a

Please sign in to comment.