diff --git a/Cargo.lock b/Cargo.lock index c2c38a6de..004caf32a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2566,6 +2566,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -2578,6 +2588,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -2957,6 +2978,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2980,6 +3010,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "gimli" version = "0.28.1" @@ -5098,6 +5140,36 @@ dependencies = [ "prost 0.13.1", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +dependencies = [ + "bytes", +] + +[[package]] +name = "protobuf-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2df9942df2981178a930a72d442de47e2f0df18ad68e50a30f816f1848215ad0" +dependencies = [ + "bitflags 1.3.2", + "protobuf", + "protobuf-codegen", + "regex", +] + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", +] + [[package]] name = "quanta" version = "0.12.3" @@ -5188,6 +5260,36 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "raft" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12688b23a649902762d4c11d854d73c49c9b93138f2de16403ef9f571ad5bae" +dependencies = [ + "bytes", + "fxhash", + "getset", + "protobuf", + "raft-proto", + "rand", + "slog", + "slog-envlogger", + "slog-stdlog", + "slog-term", + "thiserror", +] + +[[package]] +name = "raft-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb6884896294f553e8d5cfbdb55080b9f5f2f43394afff59c9f077e0f4b46d6b" +dependencies = [ + "bytes", + "protobuf", + "protobuf-build", +] + [[package]] name = "rand" version = "0.8.5" @@ -5939,12 +6041,15 @@ dependencies = [ "hyper-util", "prost 0.13.1", "prost-types 0.13.1", + "protobuf", + "raft", "restate-core", "restate-rocksdb", "restate-types", "rocksdb", "schemars", "serde", + "slog", "static_assertions", "tempfile", "test-log", @@ -5959,7 +6064,9 @@ dependencies = [ "tower", "tower-http 0.5.2", "tracing", + "tracing-slog", "tracing-subscriber", + "ulid", ] [[package]] @@ -7091,6 +7198,74 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slog" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" + +[[package]] +name = "slog-async" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84" +dependencies = [ + "crossbeam-channel", + "slog", + "take_mut", + "thread_local", +] + +[[package]] +name = "slog-envlogger" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "906a1a0bc43fed692df4b82a5e2fbfc3733db8dad8bb514ab27a4f23ad04f5c0" +dependencies = [ + "log", + "regex", + "slog", + "slog-async", + "slog-scope", + "slog-stdlog", + "slog-term", +] + +[[package]] +name = "slog-scope" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786" +dependencies = [ + "arc-swap", + "lazy_static", + "slog", +] + +[[package]] +name = "slog-stdlog" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" +dependencies = [ + "log", + "slog", + "slog-scope", +] + +[[package]] +name = "slog-term" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e022d0b998abfe5c3782c1f03551a596269450ccd677ea51c56f8b214610e8" +dependencies = [ + "is-terminal", + "slog", + "term", + "thread_local", + "time", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -7331,6 +7506,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tempfile" version = "3.10.1" @@ -7343,6 +7524,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -7910,6 +8102,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-slog" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9306d2ca06aa9dfc8aa729ff884e9dca181f588a298cc5c59d4fdd91372570bf" +dependencies = [ + "once_cell", + "slog", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -8029,6 +8232,7 @@ checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" dependencies = [ "getrandom", "rand", + "serde", "web-time", ] diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index b04d107c8..b1bf2753b 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -18,6 +18,7 @@ restate-rocksdb = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } +assert2 = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } @@ -29,9 +30,12 @@ hyper = { workspace = true } hyper-util = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +protobuf = "2.28.0" +raft = { version = "0.7.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +slog = { version = "2.7.0" } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -43,6 +47,8 @@ tonic-health = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } +tracing-slog = { version = "0.3.0" } +ulid = { workspace = true, features = ["serde"] } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/grpc/client.rs similarity index 93% rename from crates/metadata-store/src/local/grpc/client.rs rename to crates/metadata-store/src/grpc/client.rs index 601be26e9..1272cf673 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/grpc/client.rs @@ -20,16 +20,16 @@ use restate_core::network::net_util::create_tonic_channel_from_advertised_addres use restate_types::net::AdvertisedAddress; use restate_types::Version; +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient; use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -/// Client end to interact with the [`LocalMetadataStore`]. +/// Client end to interact with the metadata store. #[derive(Debug, Clone)] -pub struct LocalMetadataStoreClient { +pub struct GrpcMetadataStoreClient { svc_client: MetadataStoreSvcClient, } -impl LocalMetadataStoreClient { +impl GrpcMetadataStoreClient { pub fn new(metadata_store_address: AdvertisedAddress) -> Self { let channel = create_tonic_channel_from_advertised_address(metadata_store_address) .expect("should not fail"); @@ -41,7 +41,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl MetadataStore for GrpcMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs similarity index 89% rename from crates/metadata-store/src/local/grpc/handler.rs rename to crates/metadata-store/src/grpc/handler.rs index 79d948ddd..54a40224f 100644 --- a/crates/metadata-store/src/local/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -8,28 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc; use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -use crate::local::store::{Error, MetadataStoreRequest, RequestSender}; +use crate::{MetadataStoreRequest, RequestError, RequestSender}; use async_trait::async_trait; use tokio::sync::oneshot; use tonic::{Request, Response, Status}; -/// Grpc svc handler for the [`LocalMetadataStore`]. +/// Grpc svc handler for the metadata store. #[derive(Debug)] -pub struct LocalMetadataStoreHandler { +pub struct MetadataStoreHandler { request_tx: RequestSender, } -impl LocalMetadataStoreHandler { +impl MetadataStoreHandler { pub fn new(request_tx: RequestSender) -> Self { Self { request_tx } } } #[async_trait] -impl MetadataStoreSvc for LocalMetadataStoreHandler { +impl MetadataStoreSvc for MetadataStoreHandler { async fn get(&self, request: Request) -> Result, Status> { let (result_tx, result_rx) = oneshot::channel(); @@ -129,10 +129,11 @@ impl MetadataStoreSvc for LocalMetadataStoreHandler { } } -impl From for Status { - fn from(err: Error) -> Self { +impl From for Status { + fn from(err: RequestError) -> Self { match err { - Error::FailedPrecondition(msg) => Status::failed_precondition(msg), + RequestError::FailedPrecondition(err) => Status::failed_precondition(err.to_string()), + RequestError::Unavailable(err) => Status::unavailable(err.to_string()), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/local/grpc/mod.rs b/crates/metadata-store/src/grpc/mod.rs similarity index 99% rename from crates/metadata-store/src/local/grpc/mod.rs rename to crates/metadata-store/src/grpc/mod.rs index 6d34ffad7..fdf08f7b1 100644 --- a/crates/metadata-store/src/local/grpc/mod.rs +++ b/crates/metadata-store/src/grpc/mod.rs @@ -10,6 +10,8 @@ pub mod client; pub mod handler; +pub mod server; +pub mod service_builder; pub mod pb_conversions { use crate::grpc_svc; diff --git a/crates/metadata-store/src/grpc/server.rs b/crates/metadata-store/src/grpc/server.rs new file mode 100644 index 000000000..1e669751f --- /dev/null +++ b/crates/metadata-store/src/grpc/server.rs @@ -0,0 +1,62 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::Request; +use hyper::body::Incoming; +use hyper_util::service::TowerToHyperService; +use restate_core::network::net_util; +use restate_core::ShutdownError; +use restate_types::net::BindAddress; +use tonic::body::boxed; +use tonic::service::Routes; +use tower::ServiceExt; + +pub struct GrpcServer { + bind_address: BindAddress, + routes: Routes, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed running grpc server: {0}")] + GrpcServer(#[from] net_util::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), +} + +impl GrpcServer { + pub fn new(bind_address: BindAddress, routes: Routes) -> Self { + Self { + bind_address, + routes, + } + } + + pub async fn run(self) -> Result<(), Error> { + // Trace layer + let span_factory = tower_http::trace::DefaultMakeSpan::new() + .include_headers(true) + .level(tracing::Level::ERROR); + + let server_builder = tonic::transport::Server::builder() + .layer(tower_http::trace::TraceLayer::new_for_grpc().make_span_with(span_factory)) + .add_routes(self.routes); + + let service = TowerToHyperService::new( + server_builder + .into_service() + .map_request(|req: Request| req.map(boxed)), + ); + + net_util::run_hyper_server(&self.bind_address, service, "metadata-store-grpc").await?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/grpc/service_builder.rs b/crates/metadata-store/src/grpc/service_builder.rs new file mode 100644 index 000000000..80d55131a --- /dev/null +++ b/crates/metadata-store/src/grpc/service_builder.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::{Request, Response}; +use std::convert::Infallible; +use tonic::body::BoxBody; +use tonic::server::NamedService; +use tonic::service::{Routes, RoutesBuilder}; +use tonic_health::ServingStatus; +use tower::Service; + +#[derive(Debug)] +pub struct GrpcServiceBuilder<'a> { + reflection_service_builder: Option>, + routes_builder: RoutesBuilder, + svc_names: Vec<&'static str>, +} + +impl<'a> Default for GrpcServiceBuilder<'a> { + fn default() -> Self { + let routes_builder = RoutesBuilder::default(); + + Self { + reflection_service_builder: Some(tonic_reflection::server::Builder::configure()), + routes_builder, + svc_names: Vec::default(), + } + } +} + +impl<'a> GrpcServiceBuilder<'a> { + pub fn add_service(&mut self, svc: S) + where + S: Service, Response = Response, Error = Infallible> + + NamedService + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + self.svc_names.push(S::NAME); + self.routes_builder.add_service(svc); + } + + pub fn register_file_descriptor_set_for_reflection<'b: 'a>( + &mut self, + encoded_file_descriptor_set: &'b [u8], + ) { + self.reflection_service_builder = Some( + self.reflection_service_builder + .take() + .expect("be present") + .register_encoded_file_descriptor_set(encoded_file_descriptor_set), + ); + } + + pub async fn build(mut self) -> Result { + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + + for svc_name in self.svc_names { + health_reporter + .set_service_status(svc_name, ServingStatus::Serving) + .await; + } + + self.routes_builder.add_service(health_service); + self.routes_builder.add_service( + self.reflection_service_builder + .expect("be present") + .build()?, + ); + Ok(self.routes_builder.routes()) + } +} diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 198840cc9..da43f361e 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -8,9 +8,131 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod grpc; mod grpc_svc; pub mod local; +pub mod raft; +mod util; +use bytestring::ByteString; +use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError, }; +use restate_core::ShutdownError; +use restate_types::errors::GenericError; +use restate_types::storage::{StorageDecodeError, StorageEncodeError}; +use restate_types::Version; +use tokio::sync::{mpsc, oneshot}; + +pub type BoxedMetadataStoreService = Box; + +pub type RequestSender = mpsc::Sender; +pub type RequestReceiver = mpsc::Receiver; + +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + #[error("internal error: {0}")] + Internal(GenericError), + #[error("service currently unavailable: {0}")] + Unavailable(GenericError), + #[error("failed precondition: {0}")] + FailedPrecondition(#[from] PreconditionViolation), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("encode error: {0}")] + Encode(#[from] StorageEncodeError), + #[error("decode error: {0}")] + Decode(#[from] StorageDecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum PreconditionViolation { + #[error("key-value pair already exists")] + Exists, + #[error("expected version '{expected}' but found version '{actual:?}'")] + VersionMismatch { + expected: Version, + actual: Option, + }, +} + +impl PreconditionViolation { + fn kv_pair_exists() -> Self { + PreconditionViolation::Exists + } + + fn version_mismatch(expected: Version, actual: Option) -> Self { + PreconditionViolation::VersionMismatch { expected, actual } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("error while running server grpc reflection service: {0}")] + GrpcReflection(#[from] tonic_reflection::server::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), + #[error(transparent)] + Generic(#[from] GenericError), +} + +impl Error { + pub fn generic(err: impl Into) -> Error { + Error::Generic(err.into()) + } +} + +#[async_trait::async_trait] +pub trait MetadataStoreServiceBoxed { + async fn run_boxed(self: Box) -> Result<(), Error>; +} + +#[async_trait::async_trait] +impl MetadataStoreServiceBoxed for T { + async fn run_boxed(self: Box) -> Result<(), Error> { + (*self).run().await + } +} + +#[async_trait::async_trait] +pub trait MetadataStoreService: MetadataStoreServiceBoxed + Send { + async fn run(self) -> Result<(), Error>; + + fn boxed(self) -> BoxedMetadataStoreService + where + Self: Sized + 'static, + { + Box::new(self) + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for Box { + async fn run(self) -> Result<(), Error> { + self.run_boxed().await + } +} + +#[derive(Debug)] +pub enum MetadataStoreRequest { + Get { + key: ByteString, + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + key: ByteString, + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, + Delete { + key: ByteString, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, +} diff --git a/crates/metadata-store/src/local/mod.rs b/crates/metadata-store/src/local/mod.rs index 67f5df7a5..f8a68e797 100644 --- a/crates/metadata-store/src/local/mod.rs +++ b/crates/metadata-store/src/local/mod.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod grpc; mod store; mod service; @@ -20,9 +19,9 @@ use restate_types::{ }; pub use service::LocalMetadataStoreService; -use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::grpc::client::GrpcMetadataStoreClient; -/// Creates a [`MetadataStoreClient`] for the [`LocalMetadataStoreService`]. +/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`]. pub async fn create_client( metadata_store_client_options: MetadataStoreClientOptions, ) -> Result { @@ -30,7 +29,7 @@ pub async fn create_client( let client = match metadata_store_client_options.metadata_store_client { MetadataStoreClientConfig::Embedded { address } => { - let store = LocalMetadataStoreClient::new(address); + let store = GrpcMetadataStoreClient::new(address); MetadataStoreClient::new(store, backoff_policy) } MetadataStoreClientConfig::Etcd { addresses } => { diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index f0f4d9444..394cb8cc8 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -8,39 +8,24 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::grpc_svc; +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; -use crate::local::grpc::handler::LocalMetadataStoreHandler; use crate::local::store::LocalMetadataStore; -use http::Request; -use hyper::body::Incoming; -use hyper_util::service::TowerToHyperService; -use restate_core::network::net_util; -use restate_core::{task_center, ShutdownError, TaskKind}; -use restate_rocksdb::RocksError; +use crate::{grpc_svc, Error, MetadataStoreService}; +use futures::TryFutureExt; +use restate_core::{task_center, TaskKind}; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use tonic::body::boxed; +#[cfg(test)] use tonic::server::NamedService; -use tower::ServiceExt; pub struct LocalMetadataStoreService { opts: BoxedLiveLoad, rocksdb_options: BoxedLiveLoad, } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("failed running grpc server: {0}")] - GrpcServer(#[from] net_util::Error), - #[error("error while running server server grpc reflection service: {0}")] - GrpcReflection(#[from] tonic_reflection::server::Error), - #[error("system is shutting down")] - Shutdown(#[from] ShutdownError), - #[error("rocksdb error: {0}")] - RocksDB(#[from] RocksError), -} - impl LocalMetadataStoreService { pub fn from_options( opts: BoxedLiveLoad, @@ -52,53 +37,38 @@ impl LocalMetadataStoreService { } } - pub fn grpc_service_name(&self) -> &str { - MetadataStoreSvcServer::::NAME + #[cfg(test)] + pub fn grpc_service_name() -> &'static str { + MetadataStoreSvcServer::::NAME } +} - pub async fn run(self) -> Result<(), Error> { +#[async_trait::async_trait] +impl MetadataStoreService for LocalMetadataStoreService { + async fn run(self) -> Result<(), Error> { let LocalMetadataStoreService { mut opts, rocksdb_options, } = self; let options = opts.live_load(); let bind_address = options.bind_address.clone(); - let store = LocalMetadataStore::create(options, rocksdb_options).await?; - // Trace layer - let span_factory = tower_http::trace::DefaultMakeSpan::new() - .include_headers(true) - .level(tracing::Level::ERROR); - - let reflection_service_builder = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(grpc_svc::FILE_DESCRIPTOR_SET); - - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>() - .await; + let store = LocalMetadataStore::create(options, rocksdb_options) + .await + .map_err(|err| Error::Generic(err.into()))?; + let mut builder = GrpcServiceBuilder::default(); - let server_builder = tonic::transport::Server::builder() - .layer(tower_http::trace::TraceLayer::new_for_grpc().make_span_with(span_factory)) - .add_service(health_service) - .add_service(MetadataStoreSvcServer::new(LocalMetadataStoreHandler::new( - store.request_sender(), - ))) - .add_service(reflection_service_builder.build()?); + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); - let service = TowerToHyperService::new( - server_builder - .into_service() - .map_request(|req: Request| req.map(boxed)), - ); + let grpc_server = GrpcServer::new(bind_address, builder.build().await?); task_center().spawn_child( TaskKind::RpcServer, "metadata-store-grpc", None, - async move { - net_util::run_hyper_server(&bind_address, service, "metadata-store-grpc").await?; - Ok(()) - }, + grpc_server.run().map_err(Into::into), )?; store.run().await; diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 632f456b1..fc874bc6b 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,6 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::{ + util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -18,75 +21,16 @@ use restate_rocksdb::{ }; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::storage::{ - StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, -}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; use restate_types::Version; -use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; +use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tracing::{debug, trace}; -pub type RequestSender = mpsc::Sender; -pub type RequestReceiver = mpsc::Receiver; - -type Result = std::result::Result; - const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; -#[derive(Debug)] -pub enum MetadataStoreRequest { - Get { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - GetVersion { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - Put { - key: ByteString, - value: VersionedValue, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, - Delete { - key: ByteString, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("storage error: {0}")] - Storage(#[from] rocksdb::Error), - #[error("rocksdb error: {0}")] - RocksDb(#[from] RocksError), - #[error("failed precondition: {0}")] - FailedPrecondition(String), - #[error("invalid argument: {0}")] - InvalidArgument(String), - #[error("encode error: {0}")] - Encode(#[from] StorageEncodeError), - #[error("decode error: {0}")] - Decode(#[from] StorageDecodeError), -} - -impl Error { - fn kv_pair_exists() -> Self { - Error::FailedPrecondition("key-value pair already exists".to_owned()) - } - - fn version_mismatch(expected: Version, actual: Option) -> Self { - Error::FailedPrecondition(format!( - "Expected version '{}' but found version '{:?}'", - expected, actual - )) - } -} - /// Single node metadata store which stores the key value pairs in RocksDB. /// /// In order to avoid issues arising from concurrency, we run the metadata @@ -112,13 +56,17 @@ impl LocalMetadataStore { let db_name = DbName::new(DB_NAME); let db_manager = RocksDbManager::get(); let cfs = vec![CfName::new(KV_PAIRS)]; - let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options)) - .add_cf_pattern( - CfPrefixPattern::ANY, - cf_options(options.rocksdb_memory_budget()), - ) - .ensure_column_families(cfs) - .build_as_db(); + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); let db = db_manager .open_db(updateable_rocksdb_options.clone(), db_spec) @@ -215,9 +163,12 @@ impl LocalMetadataStore { }; } - fn get(&self, key: &ByteString) -> Result> { + fn get(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -226,9 +177,12 @@ impl LocalMetadataStore { } } - fn get_version(&self, key: &ByteString) -> Result> { + fn get_version(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -244,7 +198,7 @@ impl LocalMetadataStore { key: &ByteString, value: &VersionedValue, precondition: Precondition, - ) -> Result<()> { + ) -> Result<(), RequestError> { match precondition { Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?), Precondition::DoesNotExist => { @@ -252,7 +206,7 @@ impl LocalMetadataStore { if current_version.is_none() { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(Error::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -260,7 +214,10 @@ impl LocalMetadataStore { if current_version == Some(version) { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(Error::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } @@ -270,7 +227,7 @@ impl LocalMetadataStore { &mut self, key: &ByteString, value: &VersionedValue, - ) -> Result<()> { + ) -> Result<(), RequestError> { self.buffer.clear(); Self::encode(value, &mut self.buffer)?; @@ -278,8 +235,7 @@ impl LocalMetadataStore { let cf_handle = self.kv_cf_handle(); let mut wb = WriteBatch::default(); wb.put_cf(&cf_handle, key, self.buffer.as_ref()); - Ok(self - .rocksdb + self.rocksdb .write_batch( "local-metadata-write-batch", Priority::High, @@ -287,10 +243,11 @@ impl LocalMetadataStore { write_options, wb, ) - .await?) + .await + .map_err(|err| RequestError::Internal(err.into())) } - fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> { + fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> { match precondition { Precondition::None => self.delete_kv_pair(key), // this condition does not really make sense for the delete operation @@ -301,7 +258,7 @@ impl LocalMetadataStore { // nothing to do Ok(()) } else { - Err(Error::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -310,72 +267,35 @@ impl LocalMetadataStore { if current_version == Some(version) { self.delete_kv_pair(key) } else { - Err(Error::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } } - fn delete_kv_pair(&mut self, key: &ByteString) -> Result<()> { + fn delete_kv_pair(&mut self, key: &ByteString) -> Result<(), RequestError> { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(Into::into) + .map_err(|err| RequestError::Internal(err.into())) } - fn encode(value: &T, buf: &mut BytesMut) -> Result<()> { + fn encode(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> { StorageCodec::encode(value, buf)?; Ok(()) } - fn decode(buf: impl AsRef<[u8]>) -> Result { + fn decode(buf: impl AsRef<[u8]>) -> Result { let value = StorageCodec::decode(&mut buf.as_ref())?; Ok(value) } - fn log_error(result: &Result, request: &str) { + fn log_error(result: &Result, request: &str) { if let Err(err) = &result { debug!("failed to process request '{}': '{}'", request, err) } } } - -fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { - rocksdb::Options::default() -} - -fn cf_options( - memory_budget: usize, -) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { - move |mut opts| { - set_memory_related_opts(&mut opts, memory_budget); - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - opts.set_num_levels(3); - - opts.set_compression_per_level(&[ - DBCompressionType::None, - DBCompressionType::None, - DBCompressionType::Zstd, - ]); - - // - opts - } -} - -fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { - // We set the budget to allow 1 mutable + 3 immutable. - opts.set_write_buffer_size(memtables_budget / 4); - - // merge 2 memtables when flushing to L0 - opts.set_min_write_buffer_number_to_merge(2); - opts.set_max_write_buffer_number(4); - // start flushing L0->L1 as soon as possible. each file on level0 is - // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than - // memtable_memory_budget. - opts.set_level_zero_file_num_compaction_trigger(2); - // doesn't really matter much, but we don't want to create too many files - opts.set_target_file_size_base(memtables_budget as u64 / 8); - // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast - opts.set_max_bytes_for_level_base(memtables_budget as u64); -} diff --git a/crates/metadata-store/src/local/tests.rs b/crates/metadata-store/src/local/tests.rs index ea23b3e6a..43e6b6866 100644 --- a/crates/metadata-store/src/local/tests.rs +++ b/crates/metadata-store/src/local/tests.rs @@ -30,9 +30,9 @@ use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::retries::RetryPolicy; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::grpc::client::GrpcMetadataStoreClient; use crate::local::service::LocalMetadataStoreService; -use crate::{MetadataStoreClient, Precondition, WriteError}; +use crate::{MetadataStoreClient, MetadataStoreService, Precondition, WriteError}; #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] struct Value { @@ -309,7 +309,7 @@ async fn durable_storage() -> anyhow::Result<()> { Ok(()) } -/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`MetadataStoreClient`] +/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`GrpcMetadataStoreClient`] /// connected to it. async fn create_test_environment( opts: &MetadataStoreOptions, @@ -354,7 +354,7 @@ async fn start_metadata_store( task_center: &TaskCenter, ) -> anyhow::Result { let service = LocalMetadataStoreService::from_options(opts, updateables_rocksdb_options); - let grpc_service_name = service.grpc_service_name().to_owned(); + let grpc_service_name = LocalMetadataStoreService::grpc_service_name().to_owned(); task_center.spawn( TaskKind::MetadataStore, @@ -387,9 +387,9 @@ async fn start_metadata_store( }) .await?; - let rocksdb_client = LocalMetadataStoreClient::new(address); + let grpc_client = GrpcMetadataStoreClient::new(address); let client = MetadataStoreClient::new( - rocksdb_client, + grpc_client, Some(metadata_store_client_options.metadata_store_client_backoff_policy), ); diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs new file mode 100644 index 000000000..7c14a5b27 --- /dev/null +++ b/crates/metadata-store/src/raft/mod.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod service; +mod storage; +mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs new file mode 100644 index 000000000..e788c58c7 --- /dev/null +++ b/crates/metadata-store/src/raft/service.rs @@ -0,0 +1,59 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::raft::store::RaftMetadataStore; +use crate::{grpc_svc, Error, MetadataStoreService}; +use futures::TryFutureExt; +use restate_core::{task_center, TaskKind}; +use restate_types::config::MetadataStoreOptions; +use restate_types::live::BoxedLiveLoad; + +pub struct RaftMetadataStoreService { + options: BoxedLiveLoad, +} + +impl RaftMetadataStoreService { + pub fn new(options: BoxedLiveLoad) -> Self { + Self { options } + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for RaftMetadataStoreService { + async fn run(mut self) -> Result<(), Error> { + let store_options = self.options.live_load(); + let store = RaftMetadataStore::create().await.map_err(Error::generic)?; + + let mut builder = GrpcServiceBuilder::default(); + + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); + + let grpc_server = + GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); + + task_center().spawn_child( + TaskKind::RpcServer, + "metadata-store-grpc", + None, + grpc_server.run().map_err(Into::into), + )?; + + store.run().await.map_err(Error::generic)?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs new file mode 100644 index 000000000..0e37bbbce --- /dev/null +++ b/crates/metadata-store/src/raft/storage.rs @@ -0,0 +1,420 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::util; +use protobuf::{Message, ProtobufError}; +use raft::eraftpb::{ConfState, Entry, Snapshot}; +use raft::prelude::HardState; +use raft::{GetEntriesContext, RaftState, Storage, StorageError}; +use restate_rocksdb::{ + CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, + RocksError, +}; +use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; +use rocksdb::{BoundColumnFamily, ReadOptions, WriteBatch, WriteOptions, DB}; +use std::mem::size_of; +use std::sync::Arc; +use std::{error, mem}; + +const DB_NAME: &str = "raft-metadata-store"; +const RAFT_CF: &str = "raft"; + +const FIRST_RAFT_INDEX: u64 = 1; + +const RAFT_ENTRY_DISCRIMINATOR: u8 = 0x01; +const HARD_STATE_DISCRIMINATOR: u8 = 0x02; +const CONF_STATE_DISCRIMINATOR: u8 = 0x03; + +const RAFT_ENTRY_KEY_LENGTH: usize = 9; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating RocksDb: {0}")] + RocksDb(#[from] RocksError), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed reading/writing from/to RocksDb: {0}")] + RocksDb(#[from] RocksError), + #[error("failed reading/writing from/to raw RocksDb: {0}")] + RocksDbRaw(#[from] rocksdb::Error), + #[error("failed encoding value: {0}")] + Encode(#[from] ProtobufError), + #[error("index '{index}' is out of bounds; last index is '{last_index}'")] + IndexOutOfBounds { index: u64, last_index: u64 }, + #[error("raft log has been compacted; first index is {0}")] + Compacted(u64), +} + +/// Map our internal error type to [`raft::Error`] to fit the [`Storage`] trait definition. +impl From for raft::Error { + fn from(value: Error) -> Self { + match value { + err @ Error::RocksDb(_) + | err @ Error::RocksDbRaw(_) + | err @ Error::IndexOutOfBounds { .. } => storage_error(err), + Error::Encode(err) => raft::Error::CodecError(err), + Error::Compacted(_) => raft::Error::Store(StorageError::Compacted), + } + } +} + +pub struct RocksDbStorage { + db: Arc, + rocksdb: Arc, + + last_index: u64, + buffer: Vec, +} + +impl RocksDbStorage { + pub async fn create( + options: &MetadataStoreOptions, + rocksdb_options: BoxedLiveLoad, + ) -> Result { + let db_name = DbName::new(DB_NAME); + let db_manager = RocksDbManager::get(); + let cfs = vec![CfName::new(RAFT_CF)]; + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); + + let db = db_manager.open_db(rocksdb_options, db_spec).await?; + let rocksdb = db_manager + .get_db(db_name) + .expect("raft metadata store db is open"); + + let last_index = Self::find_last_index(&db); + + Ok(Self { + db, + rocksdb, + last_index, + buffer: Vec::with_capacity(1024), + }) + } +} + +impl RocksDbStorage { + fn write_options(&self) -> WriteOptions { + let mut write_opts = WriteOptions::default(); + write_opts.disable_wal(false); + // always sync to not lose data + write_opts.set_sync(true); + write_opts + } + + fn find_last_index(db: &DB) -> u64 { + let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); + let start = Self::raft_entry_key(0); + // end is exclusive so switch to the next discriminator + let mut end = [0; 9]; + end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + + let mut options = ReadOptions::default(); + options.set_async_io(true); + options.set_iterate_range(start..end); + let mut iterator = db.raw_iterator_cf_opt(&cf, options); + + iterator.seek_to_last(); + + if iterator.valid() { + let key_bytes = iterator.key().expect("key should be present"); + assert_eq!( + key_bytes.len(), + RAFT_ENTRY_KEY_LENGTH, + "raft entry keys must consist of '{}' bytes", + RAFT_ENTRY_KEY_LENGTH + ); + u64::from_be_bytes( + key_bytes[1..(1 + size_of::())] + .try_into() + .expect("buffer should be long enough"), + ) + } else { + // the first valid raft index starts at 1, so 0 means there are no replicated raft entries + 0 + } + } + + pub fn get_hard_state(&self) -> Result { + let key = Self::hard_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_hard_state(&mut self, hard_state: HardState) -> Result<(), Error> { + let key = Self::hard_state_key(); + self.put_value(key, hard_state).await + } + + pub fn get_conf_state(&self) -> Result { + let key = Self::conf_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_conf_state(&mut self, conf_state: ConfState) -> Result<(), Error> { + let key = Self::conf_state_key(); + self.put_value(key, conf_state).await + } + + pub fn get_entry(&self, idx: u64) -> Result, Error> { + let key = Self::raft_entry_key(idx); + self.get_value(key) + } + + fn get_value(&self, key: impl AsRef<[u8]>) -> Result, Error> { + let cf = self.get_cf_handle(); + let bytes = self.db.get_pinned_cf(&cf, key)?; + + if let Some(bytes) = bytes { + let mut value = T::default(); + value.merge_from_bytes(bytes.as_ref())?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + async fn put_value( + &mut self, + key: impl AsRef<[u8]>, + value: T, + ) -> Result<(), Error> { + self.buffer.clear(); + value.write_to_vec(&mut self.buffer)?; + + let cf = self.get_cf_handle(); + let mut write_batch = WriteBatch::default(); + write_batch.put_cf(&cf, key.as_ref(), &self.buffer); + self.rocksdb + .write_batch( + "put_value", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into) + } + + pub async fn append(&mut self, entries: &Vec) -> Result<(), Error> { + let mut write_batch = WriteBatch::default(); + let mut buffer = mem::take(&mut self.buffer); + let mut last_index = self.last_index; + + { + let cf = self.get_cf_handle(); + + for entry in entries { + assert_eq!(last_index + 1, entry.index, "Expect raft log w/o holes"); + let key = Self::raft_entry_key(entry.index); + + buffer.clear(); + entry.write_to_vec(&mut buffer)?; + + write_batch.put_cf(&cf, key, &buffer); + last_index = entry.index; + } + } + + let result = self + .rocksdb + .write_batch( + "append", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into); + + self.buffer = buffer; + self.last_index = last_index; + + result + } + + fn get_cf_handle(&self) -> Arc { + self.db.cf_handle(RAFT_CF).expect("RAFT_CF exists") + } + + fn raft_entry_key(idx: u64) -> [u8; RAFT_ENTRY_KEY_LENGTH] { + let mut key = [0; RAFT_ENTRY_KEY_LENGTH]; + key[0] = RAFT_ENTRY_DISCRIMINATOR; + key[1..9].copy_from_slice(&idx.to_be_bytes()); + key + } + + fn hard_state_key() -> [u8; 1] { + [HARD_STATE_DISCRIMINATOR] + } + + fn conf_state_key() -> [u8; 1] { + [CONF_STATE_DISCRIMINATOR] + } + + fn check_index(&self, idx: u64) -> Result<(), Error> { + if idx < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } else if idx > self.last_index() { + return Err(Error::IndexOutOfBounds { + index: idx, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn check_range(&self, low: u64, high: u64) -> Result<(), Error> { + assert!(low < high, "Low '{low}' must be smaller than high '{high}'"); + + if low < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } + + if high > self.last_index() + 1 { + return Err(Error::IndexOutOfBounds { + index: high, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn last_index(&self) -> u64 { + self.last_index + } + + fn first_index(&self) -> u64 { + FIRST_RAFT_INDEX + } + + pub fn apply_snapshot(&mut self, _snapshot: Snapshot) -> Result<(), Error> { + unimplemented!("snapshots are currently not supported"); + } +} + +impl Storage for RocksDbStorage { + fn initial_state(&self) -> raft::Result { + let hard_state = self.get_hard_state()?; + Ok(RaftState::new(hard_state, self.get_conf_state()?)) + } + + fn entries( + &self, + low: u64, + high: u64, + max_size: impl Into>, + _context: GetEntriesContext, + ) -> raft::Result> { + self.check_range(low, high)?; + let start_key = Self::raft_entry_key(low); + let end_key = Self::raft_entry_key(high); + + let cf = self.get_cf_handle(); + let mut opts = ReadOptions::default(); + opts.set_iterate_range(start_key..end_key); + opts.set_async_io(true); + + let mut iterator = self.db.raw_iterator_cf_opt(&cf, opts); + iterator.seek(start_key); + + let mut result = + Vec::with_capacity(usize::try_from(high - low).expect("u64 fits into usize")); + + let max_size = + usize::try_from(max_size.into().unwrap_or(u64::MAX)).expect("u64 fits into usize"); + let mut size = 0; + let mut expected_idx = low; + + while iterator.valid() { + if size > 0 && size >= max_size { + break; + } + + if let Some(value) = iterator.value() { + let mut entry = Entry::default(); + entry.merge_from_bytes(value)?; + + if expected_idx != entry.index { + if expected_idx == low { + Err(StorageError::Compacted)?; + } else { + // missing raft entries :-( + Err(StorageError::Unavailable)?; + } + } + + result.push(entry); + expected_idx += 1; + size += value.len(); + } + + iterator.next(); + } + + // check for an occurred error + iterator + .status() + .map_err(|err| StorageError::Other(err.into()))?; + + Ok(result) + } + + fn term(&self, idx: u64) -> raft::Result { + // todo handle first_index - 1 once truncation is supported + if idx == 0 { + return Ok(0); + } + + self.check_index(idx)?; + self.get_entry(idx) + .map(|entry| entry.expect("should exist").term) + .map_err(Into::into) + } + + fn first_index(&self) -> raft::Result { + Ok(self.first_index()) + } + + fn last_index(&self) -> raft::Result { + Ok(self.last_index()) + } + + fn snapshot(&self, _request_index: u64, _to: u64) -> raft::Result { + // time is relative as some clever people figured out + Err(raft::Error::Store( + StorageError::SnapshotTemporarilyUnavailable, + )) + } +} + +pub fn storage_error(error: E) -> raft::Error +where + E: Into>, +{ + raft::Error::Store(StorageError::Other(error.into())) +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs new file mode 100644 index 000000000..e859b89f8 --- /dev/null +++ b/crates/metadata-store/src/raft/store.rs @@ -0,0 +1,561 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::storage; +use crate::raft::storage::RocksDbStorage; +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; +use assert2::let_assert; +use bytes::{Bytes, BytesMut}; +use bytestring::ByteString; +use protobuf::{Message as ProtobufMessage, ProtobufError}; +use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; +use raft::{Config, RawNode}; +use restate_core::cancellation_watcher; +use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::config::Configuration; +use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; +use restate_types::{flexbuffers_storage_encode_decode, Version}; +use slog::o; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, warn}; +use tracing_slog::TracingSlogDrain; +use ulid::Ulid; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating raft node: {0}")] + Raft(#[from] raft::Error), + #[error("failed creating raft storage: {0}")] + Storage(#[from] storage::BuildError), + #[error("failed bootstrapping conf state: {0}")] + BootstrapConfState(#[from] storage::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed appending entries: {0}")] + Append(#[from] raft::Error), + #[error("failed deserializing raft serialized requests: {0}")] + DecodeRequest(StorageDecodeError), + #[error("failed deserializing conf change: {0}")] + DecodeConf(ProtobufError), + #[error("failed applying conf change: {0}")] + ApplyConfChange(raft::Error), + #[error("failed reading/writing from/to storage: {0}")] + Storage(#[from] storage::Error), +} + +pub struct RaftMetadataStore { + _logger: slog::Logger, + raw_node: RawNode, + tick_interval: time::Interval, + + callbacks: HashMap, + kv_entries: HashMap, + + request_tx: RequestSender, + request_rx: RequestReceiver, +} + +impl RaftMetadataStore { + pub async fn create() -> Result { + let (request_tx, request_rx) = mpsc::channel(2); + + let config = Config { + id: 1, + ..Default::default() + }; + + let rocksdb_options = Configuration::updateable() + .map(|configuration| &configuration.common.rocksdb) + .boxed(); + let mut metadata_store_options = + Configuration::updateable().map(|configuration| &configuration.metadata_store); + let mut store = + RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; + let conf_state = ConfState::from((vec![1], vec![])); + store.store_conf_state(conf_state).await?; + + let drain = TracingSlogDrain; + let logger = slog::Logger::root(drain, o!()); + + let raw_node = RawNode::new(&config, store, &logger)?; + + let mut tick_interval = time::interval(Duration::from_millis(100)); + tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + Ok(Self { + // we only need to keep it alive + _logger: logger, + raw_node, + tick_interval, + callbacks: HashMap::default(), + kv_entries: HashMap::default(), + request_rx, + request_tx, + }) + } + + pub fn request_sender(&self) -> RequestSender { + self.request_tx.clone() + } + + pub async fn run(mut self) -> Result<(), Error> { + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + _ = &mut cancellation => { + break; + } + Some(request) = self.request_rx.recv() => { + // todo: Unclear whether every replica should be allowed to propose. Maybe + // only the leader should propose and respond to clients. + let (callback, request) = Self::split_request(request); + + if let Err(err) = request + .encode_to_vec() + .map_err(Into::into) + .and_then(|request| self.raw_node + .propose(vec![], request) + .map_err(RequestError::from)) { + info!("Failed processing request: {err}"); + callback.fail(err); + continue; + } + + self.register_callback(callback); + } + _ = self.tick_interval.tick() => { + self.raw_node.tick(); + } + } + + self.on_ready().await?; + } + + debug!("Stop running RaftMetadataStore."); + + Ok(()) + } + + async fn on_ready(&mut self) -> Result<(), Error> { + if !self.raw_node.has_ready() { + return Ok(()); + } + + let mut ready = self.raw_node.ready(); + + // first need to send outgoing messages + if !ready.messages().is_empty() { + self.send_messages(ready.take_messages()); + } + + // apply snapshot if one was sent + if !ready.snapshot().is_empty() { + if let Err(err) = self + .raw_node + .mut_store() + .apply_snapshot(ready.snapshot().clone()) + { + warn!("failed applying snapshot: {err}"); + } + } + + // then handle committed entries + self.handle_committed_entries(ready.take_committed_entries()) + .await?; + + // append new Raft entries to storage + self.raw_node.mut_store().append(ready.entries()).await?; + + // update the hard state if an update was produced (e.g. vote has happened) + if let Some(hs) = ready.hs() { + self.raw_node + .mut_store() + .store_hard_state(hs.clone()) + .await?; + } + + // send persisted messages (after entries were appended and hard state was updated) + if !ready.persisted_messages().is_empty() { + self.send_messages(ready.take_persisted_messages()); + } + + // advance the raft node + let mut light_ready = self.raw_node.advance(ready); + + // update the commit index if it changed + if let Some(_commit) = light_ready.commit_index() { + // update commit index in cached hard_state; no need to persist it though + } + + // send outgoing messages + if !light_ready.messages().is_empty() { + self.send_messages(light_ready.take_messages()); + } + + // handle committed entries + if !light_ready.committed_entries().is_empty() { + self.handle_committed_entries(light_ready.take_committed_entries()) + .await?; + } + + self.raw_node.advance_apply(); + + Ok(()) + } + + fn register_callback(&mut self, callback: Callback) { + self.callbacks.insert(callback.request_id, callback); + } + + fn send_messages(&self, _messages: Vec) { + // todo: Send messages to other peers + } + + async fn handle_committed_entries( + &mut self, + committed_entries: Vec, + ) -> Result<(), Error> { + for entry in committed_entries { + if entry.data.is_empty() { + // new leader was elected + continue; + } + + match entry.get_entry_type() { + EntryType::EntryNormal => self.handle_normal_entry(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry).await?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry).await?, + } + } + + Ok(()) + } + + fn handle_normal_entry(&mut self, entry: Entry) -> Result<(), Error> { + let request = Request::decode_from_bytes(entry.data).map_err(Error::DecodeRequest)?; + self.handle_request(request); + + Ok(()) + } + + fn handle_request(&mut self, request: Request) { + match request.kind { + RequestKind::Get { key } => { + let result = self.get(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get(result); + } + } + RequestKind::GetVersion { key } => { + let result = self.get_version(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get_version(result); + } + } + RequestKind::Put { + key, + value, + precondition, + } => { + let result = self.put(key, value, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_put(result.map_err(Into::into)); + } + } + RequestKind::Delete { key, precondition } => { + let result = self.delete(key, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_delete(result.map_err(Into::into)); + } + } + } + } + + fn get(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).cloned() + } + + fn get_version(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).map(|entry| entry.version) + } + + fn put( + &mut self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.insert(key, value); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + + self.kv_entries.insert(key, value); + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.insert(key, value); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn delete( + &mut self, + key: ByteString, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.remove(&key); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.remove(&key); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + async fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChange::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.mut_store().store_conf_state(cs).await?; + Ok(()) + } + + async fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.mut_store().store_conf_state(cs).await?; + Ok(()) + } + + fn split_request(request: MetadataStoreRequest) -> (Callback, Request) { + let (request_kind, callback_kind) = match request { + MetadataStoreRequest::Get { key, result_tx } => { + (RequestKind::Get { key }, CallbackKind::Get { result_tx }) + } + MetadataStoreRequest::GetVersion { key, result_tx } => ( + RequestKind::GetVersion { key }, + CallbackKind::GetVersion { result_tx }, + ), + MetadataStoreRequest::Put { + key, + value, + precondition, + result_tx, + } => ( + RequestKind::Put { + key, + value, + precondition, + }, + CallbackKind::Put { result_tx }, + ), + MetadataStoreRequest::Delete { + key, + precondition, + result_tx, + } => ( + RequestKind::Delete { key, precondition }, + CallbackKind::Delete { result_tx }, + ), + }; + + let request_id = Ulid::new(); + + let callback = Callback { + request_id, + kind: callback_kind, + }; + + let request = Request { + request_id, + kind: request_kind, + }; + + (callback, request) + } +} + +struct Callback { + request_id: Ulid, + kind: CallbackKind, +} + +impl Callback { + fn fail(self, err: impl Into) { + match self.kind { + CallbackKind::Get { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::GetVersion { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Put { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Delete { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + }; + } + + fn complete_get(self, result: Option) { + let_assert!( + CallbackKind::Get { result_tx } = self.kind, + "expected 'Get' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_get_version(self, result: Option) { + let_assert!( + CallbackKind::GetVersion { result_tx } = self.kind, + "expected 'GetVersion' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_put(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Put { result_tx } = self.kind, + "expected 'Put' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } + + fn complete_delete(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Delete { result_tx } = self.kind, + "expected 'Delete' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } +} + +enum CallbackKind { + Get { + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + result_tx: oneshot::Sender>, + }, + Delete { + result_tx: oneshot::Sender>, + }, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Request { + request_id: Ulid, + kind: RequestKind, +} + +flexbuffers_storage_encode_decode!(Request); + +impl Request { + fn encode_to_vec(&self) -> Result, StorageEncodeError> { + let mut buffer = BytesMut::new(); + // todo: Removing support for BufMut requires an extra copy from BytesMut to Vec :-( + StorageCodec::encode(self, &mut buffer)?; + Ok(buffer.to_vec()) + } + + fn decode_from_bytes(mut bytes: Bytes) -> Result { + StorageCodec::decode::(&mut bytes) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +enum RequestKind { + Get { + key: ByteString, + }, + GetVersion { + key: ByteString, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + }, + Delete { + key: ByteString, + precondition: Precondition, + }, +} + +impl From for RequestError { + fn from(value: raft::Error) -> Self { + match value { + err @ raft::Error::ProposalDropped => RequestError::Unavailable(err.into()), + err => RequestError::Internal(err.into()), + } + } +} diff --git a/crates/metadata-store/src/util.rs b/crates/metadata-store/src/util.rs new file mode 100644 index 000000000..41b971da4 --- /dev/null +++ b/crates/metadata-store/src/util.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::config::MetadataStoreOptions; +use rocksdb::DBCompressionType; + +pub fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { + rocksdb::Options::default() +} + +pub fn cf_options( + memory_budget: usize, +) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { + move |mut opts| { + set_memory_related_opts(&mut opts, memory_budget); + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + opts.set_num_levels(3); + + opts.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::None, + DBCompressionType::Zstd, + ]); + + // + opts + } +} + +pub fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { + // We set the budget to allow 1 mutable + 3 immutable. + opts.set_write_buffer_size(memtables_budget / 4); + + // merge 2 memtables when flushing to L0 + opts.set_min_write_buffer_number_to_merge(2); + opts.set_max_write_buffer_number(4); + // start flushing L0->L1 as soon as possible. each file on level0 is + // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than + // memtable_memory_budget. + opts.set_level_zero_file_num_compaction_trigger(2); + // doesn't really matter much, but we don't want to create too many files + opts.set_target_file_size_base(memtables_budget as u64 / 8); + // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast + opts.set_max_bytes_for_level_base(memtables_budget as u64); +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index c28fd95e4..eecd0477c 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -31,9 +31,12 @@ use restate_core::{task_center, TaskKind}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; -use restate_metadata_store::MetadataStoreClient; +use restate_metadata_store::raft::service::RaftMetadataStoreService; +use restate_metadata_store::{ + BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, +}; use restate_types::cluster_controller::SchedulingPlan; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::{CommonOptions, Configuration, Kind}; use restate_types::live::Live; use restate_types::logs::metadata::{bootstrap_logs_metadata, Logs}; use restate_types::metadata_store::keys::{ @@ -109,7 +112,7 @@ pub struct Node { metadata_manager: MetadataManager, metadata_store_client: MetadataStoreClient, bifrost: BifrostService, - metadata_store_role: Option, + metadata_store_role: Option, admin_role: Option, worker_role: Option, #[cfg(feature = "replicated-loglet")] @@ -118,7 +121,7 @@ pub struct Node { } impl Node { - pub async fn create(updateable_config: Live) -> Result { + pub async fn create(mut updateable_config: Live) -> Result { let tc = task_center(); let config = updateable_config.pinned(); // ensure we have cluster admin role if bootstrapping. @@ -138,13 +141,7 @@ impl Node { cluster_marker::validate_and_update_cluster_marker(config.common.cluster_name())?; let metadata_store_role = if config.has_role(Role::MetadataStore) { - Some(LocalMetadataStoreService::from_options( - updateable_config.clone().map(|c| &c.metadata_store).boxed(), - updateable_config - .clone() - .map(|config| &config.metadata_store.rocksdb) - .boxed(), - )) + Some(Self::create_metadata_store(&mut updateable_config)) } else { None }; @@ -270,6 +267,27 @@ impl Node { }) } + fn create_metadata_store( + updateable_config: &mut Live, + ) -> BoxedMetadataStoreService { + let config = updateable_config.live_load(); + + match config.metadata_store.kind { + Kind::Local => LocalMetadataStoreService::from_options( + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), + ) + .boxed(), + Kind::Raft => RaftMetadataStoreService::new( + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + ) + .boxed(), + } + } + pub async fn start(self) -> Result<(), anyhow::Error> { let tc = task_center(); diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index 370f6094e..e034444ff 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -60,6 +60,20 @@ pub struct MetadataStoreOptions { /// /// The RocksDB options which will be used to configure the metadata store's RocksDB instance. pub rocksdb: RocksDbOptions, + + /// # Type of metadata store to start + /// + /// The type of metadata store to start when running the metadata store role. + pub kind: Kind, +} + +#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub enum Kind { + #[default] + Local, + Raft, } impl MetadataStoreOptions { @@ -112,6 +126,7 @@ impl Default for MetadataStoreOptions { rocksdb_memory_budget: None, rocksdb_memory_ratio: 0.01, rocksdb, + kind: Kind::default(), } } }