Skip to content

Commit

Permalink
[Improvement](runtime-filter) do not use bloom to replace in_or_bloom…
Browse files Browse the repository at this point in the history
… when rf need merge (apache#39147)

## Proposed changes
Do not replace `IN_OR_BLOOM` with `BLOOM` when the runtime filter (RF) needs to be merged,
because in some cases this substitution leads to poor performance.

<img width="298" alt="图片"
src="https://github.com/user-attachments/assets/bcee330f-bb38-4e51-af76-1a181bd205f9">
<img width="298" alt="图片"
src="https://github.com/user-attachments/assets/481a4b06-929d-4f4a-8d10-bf2901e68fdf">
  • Loading branch information
BiteTheDDDDt authored Aug 13, 2024
1 parent aa2929e commit f3dd685
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,6 @@ public Boolean visitPhysicalRelation(PhysicalRelation scan, PushDownContext ctx)
}

TRuntimeFilterType type = ctx.type;
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& RuntimeFilterGenerator.hasRemoteTarget(ctx.builderNode, scan)
&& !ctx.builderNode.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}

RuntimeFilter filter = ctx.rfContext.getRuntimeFilterBySrcAndType(ctx.srcExpr, type, ctx.builderNode);
if (filter != null) {
if (!filter.hasTargetScan(scan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[ss_item_sk];RF7 sr_item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[i_item_sk,ss_item_sk]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF4 i_item_sk->[ss_item_sk]
--------------------PhysicalProject
Expand All @@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer]
--------------------PhysicalProject
----------------------PhysicalOlapScan[item] apply RFs: RF7
----------------------PhysicalOlapScan[item] apply RFs: RF6
----------------PhysicalProject
------------------PhysicalOlapScan[store_returns]
--PhysicalResultSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[ss_item_sk];RF7 sr_item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[i_item_sk,ss_item_sk]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF4 i_item_sk->[ss_item_sk]
--------------------PhysicalProject
Expand All @@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer]
--------------------PhysicalProject
----------------------PhysicalOlapScan[item] apply RFs: RF7
----------------------PhysicalOlapScan[item] apply RFs: RF6
----------------PhysicalProject
------------------PhysicalOlapScan[store_returns]
--PhysicalResultSink
Expand Down
28 changes: 14 additions & 14 deletions regression-test/data/new_shapes_p0/hint_tpcds/shape/query64.out
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = cs_ui.cs_item_sk)) otherCondition=() build RFs:RF20 cs_item_sk->[i_item_sk,sr_item_sk,ss_item_sk]
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = cs_ui.cs_item_sk)) otherCondition=() build RFs:RF19 cs_item_sk->[i_item_sk,sr_item_sk,ss_item_sk]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF19 p_promo_sk->[ss_promo_sk]
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF18 p_promo_sk->[ss_promo_sk]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF18 s_store_sk->[ss_store_sk]
----------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF17 s_store_sk->[ss_store_sk]
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = d1.d_date_sk)) otherCondition=() build RFs:RF17 d_date_sk->[ss_sold_date_sk]
--------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_sold_date_sk = d1.d_date_sk)) otherCondition=() build RFs:RF16 d_date_sk->[ss_sold_date_sk]
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((hd2.hd_income_band_sk = ib2.ib_income_band_sk)) otherCondition=() build RFs:RF16 ib_income_band_sk->[hd_income_band_sk]
------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((hd2.hd_income_band_sk = ib2.ib_income_band_sk)) otherCondition=() build RFs:RF15 ib_income_band_sk->[hd_income_band_sk]
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_addr_sk = ad2.ca_address_sk)) otherCondition=() build RFs:RF15 ca_address_sk->[c_current_addr_sk]
----------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_addr_sk = ad2.ca_address_sk)) otherCondition=() build RFs:RF14 ca_address_sk->[c_current_addr_sk]
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_hdemo_sk = hd2.hd_demo_sk)) otherCondition=() build RFs:RF14 hd_demo_sk->[c_current_hdemo_sk]
--------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_hdemo_sk = hd2.hd_demo_sk)) otherCondition=() build RFs:RF13 hd_demo_sk->[c_current_hdemo_sk]
----------------------------------------PhysicalProject
------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF13 ca_address_sk->[ss_addr_sk]
------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF12 ca_address_sk->[ss_addr_sk]
--------------------------------------------PhysicalProject
----------------------------------------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF10 sr_item_sk->[ss_item_sk];RF11 sr_item_sk->[i_item_sk];RF12 sr_ticket_number->[ss_ticket_number]
----------------------------------------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF10 sr_item_sk->[i_item_sk,ss_item_sk];RF11 sr_ticket_number->[ss_ticket_number]
------------------------------------------------PhysicalProject
--------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_hdemo_sk = hd1.hd_demo_sk)) otherCondition=() build RFs:RF9 hd_demo_sk->[ss_hdemo_sk]
----------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF8 i_item_sk->[ss_item_sk]
Expand All @@ -34,13 +34,13 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
--------------------------------------------------------------PhysicalProject
----------------------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
------------------------------------------------------------------PhysicalProject
--------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF5 RF6 RF8 RF9 RF10 RF12 RF13 RF17 RF18 RF19 RF20
--------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF5 RF6 RF8 RF9 RF10 RF11 RF12 RF16 RF17 RF18 RF19
------------------------------------------------------------------PhysicalProject
--------------------------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_current_cdemo_sk = cd2.cd_demo_sk)) otherCondition=() build RFs:RF4 cd_demo_sk->[c_current_cdemo_sk]
----------------------------------------------------------------------PhysicalProject
------------------------------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF3 d_date_sk->[c_first_sales_date_sk]
--------------------------------------------------------------------------PhysicalProject
----------------------------------------------------------------------------PhysicalOlapScan[customer] apply RFs: RF3 RF4 RF7 RF14 RF15
----------------------------------------------------------------------------PhysicalOlapScan[customer] apply RFs: RF3 RF4 RF7 RF13 RF14
--------------------------------------------------------------------------PhysicalProject
----------------------------------------------------------------------------PhysicalOlapScan[date_dim]
----------------------------------------------------------------------PhysicalProject
Expand All @@ -51,19 +51,19 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
------------------------------------------------------------PhysicalOlapScan[date_dim]
------------------------------------------------------PhysicalProject
--------------------------------------------------------filter((item.i_current_price <= 58.00) and (item.i_current_price >= 49.00) and i_color IN ('blush', 'lace', 'lawn', 'misty', 'orange', 'pink'))
----------------------------------------------------------PhysicalOlapScan[item] apply RFs: RF11 RF20
----------------------------------------------------------PhysicalOlapScan[item] apply RFs: RF10 RF19
----------------------------------------------------PhysicalProject
------------------------------------------------------hashJoin[INNER_JOIN broadcast] hashCondition=((hd1.hd_income_band_sk = ib1.ib_income_band_sk)) otherCondition=() build RFs:RF2 ib_income_band_sk->[hd_income_band_sk]
--------------------------------------------------------PhysicalProject
----------------------------------------------------------PhysicalOlapScan[household_demographics] apply RFs: RF2
--------------------------------------------------------PhysicalProject
----------------------------------------------------------PhysicalOlapScan[income_band]
------------------------------------------------PhysicalProject
--------------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF20
--------------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF19
--------------------------------------------PhysicalProject
----------------------------------------------PhysicalOlapScan[customer_address]
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[household_demographics] apply RFs: RF16
------------------------------------------PhysicalOlapScan[household_demographics] apply RFs: RF15
------------------------------------PhysicalProject
--------------------------------------PhysicalOlapScan[customer_address]
--------------------------------PhysicalProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[ss_item_sk];RF7 sr_item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[i_item_sk,ss_item_sk]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF4 i_item_sk->[ss_item_sk]
--------------------PhysicalProject
Expand All @@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer_address]
--------------------PhysicalProject
----------------------PhysicalOlapScan[item] apply RFs: RF7
----------------------PhysicalOlapScan[item] apply RFs: RF6
----------------PhysicalProject
------------------PhysicalOlapScan[store_returns]
--PhysicalResultSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[ss_item_sk];RF7 sr_item_sk->[i_item_sk]
--------------hashJoin[INNER_JOIN colocated] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF5 sr_ticket_number->[ss_ticket_number];RF6 sr_item_sk->[i_item_sk,ss_item_sk]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN broadcast] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF4 i_item_sk->[ss_item_sk]
--------------------PhysicalProject
Expand All @@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[customer_address]
--------------------PhysicalProject
----------------------PhysicalOlapScan[item] apply RFs: RF7
----------------------PhysicalOlapScan[item] apply RFs: RF6
----------------PhysicalProject
------------------PhysicalOlapScan[store_returns]
--PhysicalResultSink
Expand Down

0 comments on commit f3dd685

Please sign in to comment.