Skip to content

Commit

Permalink
[TaskCenter] Migrate tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Nov 22, 2024
1 parent fad6f95 commit 25798f9
Show file tree
Hide file tree
Showing 4 changed files with 526 additions and 578 deletions.
11 changes: 0 additions & 11 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,3 @@ pub use service::BifrostService;
pub use types::*;

pub const SMALL_BATCH_THRESHOLD_COUNT: usize = 4;

#[cfg(any(test, feature = "test-util"))]
pub(crate) fn setup_panic_handler() {
// Make sure that panics exits the process.
let orig_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
std::process::exit(1);
}));
}
91 changes: 39 additions & 52 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::StreamExt;
use tracing::info;

use restate_core::{task_center, TaskHandle, TaskKind};
use restate_core::{TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind};
use restate_test_util::let_assert;
use restate_types::logs::metadata::{LogletConfig, SegmentIndex};
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState};

use super::Loglet;
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::setup_panic_handler;

async fn wait_for_trim(loglet: &LogletWrapper, required_trim_point: Lsn) -> anyhow::Result<()> {
for _ in 0..3 {
Expand All @@ -54,7 +53,6 @@ async fn wait_for_trim(loglet: &LogletWrapper, required_trim_point: Lsn) -> anyh
/// is started, initialized, and ready for reads and writes. It also assumes that this loglet
/// provide contiguous offsets that start from Lsn::OLDEST.
pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand Down Expand Up @@ -125,7 +123,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
assert!(loglet.read_opt(Lsn::new(end)).await?.is_none());

let handle1: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
TaskCenter::current().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 4
Expand All @@ -142,7 +140,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R

// Waiting for 10
let handle2: TaskHandle<googletest::Result<()>> =
task_center().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
TaskCenter::current().spawn_unmanaged(TaskKind::TestRunner, "read", None, {
let loglet = loglet.clone();
async move {
// read future record 10
Expand Down Expand Up @@ -226,7 +224,6 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
/// is started, initialized, and ready for reads and writes. It also assumes that this loglet
/// starts from Lsn::OLDEST.
pub async fn single_loglet_readstream(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand Down Expand Up @@ -305,7 +302,6 @@ pub async fn single_loglet_readstream(loglet: Arc<dyn Loglet>) -> googletest::Re
pub async fn single_loglet_readstream_with_trims(
loglet: Arc<dyn Loglet>,
) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand Down Expand Up @@ -427,7 +423,6 @@ pub async fn single_loglet_readstream_with_trims(

/// Validates that appends fail after find_tail() returned Sealed()
pub async fn append_after_seal(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand Down Expand Up @@ -470,7 +465,6 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
const WARMUP_APPENDS: usize = 200;
const CONCURRENT_APPENDERS: usize = 20;

setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand All @@ -490,65 +484,59 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1));

let mut appenders: JoinSet<googletest::Result<_>> = JoinSet::new();
let tc = task_center();
for appender_id in 0..CONCURRENT_APPENDERS {
appenders.spawn({
let loglet = loglet.clone();
let append_barrier = append_barrier.clone();
let tc = tc.clone();
async move {
tc.run_in_scope("append", None, async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(format!("appender-{}-record{}", appender_id, i).into())
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
match res {
Ok(offset) => {
committed.push(offset);
}
Err(AppendError::Sealed) => {
println!("append failed({}) => SEALED", i);
break;
}
match res {
Ok(offset) => {
committed.push(offset);
}
Err(AppendError::Sealed) => {
println!("append failed({}) => SEALED", i);
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
Err(e) => fail!("unexpected error: {}", e)?,
Err(AppendError::Shutdown(_)) => {
break;
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
Err(e) => fail!("unexpected error: {}", e)?,
}
Ok(committed)
})
.await
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
Ok(committed)
}
.in_current_tc_as_task(TaskKind::TestRunner, "test-appender")
});
}

let first_observed_seal = tokio::task::spawn({
let loglet = loglet.clone();
async move {
tc.run_in_scope("find-tail", None, async move {
loop {
let res = loglet.find_tail().await.expect("find_tail succeeds");
if res.is_sealed() {
return res.offset();
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
loop {
let res = loglet.find_tail().await.expect("find_tail succeeds");
if res.is_sealed() {
return res.offset();
}
})
.await
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
}
.in_current_tc_as_task(TaskKind::TestRunner, "find-tail")
});

// Wait for some warmup appends
Expand Down Expand Up @@ -621,7 +609,6 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest

/// Validates that an empty loglet can be sealed
pub async fn seal_empty(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup_panic_handler();
let loglet = LogletWrapper::new(
SegmentIndex::from(1),
Lsn::OLDEST,
Expand Down
Loading

0 comments on commit 25798f9

Please sign in to comment.