Skip to content
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,17 @@ impl DefaultPhysicalPlanner {
));
}
}
return internal_err!(
debug!(
"Physical input schema should be the same as the one converted from logical input schema. Differences: {}",
differences.iter().map(|s| format!("\n\t- {s}")).join("")
);

//influx: temporarily remove error and only log so that we can find a
//reproducer in production
// return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
// .iter()
// .map(|s| format!("\n\t- {s}"))
// .join(""));
}

let groups = self.create_grouping_physical_expr(
Expand Down Expand Up @@ -4207,6 +4214,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_metadata() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4227,6 +4236,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_field_count() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4247,6 +4258,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_field_name() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4268,6 +4281,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_field_type() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4286,6 +4301,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_field_nullability() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4304,6 +4321,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_field_metadata() {
let logical_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
Expand All @@ -4324,6 +4343,8 @@ digraph {
}

#[tokio::test]
// Ignored because the physical schema mismatch check is disabled (it only logs now).
#[ignore]
async fn test_aggregate_schema_mismatch_multiple() {
let logical_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ mod test {
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
assert_snapshot!(
plan_string,
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
);

let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;
Expand Down
162 changes: 118 additions & 44 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ impl DataSource for FileScanConfig {
let schema = self.file_source.table_schema().table_schema();
let mut eq_properties = EquivalenceProperties::new_with_orderings(
Arc::clone(schema),
self.output_ordering.clone(),
self.validated_output_ordering(),
)
.with_constraints(self.constraints.clone());

Expand Down Expand Up @@ -853,6 +853,40 @@ impl DataSource for FileScanConfig {
}

impl FileScanConfig {
/// Returns the subset of `self.output_ordering` that can be proven valid
/// against the min/max statistics of the files in each file group.
///
/// An ordering such as `col1 ASC` only holds for a partition if reading its
/// files in sequence yields non-decreasing values. For example, within one
/// file group:
///
/// - file1: min(col1) = 10, max(col1) = 20
/// - file2: min(col1) = 5,  max(col1) = 15
///
/// has overlapping ranges, so `col1 ASC` must be dropped. Likewise:
///
/// - file1: min(col1) = 20, max(col1) = 30
/// - file2: min(col1) = 10, max(col1) = 15
///
/// is non-overlapping but read out of order, so it is also rejected. Only a
/// layout like:
///
/// - file1: min(col1) = 5,  max(col1) = 15
/// - file2: min(col1) = 16, max(col1) = 25
///
/// lets us retain `col1 ASC`.
///
/// Validation happens independently per file group / partition: files in
/// different partitions are read as separate streams, and merging those
/// streams into one ordered stream is handled upstream (e.g. by
/// `SortPreservingMergeExec`).
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
    validate_orderings(
        &self.output_ordering,
        self.file_source.table_schema().table_schema(),
        &self.file_groups,
        None,
    )
}

/// Get the file schema (schema of the files without partition columns)
pub fn file_schema(&self) -> &SchemaRef {
self.file_source.table_schema().file_schema()
Expand Down Expand Up @@ -1202,6 +1236,51 @@ fn ordered_column_indices_from_projection(
.collect::<Option<Vec<usize>>>()
}

/// Check whether a given ordering is valid for all file groups by verifying
/// that files within each group are sorted according to their min/max statistics.
///
/// Groups containing zero or one file are trivially ordered. For larger
/// groups, the min/max statistics of the sort columns must be obtainable and
/// in order (non-overlapping, or touching at boundaries); if the statistics
/// cannot be computed we cannot prove the ordering, so it is rejected.
///
/// `projection` maps projected column indices back to table-schema indices
/// when validating after projection; pass `None` when validating at
/// table-schema level.
fn is_ordering_valid_for_file_groups(
    file_groups: &[FileGroup],
    ordering: &LexOrdering,
    schema: &SchemaRef,
    projection: Option<&[usize]>,
) -> bool {
    for group in file_groups {
        // A group with at most one file cannot be out of order.
        if group.len() <= 1 {
            continue;
        }
        let provably_sorted =
            MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
                .map(|stats| stats.is_sorted())
                // Missing/unusable statistics: we can't prove sortedness, so reject.
                .unwrap_or(false);
        if !provably_sorted {
            return false;
        }
    }
    true
}

/// Filters `orderings` down to those that hold across every file group,
/// as verified by [`is_ordering_valid_for_file_groups`] using per-file
/// min/max statistics. Relative order of the surviving orderings is kept.
fn validate_orderings(
    orderings: &[LexOrdering],
    schema: &SchemaRef,
    file_groups: &[FileGroup],
    projection: Option<&[usize]>,
) -> Vec<LexOrdering> {
    let mut valid = Vec::with_capacity(orderings.len());
    for ordering in orderings {
        if is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection) {
            valid.push(ordering.clone());
        }
    }
    valid
}

/// The various listing tables does not attempt to read all files
/// concurrently, instead they will read files in sequence within a
/// partition. This is an important property as it allows plans to
Expand Down Expand Up @@ -1268,52 +1347,47 @@ fn get_projected_output_ordering(
let projected_orderings =
project_orderings(&base_config.output_ordering, projected_schema);

let mut all_orderings = vec![];
for new_ordering in projected_orderings {
// Check if any file groups are not sorted
if base_config.file_groups.iter().any(|group| {
if group.len() <= 1 {
// File groups with <= 1 files are always sorted
return false;
}

let Some(indices) = base_config
.file_source
.projection()
.as_ref()
.map(|p| ordered_column_indices_from_projection(p))
else {
// Can't determine if ordered without a simple projection
return true;
};

let statistics = match MinMaxStatistics::new_from_files(
&new_ordering,
let indices = base_config
.file_source
.projection()
.as_ref()
.map(|p| ordered_column_indices_from_projection(p));

match indices {
Some(Some(indices)) => {
// Simple column projection — validate with statistics
validate_orderings(
&projected_orderings,
projected_schema,
indices.as_deref(),
group.iter(),
) {
Ok(statistics) => statistics,
Err(e) => {
log::trace!("Error fetching statistics for file group: {e}");
// we can't prove that it's ordered, so we have to reject it
return true;
}
};

!statistics.is_sorted()
}) {
debug!(
"Skipping specified output ordering {:?}. \
Some file groups couldn't be determined to be sorted: {:?}",
base_config.output_ordering[0], base_config.file_groups
);
continue;
&base_config.file_groups,
Some(indices.as_slice()),
)
}
None => {
// No projection — validate with statistics (no remapping needed)
validate_orderings(
&projected_orderings,
projected_schema,
&base_config.file_groups,
None,
)
}
Some(None) => {
// Complex projection (expressions, not simple columns) — can't
// determine column indices for statistics. Still valid if all
// file groups have at most one file.
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
projected_orderings
} else {
debug!(
"Skipping specified output orderings. \
Some file groups couldn't be determined to be sorted: {:?}",
base_config.file_groups
);
vec![]
}
}

all_orderings.push(new_ordering);
}
all_orderings
}

/// Convert type to a type suitable for use as a `ListingTable`
Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ impl MinMaxStatistics {
}

/// Check if the min/max statistics are in order and non-overlapping
/// (or touching at boundaries).
///
/// Returns true when, for every adjacent pair of files in sort order, the
/// max of the earlier file does not exceed the min of the later file
/// (`<=` allows ranges that touch exactly at a boundary value).
// NOTE: the diff paste had left both the old `max < next_min` and the new
// `max <= next_min` predicate chained, which would not compile; only the
// new `<=` form is kept.
pub fn is_sorted(&self) -> bool {
    self.max_by_sort_order
        .iter()
        .zip(self.min_by_sort_order.iter().skip(1))
        .all(|(max, next_min)| max <= next_min)
}
}

Expand Down
Loading