2 changes: 1 addition & 1 deletion benchmarks/benchmark/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
rand = { workspace = true }
clap = { version = "4.5.32", features = ["derive"] }
tidehunter = { version = "0.1.0", path = "../../tidehunter" }
tidehunter = { version = "0.1.0", path = "../../tidehunter", features = ["test-utils"] }
parking_lot = { workspace = true }
histogram = "0.11.3"
prometheus = { workspace = true }
25 changes: 11 additions & 14 deletions benchmarks/benchmark/src/configs.rs
@@ -177,9 +177,9 @@ pub struct StressClientParameters {
/// Relocation configuration. None means disabled, Some(config) enables continuous relocation
#[serde(skip_serializing_if = "Option::is_none")]
pub relocation: Option<RelocationConfig>,
/// Ratio of writes that overwrite existing keys (0.0 to 1.0, default 0.0)
#[serde(default = "defaults::default_overwrite_ratio")]
pub overwrite_ratio: f64,
/// Ratio of writes that are deletes (0.0 to 1.0, default 0.0)
#[serde(default = "defaults::default_delete_ratio")]
pub delete_ratio: f64,
}

impl Default for StressClientParameters {
@@ -205,7 +205,7 @@ impl Default for StressClientParameters {
read_percentage: defaults::default_read_percentage(),
zipf_exponent: defaults::default_zipf_exponent(),
relocation: None,
overwrite_ratio: defaults::default_overwrite_ratio(),
delete_ratio: defaults::default_delete_ratio(),
}
}
}
@@ -282,7 +282,7 @@ pub mod defaults {
0.0
}

pub fn default_overwrite_ratio() -> f64 {
pub fn default_delete_ratio() -> f64 {
0.0
}
}
@@ -388,11 +388,8 @@ pub struct StressArgs {
help = "Relocation strategy (wal or index). Enables continuous relocation"
)]
relocation: Option<String>,
#[arg(
long,
help = "Ratio of writes that overwrite existing keys (0.0 to 1.0)"
)]
overwrite_ratio: Option<f64>,
#[arg(long, help = "Ratio of writes that are deletes (0.0 to 1.0)")]
delete_ratio: Option<f64>,
}

/// Override default arguments with the ones provided by the user
@@ -477,12 +474,12 @@ pub fn override_default_args(args: StressArgs, mut config: StressTestConfigs) ->
}
}
}
if let Some(overwrite_ratio) = args.overwrite_ratio {
if !(0.0..=1.0).contains(&overwrite_ratio) {
eprintln!("Error: overwrite_ratio must be between 0.0 and 1.0");
if let Some(delete_ratio) = args.delete_ratio {
if !(0.0..=1.0).contains(&delete_ratio) {
eprintln!("Error: delete_ratio must be between 0.0 and 1.0");
std::process::exit(1);
}
config.stress_client_parameters.overwrite_ratio = overwrite_ratio;
config.stress_client_parameters.delete_ratio = delete_ratio;
}

config
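For orientation, here is a minimal sketch (not part of the diff) of setting the new knob programmatically; the field name, bounds, and `Default` impl come from this file, while building the struct with struct-update syntax is only an assumption about typical usage:

// Hedged sketch: 20% of write slots become deletes, everything else keeps its default.
fn example_params() -> StressClientParameters {
    let params = StressClientParameters {
        delete_ratio: 0.2,
        ..Default::default()
    };
    // Mirrors the bounds check override_default_args performs on the CLI value.
    assert!((0.0..=1.0).contains(&params.delete_ratio));
    params
}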
173 changes: 145 additions & 28 deletions benchmarks/benchmark/src/main.rs
@@ -101,6 +101,9 @@ pub fn main() {
format!("0.0.0.0:{METRICS_PORT}").parse().unwrap(),
&registry,
);
let mut tidehunter_db: Option<Arc<tidehunter::db::Db>> = None;
let mut relocation_handle: Option<JoinHandle<()>> = None;
let relocation_shutdown = Arc::new(AtomicBool::new(false));
let storage: Arc<dyn Storage> = match config.stress_client_parameters.backend {
Backend::Tidehunter => {
if config.db_parameters.direct_io {
@@ -142,8 +145,14 @@ pub fn main() {
);
let db_clone = storage.db.clone();
let relocation_config = relocation_config.clone();
thread::spawn(move || {
let shutdown_clone = relocation_shutdown.clone();
relocation_handle = Some(thread::spawn(move || {
loop {
// Check shutdown flag before starting new relocation
if shutdown_clone.load(Ordering::Relaxed) {
break;
}

// Convert RelocationConfig to RelocationStrategy for this iteration
let strategy = match &relocation_config {
RelocationConfig::Wal => RelocationStrategy::WalBased,
@@ -161,12 +170,18 @@ pub fn main() {
// Start relocation and let it run to completion
db_clone.start_blocking_relocation_with_strategy(strategy);

// Take a 30 second break between relocations
thread::sleep(Duration::from_secs(30));
// Take a 30 second break between relocations (check shutdown every second)
for _ in 0..30 {
if shutdown_clone.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_secs(1));
}
}
});
}));
}

tidehunter_db = Some(storage.db.clone());
Arc::new(storage)
}
Backend::Rocksdb => {
@@ -253,6 +268,29 @@ pub fn main() {
ops_sec,
byte_div(total_bytes / msecs * 1000),
);
// Cooldown phase - wait for background work to finish
if let Some(handle) = relocation_handle {
// Relocation enabled: wait for relocation thread to finish
report!(report, "Waiting for current relocation to complete...");
relocation_shutdown.store(true, Ordering::Relaxed);
handle.join().expect("Relocation thread panicked");
report!(report, "Relocation thread finished");
} else if let Some(ref db) = tidehunter_db {
// Relocation disabled: wait for all pending flushes to complete
report!(report, "Waiting for pending flushes to complete...");
db.flush_barrier();
report!(report, "All flushes complete");
}

// Measure final storage (for tidehunter backend)
if tidehunter_db.is_some() {
let storage_len = fs_extra::dir::get_size(&path).unwrap();
report!(
report,
"Storage used after cooldown: {:.1} Gb",
storage_len as f64 / 1024. / 1024. / 1024.
);
}
if print_report {
report!(report, "Writing report file");
fs::write("report.txt", &report.lines).unwrap();
Expand All @@ -279,6 +317,11 @@ pub fn main() {
)
.unwrap();
}
// Dump relocation metrics at the end of benchmark
if let Some(db) = &tidehunter_db {
dump_relocation_metrics(db, &mut report);
}

report!(report, "BENCHMARK_END");

if stress.parameters.preserve {
@@ -590,35 +633,48 @@ impl StressThread {
self.latency_errors.fetch_add(1, Ordering::Relaxed);
}
} else {
// Perform a write operation
let should_overwrite = thread_rng.r#gen::<f64>() < self.parameters.overwrite_ratio
// Perform a write operation (insert or delete)
let should_delete = thread_rng.r#gen::<f64>() < self.parameters.delete_ratio
&& local_write_pos_counter > 0;

let pos = if should_overwrite {
// Select existing key to overwrite using same logic as reads
if should_delete {
// Select existing key to delete using same logic as reads
let highest_local_pos = local_write_pos_counter.saturating_sub(1);
self.select_existing_key(&mut thread_rng, highest_local_pos)
let pos = self.select_existing_key(&mut thread_rng, highest_local_pos);
let key = self.key(pos);
let timer = Instant::now();
self.db.delete(key.into());
// Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
let latency = timer
.elapsed()
.as_micros()
.min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
self.benchmark_metrics
.bench_deletes
.with_label_values(&[self.db.name()])
.observe(latency as f64);
if self.latency.increment(latency as u64).is_err() {
self.latency_errors.fetch_add(1, Ordering::Relaxed);
}
} else {
// Create new key (current behavior)
// Create new key and insert
let pos = self.global_pos(local_write_pos_counter);
local_write_pos_counter += 1;
pos
};

let (key, value) = self.key_value(pos);
let timer = Instant::now();
self.db.insert(key.into(), value.into());
// Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
let latency = timer
.elapsed()
.as_micros()
.min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
self.benchmark_metrics
.bench_writes
.with_label_values(&[self.db.name()])
.observe(latency as f64);
if self.latency.increment(latency as u64).is_err() {
self.latency_errors.fetch_add(1, Ordering::Relaxed);
let (key, value) = self.key_value(pos);
let timer = Instant::now();
self.db.insert(key.into(), value.into());
// Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
let latency = timer
.elapsed()
.as_micros()
.min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
self.benchmark_metrics
.bench_writes
.with_label_values(&[self.db.name()])
.observe(latency as f64);
if self.latency.increment(latency as u64).is_err() {
self.latency_errors.fetch_add(1, Ordering::Relaxed);
}
}
}

@@ -627,7 +683,6 @@ impl StressThread {
}
}

#[allow(dead_code)]
fn key(&self, pos: u64) -> Vec<u8> {
let (key, _) = self.key_and_rng(pos);
key
@@ -674,3 +729,65 @@ impl StressThread {
StdRng::from_seed(seed)
}
}

fn dump_relocation_metrics(db: &tidehunter::db::Db, report: &mut Report) {
let metrics = db.test_get_metrics();

report!(report, "=== Relocation Metrics ===");
report!(
report,
"relocation_target_position: {}",
metrics.relocation_target_position.get()
);
report!(
report,
"relocation_terminal_position: {}",
metrics.relocation_terminal_position.get()
);
report!(
report,
"gc_position[wal]: {}",
metrics.gc_position.with_label_values(&["wal"]).get()
);
report!(
report,
"gc_position[index]: {}",
metrics.gc_position.with_label_values(&["index"]).get()
);
report!(
report,
"wal_deleted_bytes: {}",
metrics.wal_deleted_bytes.get()
);
report!(
report,
"stale_index_bytes: {}",
metrics.stale_index_bytes.get()
);
report!(
report,
"relocation_kept[root]: {}",
metrics.relocation_kept.with_label_values(&["root"]).get()
);
report!(
report,
"relocation_removed[root]: {}",
metrics
.relocation_removed
.with_label_values(&["root"])
.get()
);
report!(
report,
"relocation_cells_processed[root]: {}",
metrics
.relocation_cells_processed
.with_label_values(&["root"])
.get()
);
report!(
report,
"relocation_current_keyspace: {}",
metrics.relocation_current_keyspace.get()
);
}
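The relocation worker spawned earlier in main() follows a standard cooperative-shutdown pattern: an `AtomicBool` checked before each pass and once per second during the inter-pass pause. A self-contained sketch of that pattern, assuming nothing beyond the standard library (the 30-second pause and `Relaxed` ordering mirror the diff; the `spawn_worker`/`run_one_pass` names are illustrative):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Runs `run_one_pass` repeatedly, pausing 30 seconds between passes and
// re-checking the shutdown flag every second so termination is prompt.
fn spawn_worker(
    shutdown: Arc<AtomicBool>,
    mut run_one_pass: impl FnMut() + Send + 'static,
) -> JoinHandle<()> {
    thread::spawn(move || loop {
        if shutdown.load(Ordering::Relaxed) {
            break; // shutdown requested before starting a new pass
        }
        run_one_pass();
        for _ in 0..30 {
            if shutdown.load(Ordering::Relaxed) {
                break; // leave the pause early; the outer check then ends the loop
            }
            thread::sleep(Duration::from_secs(1));
        }
    })
}

The cooldown phase above then reduces to `shutdown.store(true, Ordering::Relaxed)` followed by `handle.join()`, which is exactly what the `relocation_handle` branch does.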
9 changes: 9 additions & 0 deletions benchmarks/benchmark/src/metrics.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
pub struct BenchmarkMetrics {
pub bench_reads: HistogramVec,
pub bench_writes: HistogramVec,
pub bench_deletes: HistogramVec,
pub get_lt_result: IntCounterVec,

// Parameter gauge metrics
@@ -44,6 +45,14 @@ impl BenchmarkMetrics {
registry
)
.unwrap(),
bench_deletes: prometheus::register_histogram_vec_with_registry!(
"bench_deletes",
"bench_deletes",
&["db"],
latency_buckets.clone(),
registry
)
.unwrap(),
get_lt_result: prometheus::register_int_counter_vec_with_registry!(
"get_lt_result",
"get_lt_result",
2 changes: 2 additions & 0 deletions benchmarks/benchmark/src/storage/mod.rs
@@ -12,5 +12,7 @@ pub trait Storage: Sync + Send + 'static {

fn exists(&self, k: &[u8]) -> bool;

fn delete(&self, k: Bytes);

fn name(&self) -> &'static str;
}
4 changes: 4 additions & 0 deletions benchmarks/benchmark/src/storage/rocks.rs
@@ -155,6 +155,10 @@ impl Storage for Arc<RocksStorage> {
self.db.get(k).unwrap().is_some()
}

fn delete(&self, k: Bytes) {
self.db.delete(&k).unwrap()
}

fn name(&self) -> &'static str {
match self.mode {
RocksMode::Plain => "rocksdb",
4 changes: 4 additions & 0 deletions benchmarks/benchmark/src/storage/tidehunter.rs
@@ -41,6 +41,10 @@ impl Storage for Arc<TidehunterStorage> {
self.db.exists(self.ks, k).unwrap()
}

fn delete(&self, k: Bytes) {
self.db.remove(self.ks, k).unwrap()
}

fn name(&self) -> &'static str {
"tidehunter"
}
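Taken together, the trait addition and the two backend implementations give the stress threads one uniform delete path. A hedged usage sketch through the dynamic handle (here `storage` stands for the `Arc<dyn Storage>` selected in `main()`; the `insert`/`exists` signatures are inferred from how the benchmark already calls them, and the key/value contents are illustrative):

let key: Vec<u8> = b"example-key".to_vec();
storage.insert(key.clone().into(), vec![0u8; 128].into());
assert!(storage.exists(&key));
// New trait method: RocksDB maps it to db.delete, tidehunter to db.remove.
storage.delete(key.clone().into());
assert!(!storage.exists(&key));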