Skip to content

Commit

Permalink
Clarify what checkpoint_messages does
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 19, 2024
1 parent ba49fa7 commit a600c3d
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ impl QueueSharedState {
}
}

/// Acquires shards from the shared state for the provided list of messages and
/// maps results to that same list
/// Acquires shards from the shared state for the provided list of messages
/// using [`QueueSharedState::acquire_partitions`], then maps resulting
/// positions back to that original list. Messages that don't require any
/// further processing are dropped.
pub async fn checkpoint_messages(
shared_state: &mut QueueSharedState,
publish_token: &str,
Expand All @@ -200,12 +202,12 @@ pub async fn checkpoint_messages(
.collect();
let partition_ids = message_map.keys().cloned().collect();

let shards = shared_state
let partition_positions = shared_state
.acquire_partitions(publish_token, partition_ids)
.await?;

let mut result = Vec::with_capacity(shards.len());
for (partition_id, position) in shards {
let mut result = Vec::with_capacity(partition_positions.len());
for (partition_id, position) in partition_positions {
let content = message_map.remove(&partition_id).context("Unexpected partition ID. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.")?;
result.push((content, position));
}
Expand Down

0 comments on commit a600c3d

Please sign in to comment.