Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel DEDUP_SEMI and DEDUP_SEMI_REVERSE Join.(A new approach to process Semi Join Query in Parallel of MPP) #653

Merged
merged 8 commits into from
Oct 16, 2024

Conversation

avamingli
Copy link
Contributor

@avamingli avamingli commented Oct 1, 2024

To handle semi join in parallel, we have enabled Parallel Semi join.
This commit introduces a new approach to process Semi Join Query in Parallel of MPP, enable DEDUP_SEMI and DEDUP_SEMI_REVERSE join in parallel to handle semi join cases which could be a win in MPP mode.

Allow to use RowIdExpr in paths to process SEMI join query in parallel, and we could use Broadcast Motion in parallel too.
If one of that tables is relatively smaller than another, that will be a big win.

Even Redistribute both side(in parallel-oblivious plan) and unique it in upper plans, it may be a win compared to a SemiJoin Nodes.


Performance

image

Test of case[0] three times with 10 parallel number.

plan avg(ms) 1st(ms) 2nd(ms) 3rd(ms)
Non-Parallell 32660 32655.304 32144.482 33181.489
Parallell Semi Join 24670 24846.083 24134.965 25029.130
Parallell DEDUP_SEMI Join 9249 7936.484 10333.245 9478.557
Parallell DEDUP_SEMI_REVERSE Join 5448 6277.671 5950.229 4117.556

Plan

Both parallel-oblivious and parallel-aware are enabled, for examples of parallel-aware case:

DEDUP_SEMI

select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
                                       QUERY PLAN                                        
-----------------------------------------------------------------------------------------
 Finalize Aggregate
   ->  Gather Motion 6:1  (slice1; segments: 6)
         ->  Partial Aggregate
               ->  HashAggregate
                     Group Key: (RowIdExpr)
                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
                           Hash Key: (RowIdExpr)
                           Hash Module: 3
                           ->  Parallel Hash Join
                                 Hash Cond: (foo.a = bar.b)
                                 ->  Broadcast Workers Motion 6:6  (slice3; segments: 6)
                                       ->  Parallel Seq Scan on foo
                                 ->  Parallel Hash
                                       ->  Parallel Seq Scan on bar
 Optimizer: Postgres query optimizer
(15 rows)

DEDUP_SEMI_REVERSE

select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
                                           QUERY PLAN                                           
------------------------------------------------------------------------------------------------
 Finalize Aggregate
   ->  Gather Motion 6:1  (slice1; segments: 6)
         ->  Partial Aggregate
               ->  HashAggregate
                     Group Key: (RowIdExpr)
                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
                           Hash Key: (RowIdExpr)
                           Hash Module: 3
                           ->  Parallel Hash Join
                                 Hash Cond: (bar.b = foo.a)
                                 ->  Parallel Seq Scan on bar
                                 ->  Parallel Hash
                                       ->  Broadcast Workers Motion 6:6  (slice3; segments: 6)
                                             ->  Parallel Seq Scan on foo
 Optimizer: Postgres query optimizer
(15 rows)

For DEDUP_SEMI or DEDUP_SEMI_REVERSE join, each process need a unique RowIdExpr to identify unique rows, which is assigned with a baseline when building paths.

It's ok for non-parallel plan, but in parallel mode there are multiple processes on same segment, RowIdExpr in not unique then. To enable that, add ParallelWorkerNumberOfSlice to identify worker id of a parallel plan of a slice.

When rowidexpr is used, it's executed by 48 bits and left other 16 bits for segment_id. In parallel mode, we have to make more room for parallel worker id within segment_id's bits. This is done during planner with checks, in case that there are many segments with many parallel workers (which rarely happens).


[0] DDL & DML

create table foo(a int)  with(parallel_workers=10)  distributed randomly;
create table bar(b int)  with(parallel_workers=10)  distributed randomly;
insert into foo select i from generate_series(1,10000000)i;
insert into bar select i from generate_series(1,100000000)i;
analyze foo;
analyze bar;
select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);

Authored-by: Zhang Mingli [email protected]

