Skip to content

Commit 531a4c0

Browse files
committed
pass spark3.5
1 parent 66a295b commit 531a4c0

6 files changed

+59
-48
lines changed

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala

+10-15
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ class GlutenClickHouseDecimalSuite
6666
private val decimalTable: String = "decimal_table"
6767
private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
6868
(DecimalType.apply(9, 4), Seq()),
69-
// 1: ch decimal avg is float
7069
(DecimalType.apply(18, 8), Seq()),
71-
// 1: ch decimal avg is float, 3/10: all value is null and compare with limit
72-
(DecimalType.apply(38, 19), Seq(3, 10))
70+
// 3/10: all value is null and compare with limit
71+
// 1 Spark 3.5
72+
(DecimalType.apply(38, 19), if (isSparkVersionLE("3.3")) Seq(3, 10) else Seq(1, 3, 10))
7373
)
7474

7575
private def createDecimalTables(dataType: DecimalType): Unit = {
@@ -343,27 +343,22 @@ class GlutenClickHouseDecimalSuite
343343
decimalTPCHTables.foreach {
344344
dt =>
345345
{
346+
val fallBack = (sql_num == 16 || sql_num == 21)
347+
val compareResult = !dt._2.contains(sql_num)
348+
val native = if (fallBack) "fallback" else "native"
349+
val compare = if (compareResult) "compare" else "noCompare"
350+
val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss"
346351
val decimalType = dt._1
347352
test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale})
348-
| Q$sql_num[allowPrecisionLoss=$allowPrecisionLoss]""".stripMargin) {
349-
var noFallBack = true
350-
var compareResult = true
351-
if (sql_num == 16 || sql_num == 21) {
352-
noFallBack = false
353-
}
354-
355-
if (dt._2.contains(sql_num)) {
356-
compareResult = false
357-
}
358-
353+
| Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) {
359354
spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}")
360355
withSQLConf(
361356
(SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) {
362357
runTPCHQuery(
363358
sql_num,
364359
tpchQueries,
365360
compareResult = compareResult,
366-
noFallBack = noFallBack) { _ => {} }
361+
noFallBack = !fallBack) { _ => {} }
367362
}
368363
spark.sql(s"use default")
369364
}

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -1051,8 +1051,12 @@ class GlutenClickHouseHiveTableSuite
10511051
spark.sql(
10521052
s"CREATE FUNCTION my_add as " +
10531053
s"'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2' USING JAR '$jarUrl'")
1054-
runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
1055-
checkGlutenOperatorMatch[ProjectExecTransformer])
1054+
if (isSparkVersionLE("3.3")) {
1055+
runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
1056+
checkGlutenOperatorMatch[ProjectExecTransformer])
1057+
} else {
1058+
runQueryAndCompare("select MY_ADD(id, id+1) from range(10)", noFallBack = false)(_ => {})
1059+
}
10561060
}
10571061

10581062
test("GLUTEN-4333: fix CSE in aggregate operator") {

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala

+10-8
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,11 @@ class GlutenClickHouseTPCHBucketSuite
234234
val plans = collect(df.queryExecution.executedPlan) {
235235
case scanExec: BasicScanExecTransformer => scanExec
236236
}
237-
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
238-
assert(plans(0).metrics("numFiles").value === 2)
239-
assert(plans(0).metrics("pruningTime").value === -1)
240-
assert(plans(0).metrics("numOutputRows").value === 591673)
237+
assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
238+
assert(plans.head.metrics("numFiles").value === 2)
239+
val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
240+
assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
241+
assert(plans.head.metrics("numOutputRows").value === 591673)
241242
})
242243
}
243244

@@ -409,10 +410,11 @@ class GlutenClickHouseTPCHBucketSuite
409410
val plans = collect(df.queryExecution.executedPlan) {
410411
case scanExec: BasicScanExecTransformer => scanExec
411412
}
412-
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
413-
assert(plans(0).metrics("numFiles").value === 2)
414-
assert(plans(0).metrics("pruningTime").value === -1)
415-
assert(plans(0).metrics("numOutputRows").value === 11618)
413+
assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
414+
assert(plans.head.metrics("numFiles").value === 2)
415+
val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
416+
assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
417+
assert(plans.head.metrics("numOutputRows").value === 11618)
416418
})
417419
}
418420

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTrans
107107
values (0, null,1), (0,null,2), (1, 1,4) as data(a,b,c) group by try_add(c,b)
108108
""";
109109
val df = spark.sql(sql)
110-
WholeStageTransformerSuite.checkFallBack(df, noFallback = false)
110+
WholeStageTransformerSuite.checkFallBack(df, noFallback = isSparkVersionGE("3.4"))
111111
}
112112

