feat(optimizer): cherry-pick support better predicate pushdown for table scan (#19934)

Co-authored-by: Xinhao Xu <[email protected]>
Li0k and xxhZs authored Dec 25, 2024
1 parent 4b2e0df commit c8819a9
Showing 14 changed files with 1,622 additions and 349 deletions.
65 changes: 65 additions & 0 deletions e2e_test/batch/basic/row_filter.slt.part
@@ -0,0 +1,65 @@
statement ok
create table t1(v1 int, v2 int, v3 int);

statement ok
insert into t1 values(1,1,1),(1,2,1),(1,2,2),(1,3,1),(1,3,2),(1,3,3);

statement ok
create materialized view mv1 as select * from t1 order by v1 asc, v2 desc, v3 asc;

statement ok
create materialized view mv2 as select * from t1 order by v1 desc, v2 desc, v3 desc;

statement ok
create materialized view mv3 as select * from t1 order by v1 asc, v2 asc, v3 asc;

query III
select * from mv1 where (v1,v2,v3) > (1,3,1) order by v3;
----
1 3 2
1 3 3

query III
select * from mv2 where (v1,v2,v3) > (1,3,1) order by v3;
----
1 3 2
1 3 3

query III
select * from mv3 where (v1,v2,v3) > (1,3,1) order by v3;
----
1 3 2
1 3 3

query III
select * from mv1 where (v1,v2,v3) < (1,3,1) order by v1,v2,v3;
----
1 1 1
1 2 1
1 2 2

query III
select * from mv2 where (v1,v2,v3) < (1,3,1) order by v1,v2,v3;
----
1 1 1
1 2 1
1 2 2

query III
select * from mv3 where (v1,v2,v3) < (1,3,1) order by v1,v2,v3;
----
1 1 1
1 2 1
1 2 2

statement ok
drop materialized view mv3;

statement ok
drop materialized view mv2;

statement ok
drop materialized view mv1;

statement ok
drop table t1;
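
The expectations above follow from SQL row-value comparison being lexicographic: columns are compared left to right, and the first differing column decides. Rust tuples compare the same way, so the semantics can be checked with a minimal illustrative sketch (plain integers, not tied to the code in this commit):

fn main() {
    // `(v1, v2, v3) > (1, 3, 1)` compares column by column, left to right.
    assert!((1, 3, 2) > (1, 3, 1)); // decided by the third column
    assert!((1, 3, 3) > (1, 3, 1));
    assert!((1, 2, 2) < (1, 3, 1)); // decided by the second column; the third is ignored
    println!("row-value comparison is lexicographic");
}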
2 changes: 1 addition & 1 deletion proto/batch_plan.proto
@@ -48,7 +48,7 @@ message ScanRange {
 
   // `None` represent unbounded.
   message Bound {
-    bytes value = 1;
+    repeated bytes value = 1;
     bool inclusive = 2;
   }
  // The lower bound of the next PK column subsequent to those in `eq_conds`.
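
Changing `value` from `bytes` to `repeated bytes` lets a single `Bound` carry one serialized datum for each PK column it constrains, which is what a multi-column bound needs. A conceptual sketch of the decoded shape (hypothetical Rust struct, not the prost-generated type):

// Hypothetical mirror of the message above, not the prost-generated type.
struct PbBound {
    value: Vec<Vec<u8>>, // was a single `Vec<u8>`; now one encoded datum per column
    inclusive: bool,
}

fn main() {
    // A lower bound over two trailing PK columns, e.g. `(v2, v3) > (3, 1)`.
    let lower = PbBound {
        value: vec![vec![0x03], vec![0x01]], // placeholder encodings
        inclusive: false,
    };
    assert_eq!(lower.value.len(), 2);
    assert!(!lower.inclusive);
}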
Expand Down
120 changes: 66 additions & 54 deletions src/batch/src/executor/row_seq_scan.rs
@@ -23,7 +23,7 @@ use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Schema};
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::row::{OwnedRow, Row};
-use risingwave_common::types::{DataType, Datum};
+use risingwave_common::types::DataType;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::value_encoding::deserialize_datum;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -60,12 +60,13 @@ pub struct RowSeqScanExecutor<S: StateStore> {
 }
 
 /// Range for batch scan.
+#[derive(Debug)]
 pub struct ScanRange {
     /// The prefix of the primary key.
     pub pk_prefix: OwnedRow,
 
     /// The range bounds of the next column.
-    pub next_col_bounds: (Bound<Datum>, Bound<Datum>),
+    pub next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>),
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
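
Widening `next_col_bounds` from `Bound<Datum>` to `Bound<OwnedRow>` is the core of the change: a range bound can now span every PK column after the equality prefix instead of only the next one. A simplified sketch of what a pushed-down row-value predicate produces (plain `Vec<i32>` standing in for `OwnedRow`, not the RisingWave types):

use std::ops::Bound;

// Simplified stand-ins for the RisingWave types.
struct ScanRange {
    pk_prefix: Vec<i32>,                                 // columns fixed by equality
    next_col_bounds: (Bound<Vec<i32>>, Bound<Vec<i32>>), // bounds on the remaining columns
}

fn main() {
    // `WHERE (v1, v2, v3) > (1, 3, 1)` on PK (v1, v2, v3): no equality prefix,
    // one exclusive lower bound covering all three columns.
    let range = ScanRange {
        pk_prefix: vec![],
        next_col_bounds: (Bound::Excluded(vec![1, 3, 1]), Bound::Unbounded),
    };
    assert!(range.pk_prefix.is_empty());
    assert!(matches!(range.next_col_bounds.0, Bound::Excluded(ref v) if v.len() == 3));
}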
@@ -100,17 +101,16 @@ impl From<&AsOf> for PbAsOf {
 
 impl ScanRange {
     /// Create a scan range from the prost representation.
-    pub fn new(
-        scan_range: PbScanRange,
-        mut pk_types: impl Iterator<Item = DataType>,
-    ) -> Result<Self> {
+    pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> Result<Self> {
+        let mut index = 0;
         let pk_prefix = OwnedRow::new(
             scan_range
                 .eq_conds
                 .iter()
                 .map(|v| {
-                    let ty = pk_types.next().unwrap();
-                    deserialize_datum(v.as_slice(), &ty)
+                    let ty = pk_types.get(index).unwrap();
+                    index += 1;
+                    deserialize_datum(v.as_slice(), ty)
                 })
                 .try_collect()?,
         );
@@ -121,26 +121,34 @@ impl ScanRange {
             });
         }
 
-        let bound_ty = pk_types.next().unwrap();
-        let build_bound = |bound: &scan_range::Bound| -> Bound<Datum> {
-            let datum = deserialize_datum(bound.value.as_slice(), &bound_ty).unwrap();
+        let build_bound = |bound: &scan_range::Bound, mut index| -> Result<Bound<OwnedRow>> {
+            let next_col_bounds = OwnedRow::new(
+                bound
+                    .value
+                    .iter()
+                    .map(|v| {
+                        let ty = pk_types.get(index).unwrap();
+                        index += 1;
+                        deserialize_datum(v.as_slice(), ty)
+                    })
+                    .try_collect()?,
+            );
             if bound.inclusive {
-                Bound::Included(datum)
+                Ok(Bound::Included(next_col_bounds))
             } else {
-                Bound::Excluded(datum)
+                Ok(Bound::Excluded(next_col_bounds))
             }
         };
 
-        let next_col_bounds: (Bound<Datum>, Bound<Datum>) = match (
+        let next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) = match (
             scan_range.lower_bound.as_ref(),
             scan_range.upper_bound.as_ref(),
         ) {
-            (Some(lb), Some(ub)) => (build_bound(lb), build_bound(ub)),
-            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub)),
-            (Some(lb), None) => (build_bound(lb), Bound::Unbounded),
+            (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
+            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
+            (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
             (None, None) => unreachable!(),
         };
 
         Ok(Self {
             pk_prefix,
             next_col_bounds,
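
Shorn of the RisingWave types, the decoding pattern in `new` looks like the sketch below: the equality prefix advances a shared index into the PK types, and each bound then decodes its values from that offset. Here `decode` is a hypothetical stand-in for `deserialize_datum`, and byte slices stand in for value-encoded datums:

use std::ops::Bound;

// Hypothetical stand-in for `deserialize_datum`.
fn decode(raw: &[u8]) -> i32 {
    raw[0] as i32
}

fn build_bound(values: &[Vec<u8>], inclusive: bool) -> Bound<Vec<i32>> {
    // One decoded datum per bounded PK column, in PK order.
    let row: Vec<i32> = values.iter().map(|v| decode(v)).collect();
    if inclusive {
        Bound::Included(row)
    } else {
        Bound::Excluded(row)
    }
}

fn main() {
    // Two encoded datums for the PK columns after the equality prefix.
    let raw = vec![vec![0x03], vec![0x01]];
    assert_eq!(build_bound(&raw, false), Bound::Excluded(vec![3, 1]));
    assert_eq!(build_bound(&raw, true), Bound::Included(vec![3, 1]));
}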
@@ -221,14 +229,18 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
         scan_ranges
             .iter()
             .map(|scan_range| {
-                let pk_types = table_desc.pk.iter().map(|order| {
-                    DataType::from(
-                        table_desc.columns[order.column_index as usize]
-                            .column_type
-                            .as_ref()
-                            .unwrap(),
-                    )
-                });
+                let pk_types = table_desc
+                    .pk
+                    .iter()
+                    .map(|order| {
+                        DataType::from(
+                            table_desc.columns[order.column_index as usize]
+                                .column_type
+                                .as_ref()
+                                .unwrap(),
+                        )
+                    })
+                    .collect_vec();
                 ScanRange::new(scan_range.clone(), pk_types)
             })
             .try_collect()?
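
The switch from `mut pk_types: impl Iterator` to a collected `Vec` in the builder above supports that pattern: the lower and upper bounds must both re-read the PK types from the same starting offset, which a single-pass iterator cannot provide. A tiny illustration (plain `&str` names standing in for `DataType`):

fn main() {
    let pk_types = vec!["int", "int", "int"]; // collected once, indexed many times
    let index = 1; // one equality condition consumed the first PK column
    // Both bounds read the remaining types from the same offset.
    let for_lower: Vec<_> = pk_types[index..].to_vec();
    let for_upper: Vec<_> = pk_types[index..].to_vec();
    assert_eq!(for_lower, for_upper);
}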
@@ -424,6 +436,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
             next_col_bounds,
         } = scan_range;
 
+        // The length of a valid pk_prefix should be less than or equal to the number of pk columns.
         let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
         let (start_bound, end_bound) = if order_type.is_ascending() {
             (next_col_bounds.0, next_col_bounds.1)
@@ -434,40 +447,39 @@
         let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
         let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
 
+        let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| {
+            match bound {
+                Bound::Unbounded => {
+                    if other_bound_is_bounded && order_type_nulls {
+                        // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
+                        Bound::Excluded(OwnedRow::new(vec![None]))
+                    } else {
+                        // Both start and end are unbounded, so we need to select all rows.
+                        Bound::Unbounded
+                    }
+                }
+                Bound::Included(x) => Bound::Included(x),
+                Bound::Excluded(x) => Bound::Excluded(x),
+            }
+        };
+        let start_bound = build_bound(
+            end_bound_is_bounded,
+            start_bound,
+            order_type.nulls_are_first(),
+        );
+        let end_bound = build_bound(
+            start_bound_is_bounded,
+            end_bound,
+            order_type.nulls_are_last(),
+        );
+
         // Range Scan.
         assert!(pk_prefix.len() < table.pk_indices().len());
         let iter = table
             .batch_chunk_iter_with_pk_bounds(
                 epoch.into(),
                 &pk_prefix,
-                (
-                    match start_bound {
-                        Bound::Unbounded => {
-                            if end_bound_is_bounded && order_type.nulls_are_first() {
-                                // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
-                        Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
-                    },
-                    match end_bound {
-                        Bound::Unbounded => {
-                            if start_bound_is_bounded && order_type.nulls_are_last() {
-                                // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
-                        Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
-                    },
-                ),
+                (start_bound, end_bound),
                 ordered,
                 chunk_size,
                 PrefetchOptions::new(limit.is_none(), true),
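
The new `build_bound` closure hoists the NULL-handling rule that the old code duplicated in both `match` arms: when one side of the range is bounded and the unbounded side is where `NULL`s sort, that side is rewritten to `Excluded(NULL)`, since a SQL comparison never matches `NULL`. The effect can be sketched with `Option<i32>` modeling a nullable key column (`None` sorts first, matching NULLS FIRST; illustrative only, not the storage iterator):

use std::ops::{Bound, RangeBounds};

fn main() {
    // Storage order with NULLs first: None < Some(_) for Option<i32>.
    let stored = vec![None, Some(1), Some(2), Some(3)];
    // `v <= 2` has an unbounded start; rewriting it to Excluded(NULL)
    // keeps the NULL prefix out of the scan, as SQL semantics require.
    let range: (Bound<Option<i32>>, Bound<Option<i32>>) =
        (Bound::Excluded(None), Bound::Included(Some(2)));
    let scanned: Vec<Option<i32>> = stored.into_iter().filter(|v| range.contains(v)).collect();
    assert_eq!(scanned, vec![Some(1), Some(2)]); // the NULL row is skipped
}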