fix #ISSUE_Number


Change logs

Describe your change clearly, including what problem is being solved or what feature is being added.

If it has some breaking backward or forward compatibility, please clary.

Why are the changes needed?

Describe why the changes are necessary.

Does this PR introduce any user-facing change?

If yes, please clarify the previous behavior and the change this PR proposes.

How was this patch tested?

Please detail how the changes were tested, including manual tests and any relevant unit or integration tests.

Contributor's Checklist

Here are some reminders and checklists before/when submitting your pull request, please check them:

  • Make sure your Pull Request has a clear title and commit message. You can take git-commit template as a reference.
  • Sign the Contributor License Agreement as prompted for your first-time contribution(One-time setup).
  • Learn the coding contribution guide, including our code conventions, workflow and more.
  • List your communication in the GitHub Issues or Discussions (if has or needed).
  • Document changes.
  • Add tests for the change
  • Pass make installcheck
  • Pass make -C src/test installcheck-cbdb-parallel
  • Feel free to request cloudberrydb/dev team for review and approval when your PR is ready🥳

@avamingli avamingli requested a review from my-ship-it October 1, 2024 11:12
@avamingli avamingli force-pushed the parallel_DEDUP_SEMI_JOIN branch from f32844c to 926ee79 Compare October 1, 2024 16:24
@avamingli
Copy link
Contributor Author

Another flaky failed cases of resgroup:
#651 (comment)

@avamingli
Copy link
Contributor Author

avamingli commented Oct 8, 2024

Nestloop tends to use normal plan instead of Parallel, need a dig.

@avamingli avamingli force-pushed the parallel_DEDUP_SEMI_JOIN branch 2 times, most recently from 7d5b18e to 511df86 Compare October 9, 2024 03:15
To handle semi join in parallel, we have enabled Parallel Semi join.
This commit enable DEDUP_SEMI and DEDUP_SEMI_REVERSE join in parallel
to handle semi join cases which could be a win in MPP mode.

Both parallel-oblivious and parallel-aware are enabled, for an example
of parallel-aware case:

select * from foo where exists (select 1 from bar where foo.a = bar.b);
                                QUERY PLAN
---------------------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: (RowIdExpr)
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: (RowIdExpr)
               Hash Module: 3
               ->  Parallel Hash Join
                     Hash Cond: (bar.b = foo.a)
                     ->  Parallel Seq Scan on bar
                     ->  Parallel Hash
                           ->  Broadcast Workers Motion 6:6  (slice3;
segments: 6)
                                 ->  Parallel Seq Scan on foo
 Optimizer: Postgres query optimizer
(13 rows)

For DEDUP_SEMI or DEDUP_SEMI_REVERSE join, each process need a unique
RowIdExpr to identify unique rows, which is assigned with a baseline
when building paths.

It's ok for non-parallel plan, but in parallel mode there are multiple
processes on same segment, RowIdExpr in not unique then.
To enable that, add ParallelWorkerNumberOfSlice to identify worker id
of a parallel plan of a slice.
When rowidexpr is used, it's executed by 48 bits and left other 16 bits
for segment_id. In parallel mode, we have to make more room for parallel
worker id within segment_id's bits. This is done during planner with
checks, in case that there are many segments with many parallel workers
(which rarely happens).

Authored-by: Zhang Mingli [email protected]
@avamingli avamingli force-pushed the parallel_DEDUP_SEMI_JOIN branch from 511df86 to c348b2c Compare October 12, 2024 02:01
@avamingli
Copy link
Contributor Author

Nestloop tends to use normal plan instead of Parallel, need a dig.

ENV problem, corrected.
And I have to clarify that even without Broadcast, Redistribute both side(in parallel-oblivious plan) and unique it in upper plans, it may be a win compared to a SemiJoin Node.
See parallel Nestloop test cases.

@my-ship-it my-ship-it merged commit e86787e into apache:main Oct 16, 2024
12 checks passed
@avamingli avamingli deleted the parallel_DEDUP_SEMI_JOIN branch December 10, 2024 03:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants