Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid pushdown of volatile functions to tablescan #13475

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

theirix
Copy link
Contributor

@theirix theirix commented Nov 18, 2024

Which issue does this PR close?

When using pushdown filters, the planner pushes the volatile random() filter to the table source, so it executes in scan (for example, in parquet) and in the query engine, which leads to weird results.

Closes #13268.

Rationale for this change

It's impossible to evaluate volatile filters in different layers.

What changes are included in this PR?

  • Improvement for rule optimiser to avoid passing volatile filters to scan
  • Unit test

Are these changes tested?

  • Unit tests
  • Regression tests
  • Manual test

As proposed in the original issue, I tried alltypes_tiny_pages_plain.parquet sample file containing 7300 lines:

set datafusion.execution.parquet.pushdown_filters=true;
create external table data stored as parquet location 'alltypes_tiny_pages_plain.parquet';

Running a query

select COUNT(*) from data WHERE RANDOM() < 0.1;

with datafusion-cli gives an answer of 726, which is pretty close to the expected 730.

New plan

+---------------+---------------------------------------------------------------------------------+
| plan_type     | plan                                                                            |
+---------------+---------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]                   |
|               |   Filter: random() < Float64(0.1)                                               |
|               |     TableScan: data projection=[]                                               |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(*)]                              |
|               |   CoalescePartitionsExec                                                        |
|               |     AggregateExec: mode=Partial, gby=[], aggr=[count(*)]                        |
|               |       CoalesceBatchesExec: target_batch_size=8192                               |
|               |         FilterExec: random() < 0.1                                              |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1  |
|               |             ParquetExec: file_groups={1 group: [[sample.parquet]]} 							|
|               |                                                                                 |
+---------------+---------------------------------------------------------------------------------+

Before the change plan was

| ParquetExec: file_groups={1 group: [[alltypes_tiny_pages_plain.parquet]]}, predicate=random() < 0.1 |

Are there any user-facing changes?

No breaking changes.

@github-actions github-actions bot added the optimizer Optimizer rules label Nov 18, 2024
@theirix theirix marked this pull request as ready for review November 18, 2024 22:33
.into_iter()
.zip(results)
.map(|(expr, res)| {
let filter_pushdown_type = if expr.is_volatile() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could skip checking volatile if it’s already unsupported.

                let zip =
                    filter_predicates
                        .into_iter()
                        .zip(results)
                        .map(|(pred, mut res)| {
                            if !matches!(res, TableProviderFilterPushDown::Unsupported)
                                && pred.is_volatile()
                            {
                                // Do not push down predicate with volatile functions to scan
                                res = TableProviderFilterPushDown::Unsupported
                            }
                            (pred, res)
                        });

.into_iter()
.zip(results)
.map(|(expr, res)| {
let filter_pushdown_type = if expr.is_volatile() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we check is_volatile before calling scan.source.supports_filters_pushdown(...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, we can ask a table source to tell if the filter is supported even for volatile expressions.

I thought the main difference is which call is more expensive. supports_filters_pushdown for different sources often provides blanket all-supported or all-unsupported for all predicates. So we can save is_volatile calls by filtering out Unsupported values before. In contrast, if we check for volatility first and only then ask the source if the filters are supported, we can waste is_volatilty calls. If the supports_filters_pushdown is more costly, let's reverse it, of course.

I pushed a change to check the volatility after filtering out unsupported predicates. If we prefer the inverse order, I could refactor this optimiser quite a bit to:

  • Get a subset of supported predicates
  • Apply the is_volatile filter to it
  • Build new_scan_filters and new_predicate

Copy link
Contributor

@jayzhan211 jayzhan211 Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is possible that any table source can support volatile function then this checking should happen in supports_filters_pushdown.

Otherwise, we should check is_volatile before supports_filters_pushdown, so those table source don't need to care about whether the function is volatile or not.

Given that volatile has no deterministic result, I think it doesn't make sense to pushdown filter with those function, therefore I think we can check volatility beforehand

It would be nice to add comment to supports_filters_pushdown mentioning that volatile function is not passed to the function

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Nov 20, 2024
Copy link
Contributor

@jayzhan211 jayzhan211 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Filters on RANDOM() are applied incorrectly when pushdown_filters is enabled.
4 participants