Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] object store metastore #2309

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bitflags = { version = "2.6.0" }
bytes = { version = "1.7", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
ciborium = { version = "0.2.2" }
chrono = { version = "0.4.38", default-features = false, features = ["clock"] }
comfy-table = { version = "7.1" }
chrono-humanize = { version = "0.2.3" }
Expand All @@ -98,6 +99,7 @@ datafusion = { version = "42.0.0", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
object_store = { version = "0.11.1"}
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ assert2 = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
ciborium = { workspace = true}
dashmap = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
Expand All @@ -39,6 +40,7 @@ hyper-util = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
once_cell = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata_store/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
// by the Apache License, Version 2.0.

mod etcd;
mod objstore;

pub use etcd::EtcdMetadataStore;
pub use objstore::create_object_store_based_meta_store;
181 changes: 181 additions & 0 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::pin;

use bytestring::ByteString;
use restate_types::Version;
use tokio::select;
use tokio::sync::oneshot::Sender;
use tracing::warn;

use crate::cancellation_watcher;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
key: ByteString,
tx: Sender<Result<Option<VersionedValue>, ReadError>>,
},
GetVersion {
key: ByteString,
tx: Sender<Result<Option<Version>, ReadError>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
Delete {
key: ByteString,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
}

pub(crate) struct Server {
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
builder: OptimisticLockingMetadataStoreBuilder,
}

impl Server {
pub(crate) fn new(
builder: OptimisticLockingMetadataStoreBuilder,
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
) -> Self {
Self { builder, receiver }
}

pub(crate) async fn run(self) -> anyhow::Result<()> {
let Server {
mut receiver,
builder,
} = self;

let mut shutdown = pin::pin!(cancellation_watcher());

let mut delegate = match builder.build().await {
Ok(delegate) => delegate,
Err(err) => {
warn!(error = ?err, "error while loading latest metastore version.");
return Err(err);
}
};

loop {
select! {
_ = &mut shutdown => {
// stop accepting messages
return Ok(());
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
let res = delegate.get(key).await;
let _ = tx.send(res);
}
Commands::GetVersion{key,tx } => {
let _ = tx.send(delegate.get_version(key).await);
}
Commands::Put{key,value,precondition,tx } => {
let _ = tx.send(delegate.put(key, value, precondition).await);
}
Commands::Delete{key,precondition,tx } => {
let _ = tx.send(delegate.delete(key, precondition).await);
}
}
}
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
}
}
Ok(())
}
}

#[derive(Debug, Clone)]
pub struct Client {
sender: tokio::sync::mpsc::UnboundedSender<Commands>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a bounded sender also work?

}

impl Client {
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Commands>) -> Self {
Self { sender }
}
}

#[async_trait::async_trait]
impl MetadataStore for Client {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Get { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::GetVersion { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn put(
&self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Put {
key,
value,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store channel ".into()))?;

rx.await
.map_err(|_| WriteError::Internal("Object store channel disconnected".to_string()))?
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Delete {
key,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
})?
}
}
Loading
Loading