diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs
index b988ce896..fdc917922 100644
--- a/src/moonlink/src/storage/mooncake_table.rs
+++ b/src/moonlink/src/storage/mooncake_table.rs
@@ -61,11 +61,12 @@ use crate::table_notify::TableEvent;
 use crate::NonEvictableHandle;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
+use more_asserts as ma;
 use delete_vector::BatchDeletionVector;
 pub(crate) use disk_slice::DiskSliceWriter;
 use mem_slice::MemSlice;
 pub(crate) use snapshot::SnapshotTableState;
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::path::PathBuf;
 use std::sync::Arc;
 use table_snapshot::{IcebergSnapshotImportResult, IcebergSnapshotIndexMergeResult};
@@ -462,7 +463,11 @@ pub struct MooncakeTable {
     wal_manager: WalManager,
 
     /// LSN of ongoing flushes.
-    pub ongoing_flush_lsns: BTreeSet<u64>,
+    /// Maps each LSN to its ongoing flush count.
+    pub ongoing_flush_lsns: BTreeMap<u64, u32>,
+
+    /// LSNs of flush operations which have finished but are not yet recorded in a mooncake snapshot.
+    unrecorded_flush_lsns: BTreeSet<u64>,
 
     /// Table replay sender.
     event_replay_tx: Option>,
@@ -565,7 +570,8 @@ impl MooncakeTable {
             last_iceberg_snapshot_lsn,
             table_notify: None,
             wal_manager,
-            ongoing_flush_lsns: BTreeSet::new(),
+            ongoing_flush_lsns: BTreeMap::new(),
+            unrecorded_flush_lsns: BTreeSet::new(),
             event_replay_tx: None,
         })
     }
