84 changes: 55 additions & 29 deletions src/moonlink/src/storage/mooncake_table.rs
@@ -61,11 +61,12 @@ use crate::table_notify::TableEvent;
use crate::NonEvictableHandle;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use more_asserts as ma;
use delete_vector::BatchDeletionVector;
pub(crate) use disk_slice::DiskSliceWriter;
use mem_slice::MemSlice;
pub(crate) use snapshot::SnapshotTableState;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use table_snapshot::{IcebergSnapshotImportResult, IcebergSnapshotIndexMergeResult};
@@ -462,7 +463,11 @@ pub struct MooncakeTable {
wal_manager: WalManager,

/// LSN of ongoing flushes.
pub ongoing_flush_lsns: BTreeSet<u64>,
/// Maps from each LSN to the number of ongoing flushes at that LSN.
pub ongoing_flush_lsns: BTreeMap<u64, u32>,

/// LSNs of flush operations that have finished but are not yet recorded in a mooncake snapshot.
unrecorded_flush_lsns: BTreeSet<u64>,

/// Table replay sender.
event_replay_tx: Option<mpsc::UnboundedSender<MooncakeTableEvent>>,
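
The two new fields above are the heart of this change: a streaming transaction can have several in-flight flushes that all land on the same commit LSN, so the table must reference-count LSNs rather than track bare membership, and it must remember completed-but-unrecorded LSNs so the flush LSN can still advance once the pipeline drains. A minimal sketch of the intended semantics, using a hypothetical standalone FlushLsnTracker (not part of the PR; insert/remove/min_ongoing mirror insert_ongoing_flush_lsn, remove_ongoing_flush_lsn, and get_min_ongoing_flush_lsn on MooncakeTable):

```rust
use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};

/// Illustrative model of the flush-LSN bookkeeping these two fields implement.
pub struct FlushLsnTracker {
    /// Maps each in-flight flush LSN to the number of ongoing flushes at that LSN.
    ongoing: BTreeMap<u64, u32>,
    /// Completed flush LSNs not yet recorded in a mooncake snapshot.
    unrecorded: BTreeSet<u64>,
}

impl FlushLsnTracker {
    pub fn new() -> Self {
        Self {
            ongoing: BTreeMap::new(),
            unrecorded: BTreeSet::new(),
        }
    }

    /// A streaming commit registers its LSN once per in-flight flush.
    pub fn insert(&mut self, lsn: u64, count: u32) {
        *self.ongoing.entry(lsn).or_insert(0) += count;
    }

    /// Each completion decrements the count; the LSN leaves the ongoing
    /// map only when its last flush finishes.
    pub fn remove(&mut self, lsn: u64) {
        match self.ongoing.entry(lsn) {
            Entry::Occupied(mut entry) => {
                if *entry.get() > 1 {
                    *entry.get_mut() -= 1;
                } else {
                    entry.remove();
                }
            }
            Entry::Vacant(_) => panic!("LSN {lsn} is not tracked"),
        }
        // Multiple completions at the same LSN collapse into one entry.
        self.unrecorded.insert(lsn);
    }

    /// u64::MAX when nothing is in flight, so every completed LSN compares
    /// less than the minimum and snapshotting can proceed.
    pub fn min_ongoing(&self) -> u64 {
        self.ongoing.keys().next().copied().unwrap_or(u64::MAX)
    }
}
```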
@@ -565,7 +570,8 @@ impl MooncakeTable {
last_iceberg_snapshot_lsn,
table_notify: None,
wal_manager,
ongoing_flush_lsns: BTreeSet::new(),
ongoing_flush_lsns: BTreeMap::new(),
unrecorded_flush_lsns: BTreeSet::new(),
event_replay_tx: None,
})
}
@@ -768,8 +774,10 @@ impl MooncakeTable {
iceberg_snapshot_res: &IcebergSnapshotResult,
) {
if let Some(event_replay_tx) = &self.event_replay_tx {
let table_event =
replay_events::create_iceberg_snapshot_event_completion(iceberg_snapshot_res.uuid);
let table_event = replay_events::create_iceberg_snapshot_event_completion(
iceberg_snapshot_res.uuid,
iceberg_snapshot_res.flush_lsn,
);
event_replay_tx
.send(MooncakeTableEvent::IcebergSnapshotCompletion(table_event))
.unwrap();
@@ -847,15 +855,6 @@ impl MooncakeTable {
xact_id: Option<u32>,
event_id: uuid::Uuid,
) {
if let Some(lsn) = disk_slice.lsn() {
self.insert_ongoing_flush_lsn(lsn);
} else {
assert!(
xact_id.is_some(),
"LSN should be none for non streaming flush"
);
}

let mut disk_slice_clone = disk_slice.clone();
tokio::task::spawn(async move {
let flush_result = disk_slice_clone.write().await;
@@ -907,39 +906,64 @@ impl MooncakeTable {
.lsn()
.expect("LSN should never be none for non streaming flush");
self.remove_ongoing_flush_lsn(lsn);
println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!());

Review comment (medium): This println! statement and others added in this PR (e.g., lines 916 and 1233, and in snapshot.rs, transaction_stream.rs, and chaos_replay.rs) appear to be for debugging. They should be removed before merging to keep the codebase clean.

self.try_set_next_flush_lsn(lsn);
self.next_snapshot_task.new_disk_slices.push(disk_slice);
}

// Attempts to set the flush LSN for the next iceberg snapshot. Note that we can only set the flush LSN if it's less than the current min pending flush LSN. Otherwise, LSNs will be persisted to iceberg in the wrong order.
fn try_set_next_flush_lsn(&mut self, lsn: u64) {
println!("completed lsn = {}, min pending lsn = {}, ongoing = {:?}", lsn, self.get_min_ongoing_flush_lsn(), self.ongoing_flush_lsns);

let min_pending_lsn = self.get_min_ongoing_flush_lsn();
if lsn < min_pending_lsn {
if let Some(old_flush_lsn) = self.next_snapshot_task.new_flush_lsn {
ma::assert_le!(old_flush_lsn, lsn);
}
self.next_snapshot_task.new_flush_lsn = Some(lsn);
}

// If there are no ongoing flushes, we're free to set the flush LSN to the largest completed one.
if !self.has_ongoing_flush() && !self.unrecorded_flush_lsns.is_empty() {
let smallest_flush_lsn = self.unrecorded_flush_lsns.first().unwrap();
ma::assert_le!(self.next_snapshot_task.new_flush_lsn.unwrap(), *smallest_flush_lsn);

Review comment (critical): This assertion is incorrect and will cause a panic in scenarios with out-of-order flush completions.

For example, if flushes for LSNs {10, 20, 30} are processed and complete in the order {20, 10, 30}, new_flush_lsn will be Some(30) when try_set_next_flush_lsn(30) is called. At this point, smallest_flush_lsn will be 10, causing assert_le!(30, 10) to fail.

The logic to set new_flush_lsn to the largest completed LSN seems correct, but this assertion is flawed and should be removed.

let largest_flush_lsn = self.unrecorded_flush_lsns.last().unwrap();
self.next_snapshot_task.new_flush_lsn = Some(*largest_flush_lsn);
self.unrecorded_flush_lsns.clear();
}
}
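
To see the reviewer's point, it helps to trace try_set_next_flush_lsn through the out-of-order scenario step by step. A hedged walk-through (not code from the PR), assuming flushes in flight at LSNs 10, 20, and 30 that complete in the order 20, 10, 30, with new_flush_lsn starting as None:

```rust
// complete 20: ongoing = {10: 1, 30: 1}, unrecorded = {20}
//   try_set_next_flush_lsn(20): 20 < min_pending (10)? no -> first branch skipped;
//   flushes still ongoing -> second branch skipped; new_flush_lsn stays None
//
// complete 10: ongoing = {30: 1}, unrecorded = {10, 20}
//   try_set_next_flush_lsn(10): 10 < min_pending (30) -> new_flush_lsn = Some(10)
//
// complete 30: ongoing = {}, unrecorded = {10, 20, 30}
//   try_set_next_flush_lsn(30): 30 < u64::MAX -> new_flush_lsn = Some(30)
//   no ongoing flushes and unrecorded is non-empty:
//     smallest_flush_lsn = 10
//     assert_le!(new_flush_lsn.unwrap() /* 30 */, 10)  // panics, as the review notes
```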

// We fall back to u64::MAX when there are no pending flush LSNs, so that any completed LSN compares less than the minimum and the iceberg snapshot can proceed.
pub fn get_min_ongoing_flush_lsn(&self) -> u64 {
self.ongoing_flush_lsns
.iter()
.next()
.copied()
.unwrap_or(u64::MAX)
if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() {
return *lsn;
}
u64::MAX
}

pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64) {
assert!(
self.ongoing_flush_lsns.insert(lsn),
"LSN {lsn} already in pending flush LSNs"
);
pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64, count: u32) {
*self.ongoing_flush_lsns.entry(lsn).or_insert(0) += count;
}

pub fn remove_ongoing_flush_lsn(&mut self, lsn: u64) {
assert!(
self.ongoing_flush_lsns.remove(&lsn),
"LSN {lsn} not found in pending flush LSNs"
);
use std::collections::btree_map::Entry;

match self.ongoing_flush_lsns.entry(lsn) {
Entry::Occupied(mut entry) => {
let counter = entry.get_mut();
if *counter > 1 {
*counter -= 1;
} else {
entry.remove();
}
}
Entry::Vacant(_) => {
panic!("Tried to remove LSN {lsn}, but it is not tracked");
}
}

// It's possible to have multiple completed flush operations for the same LSN.
self.unrecorded_flush_lsns.insert(lsn);
}
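
A short usage sketch of the refcounted insert/remove pair above, reusing the hypothetical FlushLsnTracker model from earlier (illustrative only; placed in the same module as that sketch so its private fields are visible):

```rust
fn main() {
    let mut tracker = FlushLsnTracker::new();

    // A streaming transaction commits at LSN 42 with three flushes in flight.
    tracker.insert(42, 3);
    assert_eq!(tracker.min_ongoing(), 42);

    // The first two completions only decrement the counter...
    tracker.remove(42);
    tracker.remove(42);
    assert_eq!(tracker.min_ongoing(), 42);

    // ...and the LSN leaves the ongoing map only on the last one.
    tracker.remove(42);
    assert_eq!(tracker.min_ongoing(), u64::MAX);

    // All three completions collapse into a single unrecorded entry.
    assert_eq!(tracker.unrecorded.len(), 1);
}
```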

pub fn has_ongoing_flush(&self) -> bool {
@@ -1205,7 +1229,8 @@ impl MooncakeTable {
);

let table_notify_tx = self.table_notify.as_ref().unwrap().clone();
if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains(&lsn) {
if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains_key(&lsn) {
println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!());
self.try_set_next_flush_lsn(lsn);
tokio::task::spawn(async move {
table_notify_tx
Expand Down Expand Up @@ -1234,6 +1259,7 @@ impl MooncakeTable {
}

let mut disk_slice = self.prepare_disk_slice(lsn)?;
self.insert_ongoing_flush_lsn(lsn, /*count=*/1);
self.flush_disk_slice(
&mut disk_slice,
table_notify_tx,
@@ -156,6 +156,8 @@ pub struct IcebergSnapshotEventInitiation {
pub struct IcebergSnapshotEventCompletion {
/// Event id.
pub uuid: uuid::Uuid,
/// Flush LSN.
pub lsn: u64,
}

/// =====================
@@ -416,8 +418,9 @@ pub fn create_iceberg_snapshot_event_initiation(
}
pub fn create_iceberg_snapshot_event_completion(
uuid: uuid::Uuid,
lsn: u64,
) -> IcebergSnapshotEventCompletion {
IcebergSnapshotEventCompletion { uuid }
IcebergSnapshotEventCompletion { uuid, lsn }
}
/// Create index merge events.
pub fn create_index_merge_event_initiation(
5 changes: 5 additions & 0 deletions src/moonlink/src/storage/mooncake_table/snapshot.rs
@@ -597,6 +597,11 @@ impl SnapshotTableState {
flush_lsn,
committed_deletion_logs,
));

println!("decide to perform an iceberg snapshot, flush lsn = {}, mooncake snapshot lsn = {}",
flush_lsn,
self.current_snapshot.snapshot_version,
);
}
}

70 changes: 35 additions & 35 deletions src/moonlink/src/storage/mooncake_table/tests.rs
@@ -1300,25 +1300,25 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> {
.expect("Disk slice 3 should be present");

// Verify all LSNs are tracked
assert!(table.ongoing_flush_lsns.contains(&5));
assert!(table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&15));
assert!(table.ongoing_flush_lsns.contains_key(&5));
assert!(table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&15));
assert_eq!(table.ongoing_flush_lsns.len(), 3);

// Verify min is correctly calculated (should be 5)
assert_eq!(table.get_min_ongoing_flush_lsn(), 5);

// Complete flush with LSN 10 (out of order completion)
table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/);
assert!(table.ongoing_flush_lsns.contains(&5));
assert!(!table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&15));
assert!(table.ongoing_flush_lsns.contains_key(&5));
assert!(!table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&15));
assert_eq!(table.get_min_ongoing_flush_lsn(), 5); // Still 5

// Complete flush with LSN 5
table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/);
assert!(!table.ongoing_flush_lsns.contains(&5));
assert!(table.ongoing_flush_lsns.contains(&15));
assert!(!table.ongoing_flush_lsns.contains_key(&5));
assert!(table.ongoing_flush_lsns.contains_key(&15));
assert_eq!(table.get_min_ongoing_flush_lsn(), 15); // Now 15

// Complete last flush
@@ -1355,8 +1355,8 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> {
.expect("Disk slice 2 should be present");

// Verify both streaming LSNs are tracked
assert!(table.ongoing_flush_lsns.contains(&100));
assert!(table.ongoing_flush_lsns.contains(&50));
assert!(table.ongoing_flush_lsns.contains_key(&100));
assert!(table.ongoing_flush_lsns.contains_key(&50));
assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

// Mix with regular flush (must be higher than previous regular flush)
@@ -1367,9 +1367,9 @@
.expect("Disk slice 3 should be present");

// Verify all three LSNs are tracked
assert!(table.ongoing_flush_lsns.contains(&100));
assert!(table.ongoing_flush_lsns.contains(&50));
assert!(table.ongoing_flush_lsns.contains(&75));
assert!(table.ongoing_flush_lsns.contains_key(&100));
assert!(table.ongoing_flush_lsns.contains_key(&50));
assert!(table.ongoing_flush_lsns.contains_key(&75));
assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

// Complete streaming flushes
@@ -1580,22 +1580,22 @@ async fn test_out_of_order_flush_completion() -> Result<()> {
.expect("Disk slice should be present");

// Verify all are pending and min is 10
assert!(table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&20));
assert!(table.ongoing_flush_lsns.contains(&30));
assert!(table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&20));
assert!(table.ongoing_flush_lsns.contains_key(&30));
assert_eq!(table.get_min_ongoing_flush_lsn(), 10);

// Complete flush 30 first (out of order)
table.apply_flush_result(disk_slice_30, uuid::Uuid::new_v4() /*placeholder*/);
assert!(!table.ongoing_flush_lsns.contains(&30));
assert!(table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&20));
assert!(!table.ongoing_flush_lsns.contains_key(&30));
assert!(table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&20));
assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Still 10

// Complete flush 10 (should update min to 20)
table.apply_flush_result(disk_slice_10, uuid::Uuid::new_v4() /*placeholder*/);
assert!(!table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&20));
assert!(!table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&20));
assert_eq!(table.get_min_ongoing_flush_lsn(), 20);

// Complete flush 20 (should clear all)
@@ -1631,8 +1631,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> {
.expect("Streaming disk slice should be present");

// Verify both are tracked and min is 50
assert!(table.ongoing_flush_lsns.contains(&100));
assert!(table.ongoing_flush_lsns.contains(&50));
assert!(table.ongoing_flush_lsns.contains_key(&100));
assert!(table.ongoing_flush_lsns.contains_key(&50));
assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

// According to table handler logic, iceberg snapshots with flush_lsn >= 50 should be blocked
@@ -1662,8 +1662,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> {
streaming_disk_slice,
uuid::Uuid::new_v4(), /*placeholder*/
);
assert!(!table.ongoing_flush_lsns.contains(&50));
assert!(table.ongoing_flush_lsns.contains(&100));
assert!(!table.ongoing_flush_lsns.contains_key(&50));
assert!(table.ongoing_flush_lsns.contains_key(&100));
assert_eq!(table.get_min_ongoing_flush_lsn(), 100);

// Now iceberg snapshots with flush_lsn < 100 should be allowed
@@ -1988,7 +1988,7 @@ async fn test_iceberg_snapshot_blocked_by_ongoing_flushes() -> Result<()> {
table.commit(2);

// Verify we have pending flushes
assert!(table.ongoing_flush_lsns.contains(&30));
assert!(table.ongoing_flush_lsns.contains_key(&30));
assert_eq!(table.get_min_ongoing_flush_lsn(), 30);

// Create a mooncake snapshot - this will create an iceberg payload
@@ -2090,9 +2090,9 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<()> {

// Verify all pending flushes and min pending LSN
assert_eq!(table.get_min_ongoing_flush_lsn(), 10);
assert!(table.ongoing_flush_lsns.contains(&10));
assert!(table.ongoing_flush_lsns.contains(&20));
assert!(table.ongoing_flush_lsns.contains(&30));
assert!(table.ongoing_flush_lsns.contains_key(&10));
assert!(table.ongoing_flush_lsns.contains_key(&20));
assert!(table.ongoing_flush_lsns.contains_key(&30));

// Create snapshot and test constraint with min_ongoing_flush_lsn = 10
let created = table.create_snapshot(SnapshotOption {
@@ -2136,16 +2136,16 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<()> {
// Complete flushes OUT OF ORDER - complete middle one first (LSN 20)
table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/);
assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Should still be 10
assert!(table.ongoing_flush_lsns.contains(&10));
assert!(!table.ongoing_flush_lsns.contains(&20));
assert!(table.ongoing_flush_lsns.contains(&30));
assert!(table.ongoing_flush_lsns.contains_key(&10));
assert!(!table.ongoing_flush_lsns.contains_key(&20));
assert!(table.ongoing_flush_lsns.contains_key(&30));

// Complete the first flush (LSN 10) - now min should be 30
table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/);
assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Now should be 30
assert!(!table.ongoing_flush_lsns.contains(&10));
assert!(!table.ongoing_flush_lsns.contains(&20));
assert!(table.ongoing_flush_lsns.contains(&30));
assert!(!table.ongoing_flush_lsns.contains_key(&10));
assert!(!table.ongoing_flush_lsns.contains_key(&20));
assert!(table.ongoing_flush_lsns.contains_key(&30));

// Test constraint logic with new min pending flush LSN = 30
let can_initiate_low = TableHandlerState::can_initiate_iceberg_snapshot(
12 changes: 12 additions & 0 deletions src/moonlink/src/storage/mooncake_table/transaction_stream.rs
@@ -420,6 +420,7 @@ impl MooncakeTable {
// Perform table flush completion notification.
if let Some(lsn) = disk_slice.lsn() {
self.remove_ongoing_flush_lsn(lsn);
println!("set flush lsn {} at {:?}:{:?}", lsn, file!(), line!());
self.try_set_next_flush_lsn(lsn);
}

@@ -446,6 +447,7 @@
let should_remove = stream_state.ongoing_flush_count == 0;
let _ = stream_state;

println!("set flush lsn {} at {:?}:{:?}", commit_lsn, file!(), line!());
self.try_set_next_flush_lsn(commit_lsn);
self.next_snapshot_task.new_disk_slices.push(disk_slice);
if should_remove {
@@ -555,6 +557,16 @@ impl MooncakeTable {
.unwrap();
}

// Record ongoing flushes.
let ongoing_flush_count = {
let stream_state = self
.transaction_stream_states
.get_mut(&xact_id)
.expect("Stream state not found for xact_id: {xact_id}");
stream_state.ongoing_flush_count
};
self.insert_ongoing_flush_lsn(lsn, ongoing_flush_count);

// Perform commit operation.
let stream_state = self
.transaction_stream_states
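
This commit-path hunk is the reason insert_ongoing_flush_lsn takes a count: when a streaming transaction commits, every flush it still has in flight is attributed to the single commit LSN. A hedged trace under assumed conditions (a hypothetical transaction with two in-flight flushes committing at LSN 90):

```rust
// xact 7 has flushed twice while streaming; neither flush carries an LSN yet,
// so stream_state.ongoing_flush_count == 2 at commit time.
//
// commit(xact_id = 7, lsn = 90):
//     insert_ongoing_flush_lsn(90, /*count=*/ 2)   // ongoing: {90: 2}
//
// first flush completes:
//     remove_ongoing_flush_lsn(90)                 // ongoing: {90: 1}
//     try_set_next_flush_lsn(90)                   // 90 still pending -> no advance
//
// second flush completes:
//     remove_ongoing_flush_lsn(90)                 // ongoing: {}
//     try_set_next_flush_lsn(90)                   // free to advance flush LSN to 90
```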