diff --git a/benchmarks/benchmark/Cargo.toml b/benchmarks/benchmark/Cargo.toml
index cd313965..144ce360 100644
--- a/benchmarks/benchmark/Cargo.toml
+++ b/benchmarks/benchmark/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2024"
 [dependencies]
 rand = { workspace = true }
 clap = { version = "4.5.32", features = ["derive"] }
-tidehunter = { version = "0.1.0", path = "../../tidehunter" }
+tidehunter = { version = "0.1.0", path = "../../tidehunter", features = ["test-utils"] }
 parking_lot = { workspace = true }
 histogram = "0.11.3"
 prometheus = { workspace = true }
diff --git a/benchmarks/benchmark/src/configs.rs b/benchmarks/benchmark/src/configs.rs
index 00885e56..670c3613 100644
--- a/benchmarks/benchmark/src/configs.rs
+++ b/benchmarks/benchmark/src/configs.rs
@@ -177,9 +177,9 @@ pub struct StressClientParameters {
     /// Relocation configuration. None means disabled, Some(config) enables continuous relocation
     #[serde(skip_serializing_if = "Option::is_none")]
     pub relocation: Option<RelocationConfig>,
-    /// Ratio of writes that overwrite existing keys (0.0 to 1.0, default 0.0)
-    #[serde(default = "defaults::default_overwrite_ratio")]
-    pub overwrite_ratio: f64,
+    /// Ratio of writes that are deletes (0.0 to 1.0, default 0.0)
+    #[serde(default = "defaults::default_delete_ratio")]
+    pub delete_ratio: f64,
 }
 
 impl Default for StressClientParameters {
@@ -205,7 +205,7 @@
             read_percentage: defaults::default_read_percentage(),
             zipf_exponent: defaults::default_zipf_exponent(),
             relocation: None,
-            overwrite_ratio: defaults::default_overwrite_ratio(),
+            delete_ratio: defaults::default_delete_ratio(),
         }
     }
 }
@@ -282,7 +282,7 @@ pub mod defaults {
         0.0
     }
 
-    pub fn default_overwrite_ratio() -> f64 {
+    pub fn default_delete_ratio() -> f64 {
         0.0
     }
 }
@@ -388,11 +388,8 @@ pub struct StressArgs {
         help = "Relocation strategy (wal or index). Enables continuous relocation"
     )]
     relocation: Option,
-    #[arg(
-        long,
-        help = "Ratio of writes that overwrite existing keys (0.0 to 1.0)"
-    )]
-    overwrite_ratio: Option<f64>,
+    #[arg(long, help = "Ratio of writes that are deletes (0.0 to 1.0)")]
+    delete_ratio: Option<f64>,
 }
 
 /// Override default arguments with the ones provided by the user
@@ -477,12 +474,12 @@ pub fn override_default_args(args: StressArgs, mut config: StressTestConfigs) ->
             }
         }
     }