@@ -768,8 +774,10 @@ impl MooncakeTable {
         iceberg_snapshot_res: &IcebergSnapshotResult,
     ) {
         if let Some(event_replay_tx) = &self.event_replay_tx {
-            let table_event =
-                replay_events::create_iceberg_snapshot_event_completion(iceberg_snapshot_res.uuid);
+            let table_event = replay_events::create_iceberg_snapshot_event_completion(
+                iceberg_snapshot_res.uuid,
+                iceberg_snapshot_res.flush_lsn,
+            );
             event_replay_tx
                 .send(MooncakeTableEvent::IcebergSnapshotCompletion(table_event))
                 .unwrap();
@@ -847,15 +855,6 @@ impl MooncakeTable {
         xact_id: Option,
         event_id: uuid::Uuid,
     ) {
-        if let Some(lsn) = disk_slice.lsn() {
-            self.insert_ongoing_flush_lsn(lsn);
-        } else {
-            assert!(
-                xact_id.is_some(),
-                "LSN should be none for non streaming flush"
-            );
-        }
-
         let mut disk_slice_clone = disk_slice.clone();
         tokio::task::spawn(async move {
             let flush_result = disk_slice_clone.write().await;
@@ -907,39 +906,64 @@ impl MooncakeTable {
             .lsn()
             .expect("LSN should never be none for non streaming flush");
         self.remove_ongoing_flush_lsn(lsn);
+        println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!());
         self.try_set_next_flush_lsn(lsn);
         self.next_snapshot_task.new_disk_slices.push(disk_slice);
     }
 
     // Attempts to set the flush LSN for the next iceberg snapshot. Note that we can only set the flush LSN if it's less than the current min pending flush LSN. Otherwise, LSNs will be persisted to iceberg in the wrong order.
     fn try_set_next_flush_lsn(&mut self, lsn: u64) {
+        println!("completed lsn = {}, min pending lsn = {}, ongoing = {:?}", lsn, self.get_min_ongoing_flush_lsn(), self.ongoing_flush_lsns);
+
         let min_pending_lsn = self.get_min_ongoing_flush_lsn();
         if lsn < min_pending_lsn {
+            if let Some(old_flush_lsn) = self.next_snapshot_task.new_flush_lsn {
+                ma::assert_le!(old_flush_lsn, lsn);
+            }
             self.next_snapshot_task.new_flush_lsn = Some(lsn);
         }
+
+        // If there are no ongoing flushes, we're free to set the flush LSN to the largest completed one.
+ if !self.has_ongoing_flush() && !self.unrecorded_flush_lsns.is_empty() { + let smallest_flush_lsn = self.unrecorded_flush_lsns.first().unwrap(); + ma::assert_le!(self.next_snapshot_task.new_flush_lsn.unwrap(), *smallest_flush_lsn); + let largest_flush_lsn = self.unrecorded_flush_lsns.last().unwrap(); + self.next_snapshot_task.new_flush_lsn = Some(*largest_flush_lsn); + self.unrecorded_flush_lsns.clear(); + } } // We fallback to u64::MAX if there are no pending flush LSNs so that the LSN is always greater than the flush LSN and the iceberg snapshot can proceed. pub fn get_min_ongoing_flush_lsn(&self) -> u64 { - self.ongoing_flush_lsns - .iter() - .next() - .copied() - .unwrap_or(u64::MAX) + if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() { + return *lsn; + } + u64::MAX } - pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64) { - assert!( - self.ongoing_flush_lsns.insert(lsn), - "LSN {lsn} already in pending flush LSNs" - ); + pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64, count: u32) { + *self.ongoing_flush_lsns.entry(lsn).or_insert(0) += count; } pub fn remove_ongoing_flush_lsn(&mut self, lsn: u64) { - assert!( - self.ongoing_flush_lsns.remove(&lsn), - "LSN {lsn} not found in pending flush LSNs" - ); + use std::collections::btree_map::Entry; + + match self.ongoing_flush_lsns.entry(lsn) { + Entry::Occupied(mut entry) => { + let counter = entry.get_mut(); + if *counter > 1 { + *counter -= 1; + } else { + entry.remove(); + } + } + Entry::Vacant(_) => { + panic!("Tried to remove LSN {lsn}, but it is not tracked"); + } + } + + // It's possible to have multiple completed flush operations for the same LSN. + self.unrecorded_flush_lsns.insert(lsn); } pub fn has_ongoing_flush(&self) -> bool { @@ -1205,7 +1229,8 @@ impl MooncakeTable { ); let table_notify_tx = self.table_notify.as_ref().unwrap().clone(); - if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains(&lsn) { + if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains_key(&lsn) { + println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!()); self.try_set_next_flush_lsn(lsn); tokio::task::spawn(async move { table_notify_tx @@ -1234,6 +1259,7 @@ impl MooncakeTable { } let mut disk_slice = self.prepare_disk_slice(lsn)?; + self.insert_ongoing_flush_lsn(lsn, /*count=*/1); self.flush_disk_slice( &mut disk_slice, table_notify_tx, diff --git a/src/moonlink/src/storage/mooncake_table/replay/replay_events.rs b/src/moonlink/src/storage/mooncake_table/replay/replay_events.rs index 3793efbd1..5955d93ba 100644 --- a/src/moonlink/src/storage/mooncake_table/replay/replay_events.rs +++ b/src/moonlink/src/storage/mooncake_table/replay/replay_events.rs @@ -156,6 +156,8 @@ pub struct IcebergSnapshotEventInitiation { pub struct IcebergSnapshotEventCompletion { /// Event id. pub uuid: uuid::Uuid, + /// Flush LSN. + pub lsn: u64, } /// ===================== @@ -416,8 +418,9 @@ pub fn create_iceberg_snapshot_event_initiation( } pub fn create_iceberg_snapshot_event_completion( uuid: uuid::Uuid, + lsn: u64, ) -> IcebergSnapshotEventCompletion { - IcebergSnapshotEventCompletion { uuid } + IcebergSnapshotEventCompletion { uuid, lsn } } /// Create index merge events. 
pub fn create_index_merge_event_initiation( diff --git a/src/moonlink/src/storage/mooncake_table/snapshot.rs b/src/moonlink/src/storage/mooncake_table/snapshot.rs index b228ac7f1..f272d7594 100644 --- a/src/moonlink/src/storage/mooncake_table/snapshot.rs +++ b/src/moonlink/src/storage/mooncake_table/snapshot.rs @@ -597,6 +597,11 @@ impl SnapshotTableState { flush_lsn, committed_deletion_logs, )); + + println!("decide to perform an iceberg snapshot, flush lsn = {}, mooncake snapshot lsn = {}", + flush_lsn, + self.current_snapshot.snapshot_version, + ); } } diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index 5ca24042f..e654b4d6c 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -1300,9 +1300,9 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 3 should be present"); // Verify all LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&5)); - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(table.ongoing_flush_lsns.contains_key(&5)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.ongoing_flush_lsns.len(), 3); // Verify min is correctly calculated (should be 5) @@ -1310,15 +1310,15 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> { // Complete flush with LSN 10 (out of order completion) table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/); - assert!(table.ongoing_flush_lsns.contains(&5)); - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(table.ongoing_flush_lsns.contains_key(&5)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.get_min_ongoing_flush_lsn(), 5); // Still 5 // Complete flush with LSN 5 table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&5)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(!table.ongoing_flush_lsns.contains_key(&5)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.get_min_ongoing_flush_lsn(), 15); // Now 15 // Complete last flush @@ -1355,8 +1355,8 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 2 should be present"); // Verify both streaming LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // Mix with regular flush (must be higher than previous regular flush) @@ -1367,9 +1367,9 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 3 should be present"); // Verify all three LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); - assert!(table.ongoing_flush_lsns.contains(&75)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&75)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // Complete streaming flushes @@ -1580,22 +1580,22 @@ async fn test_out_of_order_flush_completion() -> Result<()> { 
.expect("Disk slice should be present"); // Verify all are pending and min is 10 - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Complete flush 30 first (out of order) table.apply_flush_result(disk_slice_30, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&30)); - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); + assert!(!table.ongoing_flush_lsns.contains_key(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Still 10 // Complete flush 10 (should update min to 20) table.apply_flush_result(disk_slice_10, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); assert_eq!(table.get_min_ongoing_flush_lsn(), 20); // Complete flush 20 (should clear all) @@ -1631,8 +1631,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> { .expect("Streaming disk slice should be present"); // Verify both are tracked and min is 50 - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // According to table handler logic, iceberg snapshots with flush_lsn >= 50 should be blocked @@ -1662,8 +1662,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> { streaming_disk_slice, uuid::Uuid::new_v4(), /*placeholder*/ ); - assert!(!table.ongoing_flush_lsns.contains(&50)); - assert!(table.ongoing_flush_lsns.contains(&100)); + assert!(!table.ongoing_flush_lsns.contains_key(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); assert_eq!(table.get_min_ongoing_flush_lsn(), 100); // Now iceberg snapshots with flush_lsn < 100 should be allowed @@ -1988,7 +1988,7 @@ async fn test_iceberg_snapshot_blocked_by_ongoing_flushes() -> Result<()> { table.commit(2); // Verify we have pending flushes - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Create a mooncake snapshot - this will create an iceberg payload @@ -2090,9 +2090,9 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<( // Verify all pending flushes and min pending LSN assert_eq!(table.get_min_ongoing_flush_lsn(), 10); - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Create snapshot and test constraint with min_ongoing_flush_lsn = 10 let created = table.create_snapshot(SnapshotOption { @@ -2136,16 +2136,16 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<( // 
Complete flushes OUT OF ORDER - complete middle one first (LSN 20) table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Should still be 10 - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(!table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(!table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Complete the first flush (LSN 10) - now min should be 30 table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/); assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Now should be 30 - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(!table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(!table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Test constraint logic with new min pending flush LSN = 30 let can_initiate_low = TableHandlerState::can_initiate_iceberg_snapshot( diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs index 9b07778fb..65acfad91 100644 --- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs +++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs @@ -420,6 +420,7 @@ impl MooncakeTable { // Perform table flush completion notification. if let Some(lsn) = disk_slice.lsn() { self.remove_ongoing_flush_lsn(lsn); + println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!()); self.try_set_next_flush_lsn(lsn); } @@ -446,6 +447,7 @@ impl MooncakeTable { let should_remove = stream_state.ongoing_flush_count == 0; let _ = stream_state; + println!("set flush lsn {} at {:?}:{:?}", commit_lsn, file!(), line!()); self.try_set_next_flush_lsn(commit_lsn); self.next_snapshot_task.new_disk_slices.push(disk_slice); if should_remove { @@ -555,6 +557,16 @@ impl MooncakeTable { .unwrap(); } + // Record ongoing flushes. + let ongoing_flush_count = { + let stream_state = self + .transaction_stream_states + .get_mut(&xact_id) + .expect("Stream state not found for xact_id: {xact_id}"); + stream_state.ongoing_flush_count + }; + self.insert_ongoing_flush_lsn(lsn, ongoing_flush_count); + // Perform commit operation. 
let stream_state = self .transaction_stream_states diff --git a/src/moonlink/src/table_handler/chaos_replay.rs b/src/moonlink/src/table_handler/chaos_replay.rs index 13838c699..a6bf40dff 100644 --- a/src/moonlink/src/table_handler/chaos_replay.rs +++ b/src/moonlink/src/table_handler/chaos_replay.rs @@ -9,6 +9,7 @@ use crate::storage::mooncake_table::snapshot::MooncakeSnapshotOutput; use crate::storage::mooncake_table::DataCompactionResult; use crate::storage::mooncake_table::DiskSliceWriter; use crate::storage::mooncake_table::MooncakeTable; +use crate::storage::mooncake_table::TableMetadata; use crate::storage::mooncake_table::{ table_creation_test_utils::*, FileIndiceMergeResult, IcebergSnapshotResult, }; @@ -16,6 +17,7 @@ use crate::storage::mooncake_table_config::DiskSliceWriterConfig; use crate::table_handler::chaos_table_metadata::ReplayTableMetadata; use crate::table_handler::test_utils::check_read_snapshot; use crate::table_notify::{TableEvent, TableMaintenanceStatus}; +use crate::IcebergTableConfig; use crate::MooncakeTableConfig; use crate::ReadStateManager; use crate::{Result, StorageConfig}; @@ -80,7 +82,7 @@ fn get_id_from_row(row: &MoonlinkRow) -> i32 { async fn create_mooncake_table_for_replay( replay_env: &ReplayEnvironment, lines: &mut tokio::io::Lines>, -) -> MooncakeTable { +) -> (Arc, IcebergTableConfig, MooncakeTable) { let line = lines.next_line().await.unwrap().unwrap(); let replay_table_metadata: ReplayTableMetadata = serde_json::from_str(&line).unwrap(); let local_table_directory = replay_env @@ -128,17 +130,60 @@ async fn create_mooncake_table_for_replay( atomic_write_dir: None, }; let iceberg_table_config = get_iceberg_table_config_with_storage_config(storage_config); - create_mooncake_table( - table_metadata, - iceberg_table_config, + let mooncake_table = create_mooncake_table( + table_metadata.clone(), + iceberg_table_config.clone(), Arc::new(object_storage_cache), ) - .await + .await; + (table_metadata, iceberg_table_config, mooncake_table) +} + +/// Test util function to check whether iceberg snapshot contains expected content. +async fn validate_persisted_iceberg_table( + mooncake_table_metadata: Arc, + iceberg_table_config: IcebergTableConfig, + snapshot_lsn: u64, + expected_ids: Vec, +) { + println!("validate iceberg lsn = {}", snapshot_lsn); + + let (event_sender, _event_receiver) = mpsc::channel(100); + let (replication_lsn_tx, replication_lsn_rx) = watch::channel(0u64); + let (last_commit_lsn_tx, last_commit_lsn_rx) = watch::channel(0u64); + replication_lsn_tx.send(snapshot_lsn).unwrap(); + last_commit_lsn_tx.send(snapshot_lsn).unwrap(); + + // Use a fresh new cache for new iceberg table manager. + let cache_temp_dir = tempdir().unwrap(); + let object_storage_cache = create_test_object_storage_cache(&cache_temp_dir); + + let mut table = create_mooncake_table( + mooncake_table_metadata.clone(), + iceberg_table_config.clone(), + object_storage_cache, + ) + .await; + table.register_table_notify(event_sender).await; + + let read_state_filepath_remap = std::sync::Arc::new(|local_filepath: String| local_filepath); + let read_state_manager = ReadStateManager::new( + &table, + replication_lsn_rx.clone(), + last_commit_lsn_rx, + read_state_filepath_remap, + ); + check_read_snapshot( + &read_state_manager, + Some(snapshot_lsn), + /*expected_ids=*/ &expected_ids, + ) + .await; } pub(crate) async fn replay() { // TODO(hjiang): Take an command line argument. 
- let replay_filepath = "/tmp/chaos_debug_clone_4848383"; + let replay_filepath = "/tmp/chaos_test_ar6k8n3c7cq6"; let cache_temp_dir = tempdir().unwrap(); let table_temp_dir = tempdir().unwrap(); let iceberg_temp_dir = tempdir().unwrap(); @@ -196,7 +241,8 @@ pub(crate) async fn replay() { let data_files = Arc::new(Mutex::new(HashMap::new())); let data_files_clone = data_files.clone(); - let mut table = create_mooncake_table_for_replay(&replay_env, &mut lines).await; + let (table_metadata, iceberg_table_config, mut table) = + create_mooncake_table_for_replay(&replay_env, &mut lines).await; let (table_event_sender, mut table_event_receiver) = mpsc::channel(100); let (event_replay_sender, _event_replay_receiver) = mpsc::unbounded_channel(); table.register_table_notify(table_event_sender).await; @@ -408,11 +454,14 @@ pub(crate) async fn replay() { let appended = std::mem::take(&mut uncommitted_appended_ids); let deleted = std::mem::take(&mut uncommitted_deleted_ids); { - for cur_append in appended.into_iter() { - assert!(committed_ids.insert(cur_append)); - } + // Consider update case, + // - It's possible to add an existing id. + // - We should apply deletion before addition. for cur_delete in deleted.into_iter() { - assert!(committed_ids.remove(&cur_delete)); + committed_ids.remove(&cur_delete); + } + for cur_append in appended.into_iter() { + committed_ids.insert(cur_append); } } assert!(versioned_committed_ids @@ -560,6 +609,24 @@ pub(crate) async fn replay() { // Otherwise block until the corresponding flush event completes. event_notification_clone.notified().await; } + + // Validate iceberg snapshot. + let commit_lsn = snapshot_completion_event.lsn; + let mut expected_ids = versioned_committed_ids + .get(&commit_lsn) + .as_ref() + .unwrap() + .iter() + .copied() + .collect::>(); + expected_ids.sort(); + validate_persisted_iceberg_table( + table_metadata.clone(), + iceberg_table_config.clone(), + commit_lsn, + expected_ids, + ) + .await; } // ===================== // Index merge events diff --git a/src/moonlink/src/table_handler/chaos_test.rs b/src/moonlink/src/table_handler/chaos_test.rs index b4fb57357..b94e44304 100644 --- a/src/moonlink/src/table_handler/chaos_test.rs +++ b/src/moonlink/src/table_handler/chaos_test.rs @@ -63,10 +63,11 @@ fn parse_chaos_test_args() -> ChaosTestArgs { // Default seed if not provided let seed = seed.unwrap_or_else(|| { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() as u64 + // SystemTime::now() + // .duration_since(UNIX_EPOCH) + // .unwrap() + // .as_nanos() as u64 + 1756240686726817373 }); ChaosTestArgs {