
Commit e8583c3

[GLUTEN-6067][CH][MINOR][UT] Pass backends-clickhouse ut in Spark 3.5 (#6623)
* 1. Move the TPCH parquet UTs to the tpch package; 2. move the TPCDS UTs to the tpcds package; 3. pass the UTs on Spark 3.5.
* We don't support Spark 3.4, so version checks use: 1. isSparkVersionGE("3.5"); 2. isSparkVersionLE("3.3").
1 parent cff5de5 commit e8583c3

24 files changed: +298 -265 lines
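The changes below repeatedly apply one pattern: gate test expectations on the Spark version, using isSparkVersionLE("3.3") for the old behavior and isSparkVersionGE("3.5") for the new one (3.4 is not supported). A minimal sketch of that pattern, assuming the test-suite context of these files (the test name and SQL are hypothetical; runQueryAndCompare, noFallBack and checkGlutenOperatorMatch are the helpers that appear in the diffs below):

// Hypothetical illustration of the version-gating pattern used across these suites.
test("hypothetical: version-gated expectation") {
  if (isSparkVersionLE("3.3")) {
    // Spark 3.2/3.3: expect the query to stay fully native
    runQueryAndCompare("select avg(d) from t")(checkGlutenOperatorMatch[ProjectExecTransformer])
  } else {
    // Spark 3.5: allow fallback and skip the operator check
    runQueryAndCompare("select avg(d) from t", noFallBack = false)(_ => {})
  }
}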

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala (+10 -15)
@@ -66,10 +66,10 @@ class GlutenClickHouseDecimalSuite
   private val decimalTable: String = "decimal_table"
   private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
     (DecimalType.apply(9, 4), Seq()),
-    // 1: ch decimal avg is float
     (DecimalType.apply(18, 8), Seq()),
-    // 1: ch decimal avg is float, 3/10: all value is null and compare with limit
-    (DecimalType.apply(38, 19), Seq(3, 10))
+    // 3/10: all value is null and compare with limit
+    // 1 Spark 3.5
+    (DecimalType.apply(38, 19), if (isSparkVersionLE("3.3")) Seq(3, 10) else Seq(1, 3, 10))
   )

   private def createDecimalTables(dataType: DecimalType): Unit = {
@@ -343,27 +343,22 @@ class GlutenClickHouseDecimalSuite
       decimalTPCHTables.foreach {
         dt =>
           {
+            val fallBack = (sql_num == 16 || sql_num == 21)
+            val compareResult = !dt._2.contains(sql_num)
+            val native = if (fallBack) "fallback" else "native"
+            val compare = if (compareResult) "compare" else "noCompare"
+            val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss"
             val decimalType = dt._1
             test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale})
-                    | Q$sql_num[allowPrecisionLoss=$allowPrecisionLoss]""".stripMargin) {
-              var noFallBack = true
-              var compareResult = true
-              if (sql_num == 16 || sql_num == 21) {
-                noFallBack = false
-              }
-
-              if (dt._2.contains(sql_num)) {
-                compareResult = false
-              }
-
+                    | Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) {
               spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}")
               withSQLConf(
                 (SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) {
                 runTPCHQuery(
                   sql_num,
                   tpchQueries,
                   compareResult = compareResult,
-                  noFallBack = noFallBack) { _ => {} }
+                  noFallBack = !fallBack) { _ => {} }
               }
               spark.sql(s"use default")
             }
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala (+6 -2)
@@ -1051,8 +1051,12 @@ class GlutenClickHouseHiveTableSuite
     spark.sql(
       s"CREATE FUNCTION my_add as " +
         s"'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2' USING JAR '$jarUrl'")
-    runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
-      checkGlutenOperatorMatch[ProjectExecTransformer])
+    if (isSparkVersionLE("3.3")) {
+      runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
+        checkGlutenOperatorMatch[ProjectExecTransformer])
+    } else {
+      runQueryAndCompare("select MY_ADD(id, id+1) from range(10)", noFallBack = false)(_ => {})
+    }
   }

   test("GLUTEN-4333: fix CSE in aggregate operator") {

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala (+2 -2)
@@ -603,7 +603,7 @@ class GlutenClickHouseNativeWriteTableSuite
     ("timestamp_field", "timestamp")
   )
   def excludeTimeFieldForORC(format: String): Seq[String] = {
-    if (format.equals("orc") && isSparkVersionGE("3.4")) {
+    if (format.equals("orc") && isSparkVersionGE("3.5")) {
       // FIXME:https://github.com/apache/incubator-gluten/pull/6507
       fields.keys.filterNot(_.equals("timestamp_field")).toSeq
     } else {
@@ -913,7 +913,7 @@ class GlutenClickHouseNativeWriteTableSuite
         (table_name, create_sql, insert_sql)
       },
       (table_name, _) =>
-        if (isSparkVersionGE("3.4")) {
+        if (isSparkVersionGE("3.5")) {
          compareResultsAgainstVanillaSpark(
            s"select * from $table_name",
            compareResult = true,

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala (+27 -28)
@@ -18,7 +18,7 @@ package org.apache.gluten.execution

 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.benchmarks.GenTPCDSTableScripts
-import org.apache.gluten.utils.UTSystemParameters
+import org.apache.gluten.utils.{Arm, UTSystemParameters}

 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
@@ -46,8 +46,8 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
     rootPath + "../../../../gluten-core/src/test/resources/tpcds-queries/tpcds.queries.original"
   protected val queriesResults: String = rootPath + "tpcds-decimal-queries-output"

-  /** Return values: (sql num, is fall back, skip fall back assert) */
-  def tpcdsAllQueries(isAqe: Boolean): Seq[(String, Boolean, Boolean)] =
+  /** Return values: (sql num, is fall back) */
+  def tpcdsAllQueries(isAqe: Boolean): Seq[(String, Boolean)] =
     Range
       .inclusive(1, 99)
       .flatMap(
@@ -57,37 +57,37 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
          } else {
            Seq("q" + "%d".format(queryNum))
          }
-         val noFallBack = queryNum match {
-           case i if !isAqe && (i == 10 || i == 16 || i == 35 || i == 94) =>
-             // q10 smj + existence join
-             // q16 smj + left semi + not condition
-             // q35 smj + existence join
-             // Q94 BroadcastHashJoin, LeftSemi, NOT condition
-             (false, false)
-           case i if isAqe && (i == 16 || i == 94) =>
-             (false, false)
-           case other => (true, false)
-         }
-         sqlNums.map((_, noFallBack._1, noFallBack._2))
+         val native = !fallbackSets(isAqe).contains(queryNum)
+         sqlNums.map((_, native))
       })

-  // FIXME "q17", stddev_samp inconsistent results, CH return NaN, Spark return null
+  protected def fallbackSets(isAqe: Boolean): Set[Int] = {
+    val more = if (isSparkVersionGE("3.5")) Set(44, 67, 70) else Set.empty[Int]
+
+    // q16 smj + left semi + not condition
+    // Q94 BroadcastHashJoin, LeftSemi, NOT condition
+    if (isAqe) {
+      Set(16, 94) | more
+    } else {
+      // q10, q35 smj + existence join
+      Set(10, 16, 35, 94) | more
+    }
+  }
   protected def excludedTpcdsQueries: Set[String] = Set(
-    "q61", // inconsistent results
-    "q66", // inconsistent results
-    "q67" // inconsistent results
+    "q66" // inconsistent results
   )

   def executeTPCDSTest(isAqe: Boolean): Unit = {
     tpcdsAllQueries(isAqe).foreach(
       s =>
         if (excludedTpcdsQueries.contains(s._1)) {
           ignore(s"TPCDS ${s._1.toUpperCase()}") {
-            runTPCDSQuery(s._1, noFallBack = s._2, skipFallBackAssert = s._3) { df => }
+            runTPCDSQuery(s._1, noFallBack = s._2) { df => }
           }
         } else {
-          test(s"TPCDS ${s._1.toUpperCase()}") {
-            runTPCDSQuery(s._1, noFallBack = s._2, skipFallBackAssert = s._3) { df => }
+          val tag = if (s._2) "Native" else "Fallback"
+          test(s"TPCDS[$tag] ${s._1.toUpperCase()}") {
+            runTPCDSQuery(s._1, noFallBack = s._2) { df => }
           }
         })
   }
@@ -152,7 +152,7 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
   }

   override protected def afterAll(): Unit = {
-    ClickhouseSnapshot.clearAllFileStatusCache
+    ClickhouseSnapshot.clearAllFileStatusCache()
     DeltaLog.clearCache()

     try {
@@ -183,11 +183,10 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
       tpcdsQueries: String = tpcdsQueries,
       queriesResults: String = queriesResults,
       compareResult: Boolean = true,
-      noFallBack: Boolean = true,
-      skipFallBackAssert: Boolean = false)(customCheck: DataFrame => Unit): Unit = {
+      noFallBack: Boolean = true)(customCheck: DataFrame => Unit): Unit = {

     val sqlFile = tpcdsQueries + "/" + queryNum + ".sql"
-    val sql = Source.fromFile(new File(sqlFile), "UTF-8").mkString
+    val sql = Arm.withResource(Source.fromFile(new File(sqlFile), "UTF-8"))(_.mkString)
     val df = spark.sql(sql)

     if (compareResult) {
@@ -212,13 +211,13 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
       // using WARN to guarantee printed
       log.warn(s"query: $queryNum, finish comparing with saved result")
     } else {
-      val start = System.currentTimeMillis();
+      val start = System.currentTimeMillis()
       val ret = df.collect()
       // using WARN to guarantee printed
       log.warn(s"query: $queryNum skipped comparing, time cost to collect: ${System
        .currentTimeMillis() - start} ms, ret size: ${ret.length}")
     }
-    WholeStageTransformerSuite.checkFallBack(df, noFallBack, skipFallBackAssert)
+    WholeStageTransformerSuite.checkFallBack(df, noFallBack)
     customCheck(df)
   }
 }

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala (+17 -17)
@@ -234,10 +234,10 @@ class GlutenClickHouseTPCHBucketSuite
         val plans = collect(df.queryExecution.executedPlan) {
           case scanExec: BasicScanExecTransformer => scanExec
         }
-        assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
-        assert(plans(0).metrics("numFiles").value === 2)
-        assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("numOutputRows").value === 591673)
+        assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+        assert(plans.head.metrics("numFiles").value === 2)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+        assert(plans.head.metrics("numOutputRows").value === 591673)
       })
   }

@@ -291,7 +291,7 @@ class GlutenClickHouseTPCHBucketSuite
         }

         if (sparkVersion.equals("3.2")) {
-          assert(!(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+          assert(!plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         } else {
           assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
@@ -327,14 +327,14 @@ class GlutenClickHouseTPCHBucketSuite
             .isInstanceOf[InputIteratorTransformer])

         if (sparkVersion.equals("3.2")) {
-          assert(!(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+          assert(!plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         } else {
           assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(2).metrics("numFiles").value === 2)
         assert(plans(2).metrics("numOutputRows").value === 3111)

-        assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+        assert(!plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(3).metrics("numFiles").value === 2)
         assert(plans(3).metrics("numOutputRows").value === 72678)
       })
@@ -366,12 +366,12 @@ class GlutenClickHouseTPCHBucketSuite
         }
         // bucket join
         assert(
-          plans(0)
+          plans.head
            .asInstanceOf[HashJoinLikeExecTransformer]
            .left
            .isInstanceOf[ProjectExecTransformer])
         assert(
-          plans(0)
+          plans.head
            .asInstanceOf[HashJoinLikeExecTransformer]
            .right
            .isInstanceOf[ProjectExecTransformer])
@@ -409,10 +409,10 @@ class GlutenClickHouseTPCHBucketSuite
         val plans = collect(df.queryExecution.executedPlan) {
           case scanExec: BasicScanExecTransformer => scanExec
         }
-        assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
-        assert(plans(0).metrics("numFiles").value === 2)
-        assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("numOutputRows").value === 11618)
+        assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+        assert(plans.head.metrics("numFiles").value === 2)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+        assert(plans.head.metrics("numOutputRows").value === 11618)
       })
   }

@@ -425,12 +425,12 @@ class GlutenClickHouseTPCHBucketSuite
         }
         // bucket join
         assert(
-          plans(0)
+          plans.head
            .asInstanceOf[HashJoinLikeExecTransformer]
            .left
            .isInstanceOf[FilterExecTransformerBase])
         assert(
-          plans(0)
+          plans.head
            .asInstanceOf[HashJoinLikeExecTransformer]
            .right
            .isInstanceOf[ProjectExecTransformer])
@@ -585,7 +585,7 @@ class GlutenClickHouseTPCHBucketSuite
   def checkResult(df: DataFrame, exceptedResult: Seq[Row]): Unit = {
     // check the result
     val result = df.collect()
-    assert(result.size == exceptedResult.size)
+    assert(result.length == exceptedResult.size)
     val sortedRes = result.map {
       s =>
         Row.fromSeq(s.toSeq.map {
@@ -786,7 +786,7 @@ class GlutenClickHouseTPCHBucketSuite
         |order by l_orderkey, l_returnflag, t
         |limit 10
         |""".stripMargin
-    runSql(SQL7, false)(
+    runSql(SQL7, noFallBack = false)(
       df => {
         checkResult(
           df,

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala (+16 -5)
@@ -23,6 +23,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig

 import org.apache.commons.io.FileUtils
+import org.scalatest.Tag

 import java.io.File

@@ -177,13 +178,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
     super.beforeAll()
   }

-  protected val rootPath = this.getClass.getResource("/").getPath
-  protected val basePath = rootPath + "tests-working-home"
-  protected val warehouse = basePath + "/spark-warehouse"
-  protected val metaStorePathAbsolute = basePath + "/meta"
-  protected val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+  protected val rootPath: String = this.getClass.getResource("/").getPath
+  protected val basePath: String = rootPath + "tests-working-home"
+  protected val warehouse: String = basePath + "/spark-warehouse"
+  protected val metaStorePathAbsolute: String = basePath + "/meta"
+  protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"

   final override protected val resourcePath: String = "" // ch not need this
   override protected val fileFormat: String = "parquet"
+
+  protected def testSparkVersionLE33(testName: String, testTag: Tag*)(testFun: => Any): Unit = {
+    if (isSparkVersionLE("3.3")) {
+      test(testName, testTag: _*)(testFun)
+    } else {
+      ignore(s"[$SPARK_VERSION_SHORT]-$testName", testTag: _*)(testFun)
+    }
+  }
+
+  lazy val pruningTimeValueSpark: Int = if (isSparkVersionLE("3.3")) -1 else 0
 }
 // scalastyle:off line.size.limit
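The new testSparkVersionLE33 helper registers a test only on Spark 3.2/3.3 and turns it into an ignored test (prefixed with the Spark version) on newer versions, while pruningTimeValueSpark captures the default pruningTime metric value, which differs between Spark <= 3.3 (-1) and Spark 3.5 (0). A minimal usage sketch, assuming the suite context above (the test body and query are hypothetical):

// Hypothetical usage of the helpers defined in this suite.
testSparkVersionLE33("hypothetical: only runs on Spark 3.2/3.3") {
  val df = spark.sql("select * from lineitem limit 10")
  df.collect()
}

// pruningTimeValueSpark keeps metric assertions version-agnostic, e.g. as used
// in GlutenClickHouseTPCHBucketSuite:
// assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)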

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala (+2 -2)
@@ -105,9 +105,9 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite
     val sql = s"""
       select count(distinct(a,b)) , try_add(c,b) from
       values (0, null,1), (0,null,2), (1, 1,4) as data(a,b,c) group by try_add(c,b)
-      """;
+      """
     val df = spark.sql(sql)
-    WholeStageTransformerSuite.checkFallBack(df, noFallback = false)
+    WholeStageTransformerSuite.checkFallBack(df, noFallback = isSparkVersionGE("3.5"))
   }

   test("check count distinct with filter") {
