Skip to content

Commit

Permalink
Default to low-latency local loglet configuration
Browse files Browse the repository at this point in the history
This PR changes the default configuration of the local loglet to disable time-based batching. Most users's first impressions and general use will expect interactive low-latency experience and we want this to be the default case.
A major performance win is the change in local loglet implementation. Tokio has 1ms timer resolution, so even if batching duration is set to zero, the runtime will still sleep an arbitrary amound between 0 and 1ms to resume the stream. The change switches to chunking ready items by default if time-based batching is zero. This results in 6X lower append path latency in my tests (even with fdatasync running on fast nvme)
  • Loading branch information
AhmedSoliman committed May 22, 2024
1 parent d271c67 commit 347768f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
26 changes: 21 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use bytes::{Bytes, BytesMut};
use futures::StreamExt as FutureStreamExt;
use metrics::histogram;
use restate_rocksdb::{IoMode, Priority, RocksDb};
use rocksdb::{BoundColumnFamily, WriteBatch};
use smallvec::SmallVec;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
Expand Down Expand Up @@ -73,6 +75,8 @@ impl LogStoreWriter {
) -> Result<RocksDbLogWriterHandle, ShutdownError> {
// big enough to allows a second full batch to queue up while the existing one is being processed
let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
// leave twice as much space in the the channel to ensure we can enqueue up-to a full batch in
// the backlog while we process this one.
let (sender, receiver) = mpsc::channel(batch_size * 2);

task_center().spawn_child(
Expand All @@ -82,9 +86,19 @@ impl LogStoreWriter {
async move {
let opts = updateable.load();
let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
let batch_duration = opts.writer_batch_commit_duration.into();
let receiver =
ReceiverStream::new(receiver).chunks_timeout(batch_size, batch_duration);
let batch_duration: Duration = opts.writer_batch_commit_duration.into();
// We don't want to use chunks_timeout if time-based batching is disabled, why?
// because even if duration is zero, tokio's timer resolution is 1ms which means
// that we will delay every batch by 1ms for no reason.
let receiver = if batch_duration == Duration::ZERO {
ReceiverStream::new(receiver)
.ready_chunks(batch_size)
.boxed()
} else {
ReceiverStream::new(receiver)
.chunks_timeout(batch_size, batch_duration)
.boxed()
};
tokio::pin!(receiver);

loop {
Expand All @@ -93,7 +107,7 @@ impl LogStoreWriter {
_ = cancellation_watcher() => {
break;
}
Some(cmds) = receiver.next() => {
Some(cmds) = TokioStreamExt::next(&mut receiver) => {
let opts = updateable.load();
self.handle_commands(opts, cmds).await;
}
Expand Down Expand Up @@ -143,6 +157,8 @@ impl LogStoreWriter {
}
}

// todo: future optimization. pre-merge all updates within a batch before writing
// the merge to rocksdb.
if let Some(logstate_updates) = command.log_state_updates {
Self::update_log_state(
&metadata_cf,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Default for LocalLogletOptions {
batch_wal_flushes: true,
sync_wal_before_ack: true,
writer_batch_commit_count: 500,
writer_batch_commit_duration: Duration::from_nanos(5).into(),
writer_batch_commit_duration: Duration::ZERO.into(),
#[cfg(any(test, feature = "test-util"))]
data_dir: super::default_arc_tmp(),
}
Expand Down

0 comments on commit 347768f

Please sign in to comment.