113113
test("check count distinct with filter") {

backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala

+6-4
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,16 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
7171
assert(plans.size == 3)
7272

7373
assert(plans(2).metrics("numFiles").value === 1)
74-
assert(plans(2).metrics("pruningTime").value === -1)
74+
val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
75+
assert(plans(2).metrics("pruningTime").value === pruningTimeValue)
7576
assert(plans(2).metrics("filesSize").value === 19230111)
7677

7778
assert(plans(1).metrics("numOutputRows").value === 4)
7879
assert(plans(1).metrics("outputVectors").value === 1)
7980

8081
// Execute Sort operator, it will read the data twice.
81-
assert(plans(0).metrics("numOutputRows").value === 4)
82-
assert(plans(0).metrics("outputVectors").value === 1)
82+
assert(plans.head.metrics("numOutputRows").value === 4)
83+
assert(plans.head.metrics("outputVectors").value === 1)
8384
}
8485
}
8586

@@ -139,7 +140,8 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
139140
assert(plans.size == 3)
140141

141142
assert(plans(2).metrics("numFiles").value === 1)
142-
assert(plans(2).metrics("pruningTime").value === -1)
143+
val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
144+
assert(plans(2).metrics("pruningTime").value === pruningTimeValue)
143145
assert(plans(2).metrics("filesSize").value === 19230111)
144146

145147
assert(plans(1).metrics("numOutputRows").value === 4)

backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala

+26-18
Original file line numberDiff line numberDiff line change
@@ -460,26 +460,34 @@ class GlutenParquetFilterSuite
460460
"orders1" -> Nil)
461461
)
462462

463+
def runTest(i: Int): Unit = withDataFrame(tpchSQL(i + 1, tpchQueriesResourceFolder)) {
464+
df =>
465+
val scans = df.queryExecution.executedPlan
466+
.collect { case scan: FileSourceScanExecTransformer => scan }
467+
assertResult(result(i).size)(scans.size)
468+
scans.zipWithIndex
469+
.foreach {
470+
case (scan, fileIndex) =>
471+
val tableName = scan.tableIdentifier
472+
.map(_.table)
473+
.getOrElse(scan.relation.options("path").split("/").last)
474+
val predicates = scan.filterExprs()
475+
val expected = result(i)(s"$tableName$fileIndex")
476+
assertResult(expected.size)(predicates.size)
477+
if (expected.isEmpty) assert(predicates.isEmpty)
478+
else compareExpressions(expected.reduceLeft(And), predicates.reduceLeft(And))
479+
}
480+
}
481+
463482
tpchQueries.zipWithIndex.foreach {
464483
case (q, i) =>
465-
test(q) {
466-
withDataFrame(tpchSQL(i + 1, tpchQueriesResourceFolder)) {
467-
df =>
468-
val scans = df.queryExecution.executedPlan
469-
.collect { case scan: FileSourceScanExecTransformer => scan }
470-
assertResult(result(i).size)(scans.size)
471-
scans.zipWithIndex
472-
.foreach {
473-
case (scan, fileIndex) =>
474-
val tableName = scan.tableIdentifier
475-
.map(_.table)
476-
.getOrElse(scan.relation.options("path").split("/").last)
477-
val predicates = scan.filterExprs()
478-
val expected = result(i)(s"$tableName$fileIndex")
479-
assertResult(expected.size)(predicates.size)
480-
if (expected.isEmpty) assert(predicates.isEmpty)
481-
else compareExpressions(expected.reduceLeft(And), predicates.reduceLeft(And))
482-
}
484+
if (q == "q2" || q == "q9") {
485+
testSparkVersionLE33(q) {
486+
runTest(i)
487+
}
488+
} else {
489+
test(q) {
490+
runTest(i)
483491
}
484492
}
485493
}

0 commit comments

Comments
 (0)