Attempt the non-blocking write in bg #1565

Merged · 5 commits · May 28, 2024
47 changes: 29 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -4,6 +4,7 @@ members = [
"crates/*",
"crates/codederror/derive",
"server",
"tools/mock-service-endpoint",
"tools/service-protocol-wireshark-dissector",
"tools/xtask",
"tools/bifrost-benchpress",
@@ -107,6 +108,7 @@ hyper = { version = "0.14.24", default-features = false }
hyper-rustls = { version = "0.24.1", features = ["http2"] }
itertools = "0.11.0"
metrics = { version = "0.22" }
+metrics-exporter-prometheus = { version = "0.14", default-features = false, features = ["async-runtime"] }
once_cell = "1.18"
opentelemetry = { version = "0.22.0" }
opentelemetry-http = { version = "0.11.1" }
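Reviewer note: with `default-features = false`, the exporter's bundled HTTP listener is not compiled in, so a recorder is installed by hand and rendered from its handle. A minimal sketch of that pattern, assuming the crate's `PrometheusBuilder`/`PrometheusHandle` API and a made-up metric name; this is illustrative, not the wiring added by this PR:

```rust
// Illustrative sketch only; not the wiring added in this PR.
// With default features off there is no bundled HTTP listener, so the
// recorder is installed manually and rendered from its handle.
use metrics::counter;
use metrics_exporter_prometheus::PrometheusBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Install a global recorder and keep the handle for scraping.
    let handle = PrometheusBuilder::new().install_recorder()?;

    // Any `metrics` macro call is now captured by the recorder.
    counter!("bifrost_appends_total").increment(1);

    // A scrape endpoint would typically serve this rendered string.
    println!("{}", handle.render());
    Ok(())
}
```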
4 changes: 3 additions & 1 deletion crates/bifrost/Cargo.toml
@@ -28,6 +28,7 @@ futures = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
+pin-project = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
@@ -39,7 +40,8 @@ strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
-tokio-stream = { workspace = true }
+tokio-stream = { workspace = true, features = ["sync"] }
+tokio-util = { workspace = true }
tracing = { workspace = true }


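The new `sync` feature on `tokio-stream` enables the `wrappers` module, which adapts `tokio::sync` primitives into `Stream`s. A small illustrative sketch of what that unlocks (not one of this PR's call sites):

```rust
// Illustrative sketch: the tokio-stream "sync" feature provides wrappers
// like WatchStream, which turns a tokio::sync::watch receiver into a Stream.
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::watch::channel(0u64);
    let mut updates = WatchStream::new(rx);

    tx.send(1).expect("receiver alive");

    // Yields the current value first, then every subsequent change.
    while let Some(v) = updates.next().await {
        println!("observed {v}");
        if v >= 1 {
            break;
        }
    }
}
```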
82 changes: 49 additions & 33 deletions crates/bifrost/src/bifrost.rs
@@ -29,7 +29,9 @@ use restate_types::Version;

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
-use crate::{Error, FindTailAttributes, LogReadStream, LogRecord, SMALL_BATCH_THRESHOLD_COUNT};
+use crate::{
+Error, FindTailAttributes, LogReadStream, LogRecord, Result, SMALL_BATCH_THRESHOLD_COUNT,
+};

/// Bifrost is Restate's durable interconnect system
///
@@ -65,27 +67,23 @@ impl Bifrost {
/// Appends a single record to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]
#[instrument(level = "debug", skip(self, payload), err)]
-pub async fn append(&mut self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
+pub async fn append(&mut self, log_id: LogId, payload: Payload) -> Result<Lsn> {
self.inner.append(log_id, payload).await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the first
/// record in this batch. This will only return after all records have been stored.
#[instrument(level = "debug", skip(self, payloads), err)]
-pub async fn append_batch(
-&mut self,
-log_id: LogId,
-payloads: &[Payload],
-) -> Result<Lsn, Error> {
+pub async fn append_batch(&mut self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
self.inner.append_batch(log_id, payloads).await
}

/// Read the next record after the LSN provided. The `start` indicates the LSN where we will
/// read after. This means that the record returned will have a LSN strictly greater than
/// `after`. If no records are committed yet after this LSN, this read operation will "wait"
/// for such records to appear.
-pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
+pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord> {
self.inner.read_next_single(log_id, after).await
}

@@ -96,12 +94,19 @@ impl Bifrost {
&self,
log_id: LogId,
after: Lsn,
-) -> Result<Option<LogRecord>, Error> {
+) -> Result<Option<LogRecord>> {
self.inner.read_next_single_opt(log_id, after).await
}

-pub fn create_reader(&self, log_id: LogId, after: Lsn) -> LogReadStream {
-LogReadStream::new(self.inner.clone(), log_id, after)
+/// Create a read stream. `until` is inclusive. Pass [`Lsn::MAX`] for a tailing stream. Use
+/// `Lsn::INVALID` in `after` to read from the start (head) of the log.
+pub async fn create_reader(
+&self,
+log_id: LogId,
+after: Lsn,
+until: Lsn,
+) -> Result<LogReadStream> {
+LogReadStream::create(self.inner.clone(), log_id, after, until).await
}

/// Finds the current readable tail LSN of a log.
@@ -110,8 +115,8 @@
&self,
log_id: LogId,
attributes: FindTailAttributes,
-) -> Result<Option<Lsn>, Error> {
-self.inner.find_tail(log_id, attributes).await
+) -> Result<Option<Lsn>> {
+Ok(self.inner.find_tail(log_id, attributes).await?.1)
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
@@ -138,16 +143,22 @@

/// Read a full log with the given id. To be used only in tests!!!
#[cfg(any(test, feature = "test-util"))]
-pub async fn read_all(&self, log_id: LogId) -> Result<Vec<LogRecord>, Error> {
+pub async fn read_all(&self, log_id: LogId) -> Result<Vec<LogRecord>> {
+use futures::TryStreamExt;

self.inner.fail_if_shutting_down()?;

-let mut v = vec![];
-let mut reader = self.create_reader(log_id, Lsn::INVALID);
-while let Some(r) = reader.read_next_opt().await? {
-v.push(r);
-}
+let current_tail = self
+.find_tail(log_id, FindTailAttributes::default())
+.await?;
+let Some(current_tail) = current_tail else {
+return Ok(Vec::default());
+};

-Ok(v)
+let reader = self
+.create_reader(log_id, Lsn::INVALID, current_tail)
+.await?;
+reader.try_collect().await
}
}
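For reviewers following the new API: `create_reader` now takes an inclusive `until` LSN, so a bounded scan and a tailing subscription share one code path. A hypothetical caller sketch; the crate path `restate_bifrost`, the bound `Lsn::from(100)`, and `LogReadStream` implementing `Stream<Item = Result<LogRecord>>` (which `read_all` above relies on via `try_collect`) are assumptions, not code from this diff:

```rust
// Hypothetical caller; method and type paths assumed from this diff.
use futures::TryStreamExt;
use restate_bifrost::{Bifrost, Result};
use restate_types::logs::{LogId, Lsn};

async fn scan_and_tail(bifrost: &Bifrost, log_id: LogId) -> Result<()> {
    // Bounded read: from the head (after Lsn::INVALID) up to and
    // including LSN 100, an arbitrary example bound (assumes Lsn: From<u64>).
    let bounded = bifrost
        .create_reader(log_id, Lsn::INVALID, Lsn::from(100))
        .await?;
    let records: Vec<_> = bounded.try_collect().await?;
    println!("read {} records", records.len());

    // Tailing read: Lsn::MAX is never reached, so the stream waits
    // for records as they are committed.
    let mut tail = bifrost.create_reader(log_id, Lsn::INVALID, Lsn::MAX).await?;
    while let Some(record) = tail.try_next().await? {
        let _ = record; // process each record as it arrives
    }
    Ok(())
}
```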

@@ -182,15 +193,15 @@ impl BifrostInner {

/// Appends a single record to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]
-pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
+pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await
}

-pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn, Error> {
+pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
let loglet = self.writeable_loglet(log_id).await?;
let raw_payloads: SmallVec<[_; SMALL_BATCH_THRESHOLD_COUNT]> = payloads
.iter()
@@ -204,7 +215,7 @@ impl BifrostInner {
loglet.append_batch(&raw_payloads).await
}

-pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
+pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
@@ -219,7 +230,7 @@ impl BifrostInner {
&self,
log_id: LogId,
after: Lsn,
-) -> Result<Option<LogRecord>, Error> {
+) -> Result<Option<LogRecord>> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
@@ -234,10 +245,11 @@ impl BifrostInner {
&self,
log_id: LogId,
_attributes: FindTailAttributes,
-) -> Result<Option<Lsn>, Error> {
+) -> Result<(LogletWrapper, Option<Lsn>)> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
-loglet.find_tail().await
+let tail = loglet.find_tail().await?;
+Ok((loglet, tail))
}

async fn get_trim_point(&self, log_id: LogId) -> Result<Option<Lsn>, Error> {
@@ -291,7 +303,7 @@ impl BifrostInner {
}

#[inline]
-fn fail_if_shutting_down(&self) -> Result<(), Error> {
+fn fail_if_shutting_down(&self) -> Result<()> {
if self.shutting_down.load(Ordering::Relaxed) {
Err(Error::Shutdown(restate_core::ShutdownError))
} else {
@@ -300,7 +312,7 @@ impl BifrostInner {
}

/// Immediately fetch new metadata from metadata store.
-pub async fn sync_metadata(&self) -> Result<(), Error> {
+pub async fn sync_metadata(&self) -> Result<()> {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs)
@@ -342,7 +354,7 @@ impl BifrostInner {
assert!(self.providers[kind].try_insert(provider).is_ok());
}

-async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper, Error> {
+async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
let tail_segment = self
.metadata
.logs()
@@ -351,7 +363,11 @@ impl BifrostInner {
self.get_loglet(&tail_segment).await
}

async fn find_loglet_for_lsn(&self, log_id: LogId, lsn: Lsn) -> Result<LogletWrapper, Error> {
pub(crate) async fn find_loglet_for_lsn(
&self,
log_id: LogId,
lsn: Lsn,
) -> Result<LogletWrapper> {
let segment = self
.metadata
.logs()
@@ -390,7 +406,7 @@ mod tests {

#[tokio::test]
#[traced_test]
-async fn test_append_smoke() -> Result<()> {
+async fn test_append_smoke() -> googletest::Result<()> {
let num_partitions = 5;
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.with_partition_table(FixedPartitionTable::new(Version::MIN, num_partitions))
@@ -464,7 +480,7 @@ mod tests {
}

#[tokio::test(start_paused = true)]
-async fn test_lazy_initialization() -> Result<()> {
+async fn test_lazy_initialization() -> googletest::Result<()> {
let node_env = TestCoreEnv::create_with_mock_nodes_config(1, 1).await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
@@ -491,7 +507,7 @@ mod tests {
}

#[test(tokio::test)]
-async fn trim_log_smoke_test() -> Result<()> {
+async fn trim_log_smoke_test() -> googletest::Result<()> {
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.set_provider_kind(ProviderKind::Local)
.build()
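A small pattern change in the tests: they now return `googletest::Result<()>` rather than the crate's own `Result`, so `?` works on both googletest assertions and fallible bifrost calls. An illustrative standalone example of the pattern (the test name and values are made up):

```rust
// Illustrative sketch of the googletest::Result pattern used above.
use googletest::prelude::*;

#[tokio::test]
async fn lsns_are_monotonic() -> googletest::Result<()> {
    let (first, second) = (1u64, 2u64); // stand-ins for appended LSNs
    // verify_that! returns a googletest Result, so `?` turns a failed
    // assertion into a test failure.
    verify_that!(second, gt(first))?;
    Ok(())
}
```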
6 changes: 4 additions & 2 deletions crates/bifrost/src/error.rs
@@ -10,14 +10,16 @@

use restate_core::{ShutdownError, SyncError};
use std::sync::Arc;
-use thiserror::Error;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::types::SealReason;

-#[derive(Error, Debug, Clone)]
+/// Result type for bifrost operations.
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("log '{0}' is sealed")]
LogSealed(LogId, SealReason),
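The alias uses a defaulted type parameter, in the style of `std::io::Result`: `Result<Lsn>` expands to `Result<Lsn, Error>`, while a call site can still name a different error type explicitly. A self-contained illustration (not code from this diff):

```rust
// Illustration of the defaulted-error alias pattern.
#[derive(Debug)]
pub struct Error;

pub type Result<T, E = Error> = std::result::Result<T, E>;

fn append_lsn() -> Result<u64> {
    // Expands to std::result::Result<u64, Error>.
    Ok(42)
}

fn parse_lsn(s: &str) -> Result<u64, std::num::ParseIntError> {
    // The default error type can be overridden per call site.
    s.parse()
}
```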
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
@@ -19,7 +19,7 @@ mod types;
mod watchdog;

pub use bifrost::Bifrost;
-pub use error::{Error, ProviderError};
+pub use error::{Error, ProviderError, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;