Skip to content

Commit

Permalink
refactor: migrate LinearSearch to HashTable (apache#13658)
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum authored Dec 6, 2024
1 parent ce330ec commit fc70323
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::raw::RawTable;
use hashbrown::hash_table::HashTable;
use indexmap::IndexMap;
use log::debug;

Expand Down Expand Up @@ -442,16 +442,16 @@ pub struct LinearSearch {
/// is ordered by a, b and the window expression contains a PARTITION BY b, a
/// clause, this attribute stores [1, 0].
ordered_partition_by_indices: Vec<usize>,
/// We use this [`RawTable`] to calculate unique partitions for each new
/// We use this [`HashTable`] to calculate unique partitions for each new
/// RecordBatch. First entry in the tuple is the hash value, the second
/// entry is the unique ID for each partition (increments from 0 to n).
row_map_batch: RawTable<(u64, usize)>,
/// We use this [`RawTable`] to calculate the output columns that we can
row_map_batch: HashTable<(u64, usize)>,
/// We use this [`HashTable`] to calculate the output columns that we can
/// produce at each cycle. First entry in the tuple is the hash value, the
/// second entry is the unique ID for each partition (increments from 0 to n).
/// The third entry stores how many new outputs are calculated for the
/// corresponding partition.
row_map_out: RawTable<(u64, usize, usize)>,
row_map_out: HashTable<(u64, usize, usize)>,
input_schema: SchemaRef,
}

Expand Down Expand Up @@ -610,8 +610,8 @@ impl LinearSearch {
input_buffer_hashes: VecDeque::new(),
random_state: Default::default(),
ordered_partition_by_indices,
row_map_batch: RawTable::with_capacity(256),
row_map_out: RawTable::with_capacity(256),
row_map_batch: HashTable::with_capacity(256),
row_map_out: HashTable::with_capacity(256),
input_schema,
}
}
Expand All @@ -631,7 +631,7 @@ impl LinearSearch {
// res stores PartitionKey and row indices (indices where these partitions occur in the `batch`) for each partition.
let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
let entry = self.row_map_batch.get_mut(hash, |(_, group_idx)| {
let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
// We can safely get the first index of the partition indices
// since partition indices has one element during initialization.
let row = get_row_at_idx(columns, row_idx as usize).unwrap();
Expand All @@ -641,8 +641,11 @@ impl LinearSearch {
if let Some((_, group_idx)) = entry {
result[*group_idx].1.push(row_idx)
} else {
self.row_map_batch
.insert(hash, (hash, result.len()), |(hash, _)| *hash);
self.row_map_batch.insert_unique(
hash,
(hash, result.len()),
|(hash, _)| *hash,
);
let row = get_row_at_idx(columns, row_idx as usize)?;
// This is a new partition; its only index is row_idx for now.
result.push((row, vec![row_idx]));
Expand All @@ -667,7 +670,7 @@ impl LinearSearch {
self.row_map_out.clear();
let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
let entry = self.row_map_out.get_mut(*hash, |(_, group_idx, _)| {
let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
let row =
get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
row == partition_indices[*group_idx].0
Expand All @@ -693,7 +696,7 @@ impl LinearSearch {
if min_out == 0 {
break;
}
self.row_map_out.insert(
self.row_map_out.insert_unique(
*hash,
(*hash, partition_indices.len(), min_out),
|(hash, _, _)| *hash,
Expand Down

0 comments on commit fc70323

Please sign in to comment.