perf: Pre-resolve type dispatch in sort-merge join comparators#20452
Closed
andygrove wants to merge 3 commits intoapache:mainfrom
Closed
perf: Pre-resolve type dispatch in sort-merge join comparators#20452andygrove wants to merge 3 commits intoapache:mainfrom
andygrove wants to merge 3 commits intoapache:mainfrom
Conversation
3a51775 to
bf9d82c
Compare
Contributor
|
Thanks for working on SMJ performance @andygrove! It is somewhat overlooked compared to hash join :) One suggestion: could // already in DataFusion's dependency tree (arrow-ord)
pub type DynComparator = Box<dyn Fn(usize, usize) -> Ordering + Send + Sync>;
make_comparator(left: &dyn Array, right: &dyn Array, opts: SortOptions) -> Result<DynComparator>It already does everything this PR implements, and more:
For let cmp = make_comparator(left_arr, right_arr, opts)?;
let is_eq = move |i: usize, j: usize| -> bool {
match (left_arr.is_null(i), right_arr.is_null(j)) {
(true, true) => true,
(true, false) | (false, true) => false,
_ => cmp(i, j).is_eq(),
}
};Side note: |
Replace per-row runtime DataType matching in is_join_arrays_equal() and compare_join_arrays() with a JoinComparator struct that resolves typed comparison function pointers once during SortMergeJoinStream construction. This eliminates the overhead of matching on 20+ DataType variants for every row comparison in the merge loop. The JoinComparator provides: - compare(): for merge-loop ordering decisions - is_equal(): for buffered batch key-group expansion Benchmark results (best of 3 iterations, 20 queries): - Q1 (INNER 100Kx100K 1:1): -29.9% - Q17 (INNER 100Kx5M 1:50 filtered): -12.5% - Most other queries: 1-4% improvement - No significant regressions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
bf9d82c to
df2bf1c
Compare
Replace hand-rolled JoinComparator type dispatch (~200 lines of macros and per-type match arms) with Arrow's built-in make_comparator. This handles more types (List, Struct, Map, RunEndEncoded, etc.) and optimizes away null checks when columns have no nulls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add ComparatorCache that tracks array identity via Arc data pointers. Comparators are only rebuilt when the underlying join key arrays change (i.e., when a new batch arrives), not on every compare_streamed_buffered call. This eliminates per-row comparator construction overhead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Member
Author
|
results are pretty mixed now. I will close this for now and re-open if I manage to get consistent improvements. Thanks for the review @mbutrovich! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
DataTypematching inis_join_arrays_equal()andcompare_join_arrays()with aJoinComparatorstruct that resolves typed comparison function pointers once duringSortMergeJoinStreamconstructionDataTypevariants for every row comparison in the merge loopJoinComparatorprovides two methods:compare()— for merge-loop ordering decisions (streamed vs buffered advance)is_equal()— for buffered batch key-group expansionBenchmark Results
Best of 3 iterations across 20 SMJ benchmark queries (
cargo run --release -p datafusion-benchmarks --bin dfbench -- smj):The biggest wins are on comparison-dominated workloads (Q1: 1:1 join, Q17: filtered 1:50 join). High-cardinality joins (Q3, Q5, Q20) where output construction dominates show no significant change.
Test plan
sort_merge_joinunit tests passcargo fmtcleancargo clippyclean (zero warnings)🤖 Generated with Claude Code