Skip to content

Commit

Permalink
Add an object_store backed metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 22, 2024
1 parent 7c7b16b commit 7397d7b
Show file tree
Hide file tree
Showing 12 changed files with 846 additions and 3 deletions.
28 changes: 25 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ datafusion = { version = "42.0.0", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
object_store = { version = "0.11.1"}
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ hyper-util = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
once_cell = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true}
serde_with = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata_store/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
// by the Apache License, Version 2.0.

mod etcd;
mod objstore;

pub use etcd::EtcdMetadataStore;
pub use objstore::create_object_store_based_meta_store;
181 changes: 181 additions & 0 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::pin;

use bytestring::ByteString;
use restate_types::Version;
use tokio::select;
use tokio::sync::oneshot::Sender;
use tracing::warn;

use crate::cancellation_watcher;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
key: ByteString,
tx: Sender<Result<Option<VersionedValue>, ReadError>>,
},
GetVersion {
key: ByteString,
tx: Sender<Result<Option<Version>, ReadError>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
Delete {
key: ByteString,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
}

pub(crate) struct Server {
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
builder: OptimisticLockingMetadataStoreBuilder,
}

impl Server {
pub(crate) fn new(
builder: OptimisticLockingMetadataStoreBuilder,
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
) -> Self {
Self { builder, receiver }
}

pub(crate) async fn run(self) -> anyhow::Result<()> {
let Server {
mut receiver,
builder,
} = self;

let mut shutdown = pin::pin!(cancellation_watcher());

let mut delegate = match builder.build().await {
Ok(delegate) => delegate,
Err(err) => {
warn!(error = ?err, "error while loading latest metastore version.");
return Err(err);
}
};

loop {
select! {
_ = &mut shutdown => {
// stop accepting messages
return Ok(());
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
let res = delegate.get(key).await;
let _ = tx.send(res);
}
Commands::GetVersion{key,tx } => {
let _ = tx.send(delegate.get_version(key).await);
}
Commands::Put{key,value,precondition,tx } => {
let _ = tx.send(delegate.put(key, value, precondition).await);
}
Commands::Delete{key,precondition,tx } => {
let _ = tx.send(delegate.delete(key, precondition).await);
}
}
}
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
}
}
Ok(())
}
}

#[derive(Debug, Clone)]
pub struct Client {
sender: tokio::sync::mpsc::UnboundedSender<Commands>,
}

impl Client {
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Commands>) -> Self {
Self { sender }
}
}

#[async_trait::async_trait]
impl MetadataStore for Client {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Get { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::GetVersion { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn put(
&self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Put {
key,
value,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store channel ".into()))?;

rx.await
.map_err(|_| WriteError::Internal("Object store channel disconnected".to_string()))?
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Delete {
key,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
})?
}
}
Loading

0 comments on commit 7397d7b

Please sign in to comment.