Skip to content

Commit ad15c85

Browse files
committed
test: update tests to reflect updated invariant checking paradigm
1 parent 94482d1 commit ad15c85

File tree

2 files changed

+196
-22
lines changed

2 files changed

+196
-22
lines changed

datafusion/core/src/physical_planner.rs

+196-21
Original file line numberDiff line numberDiff line change
@@ -2082,6 +2082,8 @@ mod tests {
20822082
use super::*;
20832083
use crate::datasource::file_format::options::CsvReadOptions;
20842084
use crate::datasource::MemTable;
2085+
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
2086+
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
20852087
use crate::physical_plan::{
20862088
expressions, DisplayAs, DisplayFormatType, PlanProperties,
20872089
SendableRecordBatchStream,
@@ -2098,7 +2100,11 @@ mod tests {
20982100
use datafusion_execution::TaskContext;
20992101
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
21002102
use datafusion_functions_aggregate::expr_fn::sum;
2101-
use datafusion_physical_expr::EquivalenceProperties;
2103+
use datafusion_physical_expr::expressions::{
2104+
col as physical_expr_col, lit as physical_expr_lit,
2105+
};
2106+
use datafusion_physical_expr::{Distribution, EquivalenceProperties, LexRequirement};
2107+
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
21022108
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
21032109

21042110
fn make_session_state() -> SessionState {
@@ -2848,9 +2854,29 @@ digraph {
28482854
assert_contains!(generated_graph, expected_tooltip);
28492855
}
28502856

2857+
fn default_plan_props() -> PlanProperties {
2858+
PlanProperties::new(
2859+
EquivalenceProperties::new(Arc::new(Schema::empty())),
2860+
Partitioning::RoundRobinBatch(1),
2861+
EmissionType::Final,
2862+
Boundedness::Bounded,
2863+
)
2864+
}
2865+
28512866
/// Extension Node which passes invariant checks
28522867
#[derive(Debug)]
2853-
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
2868+
struct OkExtensionNode {
2869+
children: Vec<Arc<dyn ExecutionPlan>>,
2870+
properties: PlanProperties,
2871+
}
2872+
impl OkExtensionNode {
2873+
fn new(children: Vec<Arc<dyn ExecutionPlan>>) -> Self {
2874+
Self {
2875+
children,
2876+
properties: default_plan_props(),
2877+
}
2878+
}
2879+
}
28542880
impl ExecutionPlan for OkExtensionNode {
28552881
fn name(&self) -> &str {
28562882
"always ok"
@@ -2859,19 +2885,19 @@ digraph {
28592885
self: Arc<Self>,
28602886
children: Vec<Arc<dyn ExecutionPlan>>,
28612887
) -> Result<Arc<dyn ExecutionPlan>> {
2862-
Ok(Arc::new(Self(children)))
2888+
Ok(Arc::new(Self::new(children)))
28632889
}
28642890
fn schema(&self) -> SchemaRef {
28652891
Arc::new(Schema::empty())
28662892
}
28672893
fn as_any(&self) -> &dyn Any {
2868-
unimplemented!()
2894+
self
28692895
}
28702896
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2871-
self.0.iter().collect::<Vec<_>>()
2897+
self.children.iter().collect::<Vec<_>>()
28722898
}
28732899
fn properties(&self) -> &PlanProperties {
2874-
unimplemented!()
2900+
&self.properties
28752901
}
28762902
fn execute(
28772903
&self,
@@ -2889,7 +2915,16 @@ digraph {
28892915

28902916
/// Extension Node which fails invariant checks
28912917
#[derive(Debug)]
2892-
struct InvariantFailsExtensionNode;
2918+
struct InvariantFailsExtensionNode {
2919+
properties: PlanProperties,
2920+
}
2921+
impl InvariantFailsExtensionNode {
2922+
fn new() -> Self {
2923+
Self {
2924+
properties: default_plan_props(),
2925+
}
2926+
}
2927+
}
28932928
impl ExecutionPlan for InvariantFailsExtensionNode {
28942929
fn name(&self) -> &str {
28952930
"InvariantFailsExtensionNode"
@@ -2907,13 +2942,13 @@ digraph {
29072942
unimplemented!()
29082943
}
29092944
fn as_any(&self) -> &dyn Any {
2910-
unimplemented!()
2945+
self
29112946
}
29122947
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2913-
unimplemented!()
2948+
vec![]
29142949
}
29152950
fn properties(&self) -> &PlanProperties {
2916-
unimplemented!()
2951+
&self.properties
29172952
}
29182953
fn execute(
29192954
&self,
@@ -2946,15 +2981,18 @@ digraph {
29462981
fn schema_check(&self) -> bool {
29472982
true
29482983
}
2984+
fn executable_check(&self, _previous_plan_is_valid: bool) -> bool {
2985+
true
2986+
}
29492987
}
29502988

29512989
#[test]
2952-
fn test_invariant_checker() -> Result<()> {
2990+
fn test_invariant_checker_with_execution_plan_extensions() -> Result<()> {
29532991
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
29542992
Arc::new(OptimizerRuleWithSchemaCheck);
29552993

29562994
// ok plan
2957-
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
2995+
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode::new(vec![]));
29582996
let child = Arc::clone(&ok_node);
29592997
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
29602998
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
@@ -2963,38 +3001,175 @@ digraph {
29633001

29643002
// Test: check should pass with same schema
29653003
let equal_schema = ok_plan.schema();
2966-
InvariantChecker.check(&ok_plan, &rule, equal_schema)?;
3004+
InvariantChecker::new(&Default::default(), &rule).check(
3005+
&ok_plan,
3006+
equal_schema,
3007+
true,
3008+
)?;
29673009

29683010
// Test: should fail with schema changed
29693011
let different_schema =
29703012
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
2971-
let expected_err = InvariantChecker
2972-
.check(&ok_plan, &rule, different_schema)
3013+
let expected_err = InvariantChecker::new(&Default::default(), &rule)
3014+
.check(&ok_plan, different_schema, true)
29733015
.unwrap_err();
29743016
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema"));
29753017

29763018
// Test: should fail when extension node fails it's own invariant check
2977-
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
2978-
let expected_err = InvariantChecker
2979-
.check(&failing_node, &rule, ok_plan.schema())
3019+
let failing_node: Arc<dyn ExecutionPlan> =
3020+
Arc::new(InvariantFailsExtensionNode::new());
3021+
let expected_err = InvariantChecker::new(&Default::default(), &rule)
3022+
.check(&failing_node, ok_plan.schema(), true)
29803023
.unwrap_err();
29813024
assert!(expected_err
29823025
.to_string()
29833026
.contains("extension node failed it's user-defined invariant check"));
29843027

29853028
// Test: should fail when descendent extension node fails
2986-
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
3029+
let failing_node: Arc<dyn ExecutionPlan> =
3030+
Arc::new(InvariantFailsExtensionNode::new());
29873031
let invalid_plan = ok_node.with_new_children(vec![
29883032
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
29893033
Arc::clone(&child),
29903034
])?;
2991-
let expected_err = InvariantChecker
2992-
.check(&invalid_plan, &rule, ok_plan.schema())
3035+
let expected_err = InvariantChecker::new(&Default::default(), &rule)
3036+
.check(&invalid_plan, ok_plan.schema(), true)
29933037
.unwrap_err();
29943038
assert!(expected_err
29953039
.to_string()
29963040
.contains("extension node failed it's user-defined invariant check"));
29973041

3042+
// Test: confirm error message contains both the user-defined extension name and the optimizer rule name
3043+
assert!(expected_err
3044+
.to_string()
3045+
.contains("Invariant for ExecutionPlan node 'InvariantFailsExtensionNode' failed for PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck'"));
3046+
3047+
Ok(())
3048+
}
3049+
3050+
fn wrap_in_nonexecutable(
3051+
plan: Arc<dyn ExecutionPlan>,
3052+
) -> Result<Arc<dyn ExecutionPlan>> {
3053+
let schema =
3054+
Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)]));
3055+
let col_a = physical_expr_col("a", &schema)?;
3056+
3057+
// mutate the tree is such a way that is NOT yet executable
3058+
Ok(Arc::new(OutputRequirementExec::new(
3059+
plan,
3060+
Some(LexRequirement::from_lex_ordering(
3061+
vec![PhysicalSortExpr::new_default(col_a)].into(),
3062+
)),
3063+
Distribution::UnspecifiedDistribution,
3064+
)))
3065+
}
3066+
3067+
/// Extension where the physical plan mutation creates a non-executable plan.
3068+
///
3069+
/// This is a "failing" extension, since it doesn't implement [`PhysicalOptimizerRule::executable_check`]
3070+
/// as false.
3071+
#[derive(Debug)]
3072+
struct ExtensionRuleDoesBadMutation;
3073+
impl PhysicalOptimizerRule for ExtensionRuleDoesBadMutation {
3074+
fn optimize(
3075+
&self,
3076+
plan: Arc<dyn ExecutionPlan>,
3077+
_config: &ConfigOptions,
3078+
) -> Result<Arc<dyn ExecutionPlan>> {
3079+
wrap_in_nonexecutable(plan)
3080+
}
3081+
fn name(&self) -> &str {
3082+
"ExtensionRuleDoesBadMutation"
3083+
}
3084+
fn schema_check(&self) -> bool {
3085+
true
3086+
}
3087+
}
3088+
3089+
/// Extension where the physical plan mutation creates a non-executable plan.
3090+
///
3091+
/// This extension properly implements [`PhysicalOptimizerRule::executable_check`] => false.
3092+
/// And then the follow up optimizer runs may be performed.
3093+
#[derive(Debug)]
3094+
struct ExtensionRuleNeedsMoreRuns;
3095+
impl PhysicalOptimizerRule for ExtensionRuleNeedsMoreRuns {
3096+
fn optimize(
3097+
&self,
3098+
plan: Arc<dyn ExecutionPlan>,
3099+
_config: &ConfigOptions,
3100+
) -> Result<Arc<dyn ExecutionPlan>> {
3101+
wrap_in_nonexecutable(plan)
3102+
}
3103+
fn name(&self) -> &str {
3104+
"ExtensionRuleNeedsMoreRuns"
3105+
}
3106+
fn schema_check(&self) -> bool {
3107+
true
3108+
}
3109+
fn executable_check(&self, _previous_plan_is_valid: bool) -> bool {
3110+
false
3111+
}
3112+
}
3113+
3114+
#[test]
3115+
fn test_invariant_checker_with_optimization_extension() -> Result<()> {
3116+
let planner = DefaultPhysicalPlanner {
3117+
extension_planners: vec![],
3118+
};
3119+
3120+
// ok plan
3121+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)]));
3122+
let ok_plan = Arc::new(MemoryExec::try_new_as_values(
3123+
schema,
3124+
vec![vec![physical_expr_lit(ScalarValue::UInt32(None))]],
3125+
)?);
3126+
3127+
// Test: check should pass with valid OpimizerRule mutation
3128+
let session = SessionStateBuilder::new()
3129+
.with_physical_optimizer_rules(vec![Arc::new(OptimizerRuleWithSchemaCheck)])
3130+
.build();
3131+
assert_eq!(
3132+
session.physical_optimizers().len(),
3133+
1,
3134+
"should have the 1 valid optimizer rule"
3135+
);
3136+
planner.optimize_physical_plan(ok_plan.clone(), &session, |_, _| {})?;
3137+
3138+
// Test: should fail with invalid OpimizerRule mutation that leaves plan not executable
3139+
let session = SessionStateBuilder::new()
3140+
.with_physical_optimizer_rules(vec![
3141+
Arc::new(SanityCheckPlan::new()), // should produce executable plan
3142+
Arc::new(ExtensionRuleDoesBadMutation), // will fail executable check
3143+
])
3144+
.build();
3145+
assert_eq!(
3146+
session.physical_optimizers().len(),
3147+
2,
3148+
"should have 2 optimizer rules"
3149+
);
3150+
let expected_err = planner
3151+
.optimize_physical_plan(ok_plan.clone(), &session, |_, _| {})
3152+
.unwrap_err();
3153+
assert!(expected_err
3154+
.to_string()
3155+
.contains("SanityCheckPlan failed for PhysicalOptimizer rule 'ExtensionRuleDoesBadMutation'"));
3156+
3157+
// Test: should pass once the proper additional optimizer rules are applied after the Extension rule
3158+
let session = SessionStateBuilder::new()
3159+
.with_physical_optimizer_rules(vec![
3160+
Arc::new(SanityCheckPlan::new()), // should produce executable plan
3161+
Arc::new(ExtensionRuleNeedsMoreRuns), // Extension states that the returned plan is not executable
3162+
Arc::new(EnforceSorting::new()), // should mutate plan
3163+
Arc::new(SanityCheckPlan::new()), // should produce executable plan
3164+
])
3165+
.build();
3166+
assert_eq!(
3167+
session.physical_optimizers().len(),
3168+
4,
3169+
"should have 4 optimizer rules"
3170+
);
3171+
planner.optimize_physical_plan(ok_plan, &session, |_, _| {})?;
3172+
29983173
Ok(())
29993174
}
30003175
}

datafusion/physical-plan/src/execution_plan.rs

-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
116116
/// A default set of invariants is provided in the default implementation.
117117
/// Extension nodes can provide their own invariants.
118118
fn check_node_invariants(&self) -> Result<()> {
119-
// TODO
120119
Ok(())
121120
}
122121

0 commit comments

Comments
 (0)