-    if let Some(overwrite_ratio) = args.overwrite_ratio {
-        if !(0.0..=1.0).contains(&overwrite_ratio) {
-            eprintln!("Error: overwrite_ratio must be between 0.0 and 1.0");
-            std::process::exit(1);
-        }
-        config.stress_client_parameters.overwrite_ratio = overwrite_ratio;
+    if let Some(delete_ratio) = args.delete_ratio {
+        if !(0.0..=1.0).contains(&delete_ratio) {
+            eprintln!("Error: delete_ratio must be between 0.0 and 1.0");
+            std::process::exit(1);
+        }
+        config.stress_client_parameters.delete_ratio = delete_ratio;
     }
 
     config
diff --git a/benchmarks/benchmark/src/main.rs b/benchmarks/benchmark/src/main.rs
index 86fc1e17..d10800a1 100644
--- a/benchmarks/benchmark/src/main.rs
+++ b/benchmarks/benchmark/src/main.rs
@@ -101,6 +101,9 @@ pub fn main() {
         format!("0.0.0.0:{METRICS_PORT}").parse().unwrap(),
         &registry,
     );
+    let mut tidehunter_db: Option<Arc<tidehunter::db::Db>> = None;
+    let mut relocation_handle: Option<thread::JoinHandle<()>> = None;
+    let relocation_shutdown = Arc::new(AtomicBool::new(false));
     let storage: Arc<dyn Storage> = match config.stress_client_parameters.backend {
         Backend::Tidehunter => {
             if config.db_parameters.direct_io {
@@ -142,8 +145,14 @@
                 );
                 let db_clone = storage.db.clone();
                 let relocation_config = relocation_config.clone();
-                thread::spawn(move || {
+                let shutdown_clone = relocation_shutdown.clone();
+                relocation_handle = Some(thread::spawn(move || {
                     loop {
+                        // Check shutdown flag before starting new relocation
+                        if shutdown_clone.load(Ordering::Relaxed) {
+                            break;
+                        }
+
                         // Convert RelocationConfig to RelocationStrategy for this iteration
                         let strategy = match &relocation_config {
                             RelocationConfig::Wal => RelocationStrategy::WalBased,
@@ -161,12 +170,18 @@
                         // Start relocation and let it run to completion
                         db_clone.start_blocking_relocation_with_strategy(strategy);
 
-                        // Take a 30 second break between relocations
-                        thread::sleep(Duration::from_secs(30));
+                        // Take a 30 second break between relocations (check shutdown every second)
+                        for _ in 0..30 {
+                            if shutdown_clone.load(Ordering::Relaxed) {
+                                break;
+                            }
+                            thread::sleep(Duration::from_secs(1));
+                        }
                     }
-                });
+                }));
             }
+            tidehunter_db = Some(storage.db.clone());
 
             Arc::new(storage)
         }
         Backend::Rocksdb => {
@@ -253,6 +268,29 @@
         ops_sec,
         byte_div(total_bytes / msecs * 1000),
     );
+    // Cooldown phase - wait for background work to finish
+    if let Some(handle) = relocation_handle {
+        // Relocation enabled: wait for relocation thread to finish
+        report!(report, "Waiting for current relocation to complete...");
+        relocation_shutdown.store(true, Ordering::Relaxed);
+        handle.join().expect("Relocation thread panicked");
+        report!(report, "Relocation thread finished");
+    } else if let Some(ref db) = tidehunter_db {
+        // Relocation disabled: wait for all pending flushes to complete
+        report!(report, "Waiting for pending flushes to complete...");
+        db.flush_barrier();
+        report!(report, "All flushes complete");
+    }
+
+    // Measure final storage (for tidehunter backend)
+    if tidehunter_db.is_some() {
+        let storage_len = fs_extra::dir::get_size(&path).unwrap();
+        report!(
+            report,
+            "Storage used after cooldown: {:.1} Gb",
+            storage_len as f64 / 1024. / 1024. / 1024.
+        );
+    }
     if print_report {
         report!(report, "Writing report file");
         fs::write("report.txt", &report.lines).unwrap();
@@ -279,6 +317,11 @@
         )
         .unwrap();
     }
+    // Dump relocation metrics at the end of benchmark
+    if let Some(db) = &tidehunter_db {
+        dump_relocation_metrics(db, &mut report);
+    }
+
     report!(report, "BENCHMARK_END");
 
     if stress.parameters.preserve {
@@ -590,35 +633,48 @@ impl StressThread {
                     self.latency_errors.fetch_add(1, Ordering::Relaxed);
                 }
             } else {
-                // Perform a write operation
-                let should_overwrite = thread_rng.r#gen::<f64>() < self.parameters.overwrite_ratio
+                // Perform a write operation (insert or delete)
+                let should_delete = thread_rng.r#gen::<f64>() < self.parameters.delete_ratio
                     && local_write_pos_counter > 0;
-                let pos = if should_overwrite {
-                    // Select existing key to overwrite using same logic as reads
+                if should_delete {
+                    // Select existing key to delete using same logic as reads
                     let highest_local_pos = local_write_pos_counter.saturating_sub(1);
-                    self.select_existing_key(&mut thread_rng, highest_local_pos)
+                    let pos = self.select_existing_key(&mut thread_rng, highest_local_pos);
+                    let key = self.key(pos);
+                    let timer = Instant::now();
+                    self.db.delete(key.into());
+                    // Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
+                    let latency = timer
+                        .elapsed()
+                        .as_micros()
+                        .min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
+                    self.benchmark_metrics
+                        .bench_deletes
+                        .with_label_values(&[self.db.name()])
+                        .observe(latency as f64);
+                    if self.latency.increment(latency as u64).is_err() {
+                        self.latency_errors.fetch_add(1, Ordering::Relaxed);
+                    }
                 } else {
-                    // Create new key (current behavior)
+                    // Create new key and insert
                     let pos = self.global_pos(local_write_pos_counter);
                     local_write_pos_counter += 1;
-                    pos
-                };
-
-                let (key, value) = self.key_value(pos);
-                let timer = Instant::now();
-                self.db.insert(key.into(), value.into());
-                // Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
-                let latency = timer
-                    .elapsed()
-                    .as_micros()
-                    .min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
-                self.benchmark_metrics
-                    .bench_writes
-                    .with_label_values(&[self.db.name()])
-                    .observe(latency as f64);
-                if self.latency.increment(latency as u64).is_err() {
-                    self.latency_errors.fetch_add(1, Ordering::Relaxed);
+                    let (key, value) = self.key_value(pos);
+                    let timer = Instant::now();
+                    self.db.insert(key.into(), value.into());
+                    // Clamp to the histogram's max recordable value (2^LATENCY_HISTOGRAM_MAX_VALUE_POWER)
+                    let latency = timer
+                        .elapsed()
+                        .as_micros()
+                        .min((1u128 << LATENCY_HISTOGRAM_MAX_VALUE_POWER) - 1);
+                    self.benchmark_metrics
+                        .bench_writes
+                        .with_label_values(&[self.db.name()])
+                        .observe(latency as f64);
+                    if self.latency.increment(latency as u64).is_err() {
+                        self.latency_errors.fetch_add(1, Ordering::Relaxed);
+                    }
                 }
             }
 
@@ -627,7 +683,6 @@ impl StressThread {
         }
     }
 
-    #[allow(dead_code)]
     fn key(&self, pos: u64) -> Vec<u8> {
         let (key, _) = self.key_and_rng(pos);
         key
@@ -674,3 +729,65 @@ impl StressThread {
         StdRng::from_seed(seed)
     }
 }
+
+fn dump_relocation_metrics(db: &tidehunter::db::Db, report: &mut Report) {
+    let metrics = db.test_get_metrics();
+
+    report!(report, "=== Relocation Metrics ===");
+    report!(
+        report,
+        "relocation_target_position: {}",
+        metrics.relocation_target_position.get()
+    );
+    report!(
+        report,
+        "relocation_terminal_position: {}",
+        metrics.relocation_terminal_position.get()
+    );
+    report!(
+        report,
+        "gc_position[wal]: {}",
+        metrics.gc_position.with_label_values(&["wal"]).get()
+    );
+    report!(
+        report,
+        "gc_position[index]: {}",
+        metrics.gc_position.with_label_values(&["index"]).get()
+    );
+    report!(
+        report,
+        "wal_deleted_bytes: {}",
+        metrics.wal_deleted_bytes.get()
+    );
+    report!(
+        report,
+        "stale_index_bytes: {}",
+        metrics.stale_index_bytes.get()
+    );
+    report!(
+        report,
+        "relocation_kept[root]: {}",
+        metrics.relocation_kept.with_label_values(&["root"]).get()
+    );
+    report!(
+        report,
+        "relocation_removed[root]: {}",
+        metrics
+            .relocation_removed
+            .with_label_values(&["root"])
+            .get()
+    );
+    report!(
+        report,
+        "relocation_cells_processed[root]: {}",
+        metrics
+            .relocation_cells_processed
+            .with_label_values(&["root"])
+            .get()
+    );
+    report!(
+        report,
+        "relocation_current_keyspace: {}",
+        metrics.relocation_current_keyspace.get()
+    );
+}
diff --git a/benchmarks/benchmark/src/metrics.rs b/benchmarks/benchmark/src/metrics.rs
index 8064a351..212c51ce 100644
--- a/benchmarks/benchmark/src/metrics.rs
+++ b/benchmarks/benchmark/src/metrics.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 pub struct BenchmarkMetrics {
     pub bench_reads: HistogramVec,
     pub bench_writes: HistogramVec,
+    pub bench_deletes: HistogramVec,
     pub get_lt_result: IntCounterVec,
 
     // Parameter gauge metrics
@@ -44,6 +45,14 @@ impl BenchmarkMetrics {
                 registry
             )
             .unwrap(),
+            bench_deletes: prometheus::register_histogram_vec_with_registry!(
+                "bench_deletes",
+                "bench_deletes",
+                &["db"],
+                latency_buckets.clone(),
+                registry
+            )
+            .unwrap(),
             get_lt_result: prometheus::register_int_counter_vec_with_registry!(
                 "get_lt_result",
                 "get_lt_result",
diff --git a/benchmarks/benchmark/src/storage/mod.rs b/benchmarks/benchmark/src/storage/mod.rs
index a70acd1b..30eda75a 100644
--- a/benchmarks/benchmark/src/storage/mod.rs
+++ b/benchmarks/benchmark/src/storage/mod.rs
@@ -12,5 +12,7 @@ pub trait Storage: Sync + Send + 'static {
 
     fn exists(&self, k: &[u8]) -> bool;
 
+    fn delete(&self, k: Bytes);
+
     fn name(&self) -> &'static str;
 }
diff --git a/benchmarks/benchmark/src/storage/rocks.rs b/benchmarks/benchmark/src/storage/rocks.rs
index 4b8493e4..440ee60e 100644
--- a/benchmarks/benchmark/src/storage/rocks.rs
+++ b/benchmarks/benchmark/src/storage/rocks.rs
@@ -155,6 +155,10 @@ impl Storage for Arc {
         self.db.get(k).unwrap().is_some()
     }
 
+    fn delete(&self, k: Bytes) {
+        self.db.delete(&k).unwrap()
+    }
+
     fn name(&self) -> &'static str {
         match self.mode {
             RocksMode::Plain => "rocksdb",
diff --git a/benchmarks/benchmark/src/storage/tidehunter.rs b/benchmarks/benchmark/src/storage/tidehunter.rs
index 5c47a973..0c4d352e 100644
--- a/benchmarks/benchmark/src/storage/tidehunter.rs
+++ b/benchmarks/benchmark/src/storage/tidehunter.rs
@@ -41,6 +41,10 @@ impl Storage for Arc {
         self.db.exists(self.ks, k).unwrap()
     }
 
+    fn delete(&self, k: Bytes) {
+        self.db.remove(self.ks, k).unwrap()
+    }
+
     fn name(&self) -> &'static str {
         "tidehunter"
     }
diff --git a/scripts/generate_target_configs/src/main.rs b/scripts/generate_target_configs/src/main.rs
index a28a89db..48d7a023 100644
--- a/scripts/generate_target_configs/src/main.rs
+++ b/scripts/generate_target_configs/src/main.rs
@@ -1,17 +1,20 @@
 use anyhow::Result;
-use benchmark::configs::{Backend, ReadMode, StressTestConfigs};
+use benchmark::configs::{ReadMode, RelocationConfig, StressTestConfigs};
 use std::fs;
 use std::path::PathBuf;
 
+const ONE_TB: usize = 1024 * 1024 * 1024 * 1024; // 1 TB
+
 fn main() -> Result<()> {
     // Base config from Tidehunter defaults + benchmark defaults
     let mut base_item = StressTestConfigs::default();
 
+    // default stress parameters
     base_item.stress_client_parameters.mixed_threads = 36;
     base_item.stress_client_parameters.write_threads = 36;
-    base_item.stress_client_parameters.write_size = 512;
+    base_item.stress_client_parameters.write_size = 64;
     base_item.stress_client_parameters.key_len = 32;
-    base_item.stress_client_parameters.writes = 83_000_000;
+    base_item.stress_client_parameters.writes = 50_000_000;
     base_item.stress_client_parameters.mixed_duration_secs = 600;
     base_item.stress_client_parameters.background_writes = 0;
     base_item.stress_client_parameters.no_snapshot = false;
@@ -22,20 +25,39 @@ fn main() -> Result<()> {
     base_item.stress_client_parameters.zipf_exponent = 0.0;
     base_item.stress_client_parameters.path = Some("/opt/sui/db/".to_string());
 
-    // Place all parameters we want to set/vary below, either as single values or in nested for loops
-    base_item.stress_client_parameters.read_percentage = 100;
+    // default db parameters
+    base_item.db_parameters.num_flusher_threads = 12;
+    base_item.db_parameters.max_dirty_keys = 1024;
+    base_item.db_parameters.max_maps = 128;
+    base_item.db_parameters.metrics_enabled = false;
     base_item.db_parameters.direct_io = false;
+    base_item.db_parameters.relocation_max_reclaim_pct = 20; // increased from 5% for more aggressive GC
+
+    // Place all parameters we want to set/vary below, either as single values or in nested for loops
+    base_item.stress_client_parameters.relocation =
+        Some(RelocationConfig::Index { ratio: Some(0.2) });
+    base_item.stress_client_parameters.read_mode = ReadMode::Get;
+    base_item.stress_client_parameters.write_size = 1024;
+    base_item.db_parameters.metrics_enabled = true;
+
     let mut items: Vec<StressTestConfigs> = Vec::new();
-    for backend in [Backend::Tidehunter, Backend::Rocksdb] {
-        for read_mode in [ReadMode::Get, ReadMode::Exists, ReadMode::Lt(1)] {
-            for zipf_exponent in [0.0, 2.0] {
-                let mut item = base_item.clone();
-                item.stress_client_parameters.backend = backend.clone();
-                item.stress_client_parameters.read_mode = read_mode.clone();
-                item.stress_client_parameters.zipf_exponent = zipf_exponent;
-                let yaml = serde_yaml::to_string(&item)?;
-                println!("{yaml}");
-                items.push(item);
+    for relocation in [None, Some(RelocationConfig::Index { ratio: Some(1.0) })] {
+        for delete_ratio in [1.0] {
+            for read_percentage in [0] {
+                for zipf_exponent in [0.0, 2.0] {
+                    let mut item = base_item.clone();
+                    item.stress_client_parameters.relocation = relocation.clone();
+                    item.stress_client_parameters.delete_ratio = delete_ratio;
+                    item.stress_client_parameters.read_percentage = read_percentage;
+                    item.stress_client_parameters.zipf_exponent = zipf_exponent;
+                    item.stress_client_parameters.writes = ONE_TB
+                        / (item.stress_client_parameters.write_threads
+                            * (item.stress_client_parameters.key_len
+                                + item.stress_client_parameters.write_size));
+                    let yaml = serde_yaml::to_string(&item)?;
+                    println!("{yaml}");
+                    items.push(item);
+                }
             }
         }
     }
diff --git a/tidehunter/src/db.rs b/tidehunter/src/db.rs
index ce6ed2e7..281d5f06 100644
--- a/tidehunter/src/db.rs
+++ b/tidehunter/src/db.rs
@@ -872,6 +872,25 @@ impl Db {
         receiver.recv().unwrap();
     }
 
+    /// Wait for all pending flush operations to complete.
+    /// This is useful for ensuring all data is persisted before measuring storage.
+    pub fn flush_barrier(&self) {
+        use std::thread;
+        use std::time::Duration;
+
+        // Poll until flush_pending_count reaches 0
+        loop {
+            let pending = self
+                .metrics
+                .flush_pending_count
+                .load(std::sync::atomic::Ordering::Relaxed);
+            if pending == 0 {
+                break;
+            }
+            thread::sleep(Duration::from_millis(50));
+        }
+    }
+
     /// Wait for all background threads to finish by polling until no strong references remain.
     /// This ensures clean shutdown before database restart in tests.
     #[cfg(any(test, feature = "test-utils"))]
diff --git a/tidehunter/src/large_table.rs b/tidehunter/src/large_table.rs
index 935e7814..198666f1 100644
--- a/tidehunter/src/large_table.rs
+++ b/tidehunter/src/large_table.rs
@@ -486,6 +486,7 @@ impl LargeTable {
         let mutex = context.ks_config.mutex_for_cell(cell);
         let ks_table = self.ks_rows(&context.ks_config);
 
+        let mut wait_iterations = 0u32;
         loop {
             let mut row = ks_table.lock(mutex, &context.large_table_contention);
@@ -493,6 +494,19 @@
                 && entry.pending_last_processed.is_some()
             {
                 // Async flush is in progress, wait for it to complete
+                wait_iterations += 1;
+                if wait_iterations == 20 {
+                    // Log after 1 second of waiting (20 * 50ms)
+                    eprintln!(
+                        "[relocation] lock_cell_waiting_for_flush: waited 1s for cell flush to complete"
+                    );
+                } else if wait_iterations.is_multiple_of(200) {
+                    // Log every 10 seconds thereafter
+                    eprintln!(
+                        "[relocation] lock_cell_waiting_for_flush: waited {}s for cell flush to complete",
+                        wait_iterations as u64 * 50 / 1000
+                    );
+                }
                 drop(row);
                 thread::sleep(Duration::from_millis(50));
                 continue;
             }
@@ -651,6 +665,9 @@
     > {
         let mut replay_from: Option<u64> = None;
         let mut max_wal_position: Option = None;
+        let mut replay_from_bottleneck_cell: Option = None;
+        let mut replay_from_bottleneck_dirty: bool = false;
+        let mut replay_from_bottleneck_pending: bool = false;
         let mut ks_data = Vec::with_capacity(ks_table.rows.mutexes().len());
         for mutex in ks_table.rows.mutexes() {
             let mut row = mutex.lock();
@@ -672,14 +689,27 @@
                 }
                 // Do not use last_processed from empty entries
                 if !entry.state.is_empty() {
-                    replay_from = Some(cmp::min(
-                        replay_from.unwrap_or(u64::MAX),
-                        entry.last_processed.as_u64(),
-                    ));
+                    let lp = entry.last_processed.as_u64();
+                    if lp < replay_from.unwrap_or(u64::MAX) {
+                        replay_from_bottleneck_cell = Some(entry.cell.clone());
+                        replay_from_bottleneck_dirty = entry.state.is_dirty();
+                        replay_from_bottleneck_pending = entry.pending_last_processed.is_some();
+                    }
+                    replay_from = Some(cmp::min(replay_from.unwrap_or(u64::MAX), lp));
                 }
             }
             ks_data.push(row_data);
         }
+        if let Some(rf) = replay_from {
+            eprintln!(
+                "[snapshot] ks={} replay_from={} bottleneck_cell={:?} dirty={} pending_flush={}",
+                ks_table.context.name(),
+                rf,
+                replay_from_bottleneck_cell,
+                replay_from_bottleneck_dirty,
+                replay_from_bottleneck_pending
+            );
+        }
         let metric = self
             .metrics
             .index_distance_from_tail
@@ -1070,13 +1100,23 @@ impl LargeTableEntry {
     pub fn insert(&mut self, k: Bytes, v: WalPosition) {
         self.state.mark_dirty();
         self.insert_bloom_filter(&k);
-        self.data.make_mut().insert(k, v);
+        if let Some(old_position) = self.data.make_mut().insert(k, v) {
+            self.context
+                .metrics
+                .stale_index_bytes
+                .inc_by(old_position.frame_len_u32() as u64);
+        }
         self.report_loaded_keys_count();
     }
 
     pub fn remove(&mut self, k: Bytes, v: WalPosition) {
         self.state.mark_dirty();
-        self.data.make_mut().remove(k, v);
+        if let Some(old_position) = self.data.make_mut().remove(k, v) {
+            self.context
+                .metrics
+                .stale_index_bytes
+                .inc_by(old_position.frame_len_u32() as u64);
+        }
         self.report_loaded_keys_count();
     }
diff --git a/tidehunter/src/metrics.rs b/tidehunter/src/metrics.rs
index 295b48a7..249c1604 100644
--- a/tidehunter/src/metrics.rs
+++ b/tidehunter/src/metrics.rs
@@ -192,6 +192,8 @@ pub struct Metrics {
     pub relocation_target_position: MetricIntGauge,
     pub relocation_terminal_position: MetricIntGauge,
     pub gc_position: MetricIntGaugeVec,
+    pub wal_deleted_bytes: MetricIntCounter,
+    pub stale_index_bytes: MetricIntCounter,
 
     pub relocation_kept: MetricIntCounterVec,
     pub relocation_removed: MetricIntCounterVec,
@@ -403,6 +405,8 @@ impl Metrics {
             relocation_target_position: gauge!("relocation_target_position", registry, enabled),
             relocation_terminal_position: gauge!("relocation_terminal_position", registry, enabled),
             gc_position: gauge_vec!("gc_position", &["kind"], registry, enabled),
+            wal_deleted_bytes: counter!("wal_deleted_bytes", registry, enabled),
+            stale_index_bytes: counter!("stale_index_bytes", registry, enabled),
 
             relocation_kept: counter_vec!("relocation_kept", &["ks"], registry, enabled),
             relocation_removed: counter_vec!("relocation_removed", &["ks"], registry, enabled),
diff --git a/tidehunter/src/relocation/mod.rs b/tidehunter/src/relocation/mod.rs
index beb78103..daba9f0e 100644
--- a/tidehunter/src/relocation/mod.rs
+++ b/tidehunter/src/relocation/mod.rs
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
 use std::path::PathBuf;
 use std::sync::{Arc, Weak, mpsc};
 use std::thread::JoinHandle;
+use std::time::{Duration, Instant};
 
 mod cell_reference;
 mod watermark;
@@ -100,6 +101,9 @@ struct CellProcessingContext {
     highest_wal_position: WalPosition,
     entries_removed: u64,
     entries_kept: u64,
+    entries_skipped: u64,
+    index_load_time: Duration,
+    entry_read_time: Duration,
 }
 
 impl CellProcessingContext {
@@ -109,6 +113,9 @@
             highest_wal_position: WalPosition::new(0, 0),
             entries_removed: 0,
             entries_kept: 0,
+            entries_skipped: 0,
+            index_load_time: Duration::ZERO,
+            entry_read_time: Duration::ZERO,
         }
     }
 
@@ -186,9 +193,12 @@ impl RelocationDriver {
             return Ok(());
         }
 
-        let gc_watermark = std::cmp::min(
-            watermarks.gc_watermark(),
-            db.control_region_store.lock().last_position(),
+        let wm_gc = watermarks.gc_watermark();
+        let cr_pos = db.control_region_store.lock().last_position();
+        let gc_watermark = std::cmp::min(wm_gc, cr_pos);
+        eprintln!(
+            "[relocation] GC watermark components: watermarks_gc={}, control_region={}, result={}",
+            wm_gc, cr_pos, gc_watermark
         );
 
         db.wal_writer.gc(gc_watermark)?;
@@ -226,6 +236,16 @@
         let should_restart = watermarks.data.next_to_process.is_none()
             || watermarks.data.target_position != target_position;
 
+        eprintln!(
+            "[relocation] Starting index-based relocation: upper_limit={}, effective_limit={}, target_position={:?}, should_restart={}, min_wal_position={}",
+            upper_limit,
+            effective_limit,
+            target_position,
+            should_restart,
+            db.wal.min_wal_position()
+        );
+        let iteration_start = std::time::Instant::now();
+
         // Get starting cell reference from saved progress or restart
         let mut current_cell_ref = if should_restart {
             CellReference::first(&db, KeySpace::first())
@@ -233,26 +253,57 @@
             watermarks.data.next_to_process.clone()
         };
 
-        let mut cells_processed = 0;
+        let mut cells_processed = 0u64;
         let mut highest_wal_position = 0u64;
         let mut current_ks_id = None;
 
+        // Phase timing accumulators for periodic progress logging
+        let mut phase_a_total = Duration::ZERO; // index loading
+        let mut phase_b_total = Duration::ZERO; // entry iteration + WAL reads
+        let mut phase_c_total = Duration::ZERO; // write_relocated_batch (WAL write + flush)
+        let mut total_entries_kept = 0u64;
+        let mut total_entries_removed = 0u64;
+        let mut total_entries_skipped = 0u64;
+        let mut batch_log_start = Instant::now();
+
         while let Some(cell_ref) = current_cell_ref.take() {
             // Check for cancellation periodically
-            if cells_processed % Self::NUM_ITERATIONS_IN_BATCH == 0 {
+            if cells_processed.is_multiple_of(Self::NUM_ITERATIONS_IN_BATCH as u64) {
                 if self.should_cancel_relocation() {
                     break;
                 }
 
+                // Log progress every NUM_ITERATIONS_IN_BATCH cells
+                if cells_processed > 0 {
+                    let batch_elapsed = batch_log_start.elapsed();
+                    eprintln!(
+                        "[relocation] Progress: cells={}, entries_kept={}, entries_removed={}, entries_skipped={}, \
+                         phase_a(index_load)={:.1}s, phase_b(entry_read)={:.1}s, phase_c(write+flush)={:.1}s, \
+                         batch_wall={:.1}s, total_elapsed={:.0}s",
+                        cells_processed,
+                        total_entries_kept,
+                        total_entries_removed,
+                        total_entries_skipped,
+                        phase_a_total.as_secs_f64(),
+                        phase_b_total.as_secs_f64(),
+                        phase_c_total.as_secs_f64(),
+                        batch_elapsed.as_secs_f64(),
+                        iteration_start.elapsed().as_secs_f64(),
+                    );
+                    batch_log_start = Instant::now();
+                }
+
                 // Save progress periodically
-                if cells_processed % Self::NUM_ITERATIONS_TILL_SAVE == 0 {
+                if cells_processed.is_multiple_of(Self::NUM_ITERATIONS_TILL_SAVE as u64)
+                    && cells_processed > 0
+                {
                     watermarks.set(
                         Some(cell_ref.clone()),
                         highest_wal_position,
                         upper_limit,
                         target_position,
                     );
-                    self.save_progress(&db, &watermarks, true)?;
-                    // Save watermark only
+                    self.save_progress(&db, &watermarks, false)?;
+                    // Save progress and run gc()
                 }
             }
@@ -266,6 +317,13 @@
             // Process each cell
             let context = self.process_single_cell(&cell_ref, &db, effective_limit)?;
 
+            // Accumulate per-cell timing
+            phase_a_total += context.index_load_time;
+            phase_b_total += context.entry_read_time;
+            total_entries_kept += context.entries_kept;
+            total_entries_removed += context.entries_removed;
+            total_entries_skipped += context.entries_skipped;
+
             // Track the highest WAL position seen
             if context.highest_wal_position.offset() > highest_wal_position {
                 highest_wal_position = context.highest_wal_position.offset();
@@ -274,7 +332,9 @@
             // Relocate entries if any were marked for keeping
             let keyspace_desc = &db.ks_context(cell_ref.keyspace).ks_config;
             if !context.batch.is_empty() {
+                let phase_c_start = Instant::now();
                 let successful = self.relocate_entries(context.batch, &db)?;
+                phase_c_total += phase_c_start.elapsed();
                 // Track successful relocations with existing metrics (same as WAL-based)
                 self.metrics
                     .relocation_kept
@@ -295,6 +355,24 @@
         }
 
         // Save final progress with upper_limit and highest WAL position
+        let total_elapsed = iteration_start.elapsed();
+        eprintln!(
+            "[relocation] Completed index-based relocation: cells_processed={}, highest_wal_position={}, \
+             upper_limit={}, effective_limit={}, elapsed_secs={}, \
+             phase_a(index_load)={:.1}s, phase_b(entry_read)={:.1}s, phase_c(write+flush)={:.1}s, \
+             entries_kept={}, entries_removed={}, entries_skipped={}",
+            cells_processed,
+            highest_wal_position,
+            upper_limit,
+            effective_limit,
+            total_elapsed.as_secs(),
+            phase_a_total.as_secs_f64(),
+            phase_b_total.as_secs_f64(),
+            phase_c_total.as_secs_f64(),
+            total_entries_kept,
+            total_entries_removed,
+            total_entries_skipped,
+        );
         watermarks.set(
             current_cell_ref.clone(),
             highest_wal_position,
@@ -426,6 +504,7 @@ impl RelocationDriver {
         let mut removed_count = 0;
 
         // Phase A: Get shared reference to cell index
+        let phase_a_start = Instant::now();
         let index = match db.large_table.get_index_for_cell(
             db.ks_context(cell_ref.keyspace),
             &cell_ref.cell_id,
@@ -433,61 +512,73 @@ impl RelocationDriver {
         )? {
             Some(index) => index,
             None => {
+                context.index_load_time = phase_a_start.elapsed();
                 // Cell doesn't exist or is empty
                 return Ok(context);
             }
         };
+        context.index_load_time = phase_a_start.elapsed();
 
-        // Phase B: Read values from WAL and make decisions (no lock held, efficient iteration)
-        // TODO(#74): Optimization needed - add support for making relocation decisions without loading values
-        // This would be beneficial in two scenarios:
-        // 1. Applications without pruner callbacks - relocation just moves non-deleted entries
-        // 2. Applications that can make decisions based on key only, without needing the value
-        //
-        // This strategy is most useful for applications that don't need values for decision making.
-        // For applications like Sui that require values (similar to current behavior),
-        // WAL-based relocation remains the better choice.
-        //
-        // Implementation could involve:
-        // - Optional key-only decision callback in RelocationFilter trait
-        // - Skip WAL value reads when key-only decisions are possible
-        // - Fall back to current value-based approach when needed
+        // Phase B: Collect entries, sort by WAL position for sequential I/O, then read values.
+        // Sorting by position turns random mmap reads into sequential access, enabling OS readahead.
+        // TODO(#74): Further optimization possible - add key-only decision callback to
+        // RelocationFilter trait to skip WAL reads for entries decidable by key alone.
+        let phase_b_start = Instant::now();
         let keyspace_desc = &db.ks_context(cell_ref.keyspace).ks_config;
+
+        // Step 1: Collect eligible (key, position) pairs from the index
+        let mut entries: Vec<(Bytes, WalPosition)> = Vec::new();
         for (key, position) in index.iter() {
             if position.offset() >= effective_limit {
+                context.entries_skipped += 1;
                 continue;
             }
+            entries.push((key.clone(), position));
+        }
 
-            // Read the actual value from WAL
-            let value = match db.read_record(position)? {
-                Some((_, val)) => val,
-                None => {
-                    // Entry might have been deleted or corrupted, skip it
-                    context.mark_entry_removed(position);
-                    removed_count += 1;
-                    continue;
-                }
-            };
-
-            // Simplified decision logic for index-based relocation. Since we're iterating through
-            // current index entries, we only need to check the relocation filter
-            let decision = keyspace_desc
-                .relocation_filter()
-                .map_or(Decision::Keep, |filter| filter(key, &value));
-
-            match decision {
-                Decision::Keep => {
-                    context.add_entry_to_relocate(key.clone(), value, position);
-                }
-                Decision::Remove => {
-                    context.mark_entry_removed(position);
-                    removed_count += 1;
-                }
-                Decision::StopRelocation => {
-                    break;
+        // Step 2: Sort by WAL position offset for sequential I/O
+        entries.sort_unstable_by_key(|(_key, pos)| pos.offset());
+
+        // Step 3: Read in position order, with or without filter
+        if let Some(filter) = keyspace_desc.relocation_filter() {
+            for (key, position) in &entries {
+                let value = match db.read_record(*position)? {
+                    Some((_, val)) => val,
+                    None => {
+                        context.mark_entry_removed(*position);
+                        removed_count += 1;
+                        continue;
+                    }
+                };
+
+                match filter(key, &value) {
+                    Decision::Keep => {
+                        context.add_entry_to_relocate(key.clone(), value, *position);
+                    }
+                    Decision::Remove => {
+                        context.mark_entry_removed(*position);
+                        removed_count += 1;
+                    }
+                    Decision::StopRelocation => {
+                        break;
+                    }
                 }
             }
+        } else {
+            // No filter: all entries are unconditionally kept
+            for (key, position) in &entries {
+                let value = match db.read_record(*position)? {
+                    Some((_, val)) => val,
+                    None => {
+                        context.mark_entry_removed(*position);
+                        removed_count += 1;
+                        continue;
+                    }
+                };
+                context.add_entry_to_relocate(key.clone(), value, *position);
+            }
         }
+        context.entry_read_time = phase_b_start.elapsed();
 
         // Track removed entries with existing metrics (same as WAL-based)
         if removed_count > 0 {
diff --git a/tidehunter/src/wal/mapper.rs b/tidehunter/src/wal/mapper.rs
index 209bb9c4..5ed4e690 100644
--- a/tidehunter/src/wal/mapper.rs
+++ b/tidehunter/src/wal/mapper.rs
@@ -175,6 +175,10 @@ impl WalMapperThread {
         }
         // Update the WalFiles structure and maps by removing deleted files
         if num_files_deleted > 0 {
+            // Track bytes deleted
+            let bytes_deleted = (num_files_deleted as u64) * self.layout.wal_file_size;
+            self.metrics.wal_deleted_bytes.inc_by(bytes_deleted);
+
             let new_files = wal_files.skip_first_n_files(num_files_deleted);
             let new_min_file_id = new_files.min_file_id;
             self.files.store(Arc::new(new_files));