[GLUTEN-8497][VL] A bad test case that fails columnar table cache query #8498

Merged · 7 commits · Jan 13, 2025
Changes from all commits

VeloxColumnarCacheSuite.scala

@@ -24,8 +24,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 
+import scala.collection.JavaConverters._
+
 class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"

@@ -55,7 +58,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     )
   }
 
-  test("input columnar batch") {
+  test("Input columnar batch") {
     TPCHTables.map(_.name).foreach {
       table =>
         runQueryAndCompare(s"SELECT * FROM $table", cache = true) {

@@ -64,7 +67,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("input columnar batch and column pruning") {
+  test("Input columnar batch and column pruning") {
     val expected = sql("SELECT l_partkey FROM lineitem").collect()
     val cached = sql("SELECT * FROM lineitem").cache()
     try {

@@ -85,7 +88,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("input vanilla Spark columnar batch") {
+  test("Input vanilla Spark columnar batch") {
     withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
       val df = spark.table("lineitem")
       val expected = df.collect()

@@ -98,6 +101,40 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     }
   }
 
+  // TODO: Fix this case. See https://github.com/apache/incubator-gluten/issues/8497.
+  testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) {
+    def withId(id: Int): Metadata =
+      new MetadataBuilder().putLong("parquet.field.id", id).build()
+
+    withTempDir {
+      dir =>
+        val readSchema =
+          new StructType()
+            .add("l_orderkey_read", LongType, true, withId(1))
+        val writeSchema =
+          new StructType()
+            .add("l_orderkey_write", LongType, true, withId(1))
+        withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
+          // Write a table carrying metadata that Gluten's Velox backend doesn't support,
+          // to emulate the scenario where a Spark columnar scan is not offload-able and
+          // falls back, then the user tries to cache it.
+          spark
+            .createDataFrame(
+              spark.sql("select l_orderkey from lineitem").collect().toList.asJava,
+              writeSchema)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getCanonicalPath)
+          val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          df.cache()
+          // FIXME: The following call will throw since ColumnarCachedBatchSerializer gets
+          // confused by the input vanilla Parquet scan when its #convertColumnarBatchToCachedBatch
+          // method is called.
+          assertThrows[Exception](df.collect())
+        }
+    }
+  }
+
   test("CachedColumnarBatch serialize and deserialize") {
     val df = spark.table("lineitem")
     val expected = df.collect()
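
A note on the FIXME above: issue #8497 tracks the case where the cached relation's input is a vanilla (non-Gluten) columnar scan, so the serializer's convertColumnarBatchToCachedBatch receives batches whose layout the native path cannot interpret. The sketch below only illustrates the failure mode, not the eventual fix: a guard over the input RDD that rejects non-Gluten batches with a descriptive error. The isGlutenBatch predicate is hypothetical and stands in for whatever batch-type check Gluten would actually use.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

object BatchOriginGuard {
  // Fail fast when a batch did not originate from Gluten, instead of letting the
  // native cache-serialization path misread it. `isGlutenBatch` is a placeholder
  // predicate, not a real Gluten API.
  def assertGlutenBatches(
      input: RDD[ColumnarBatch],
      isGlutenBatch: ColumnarBatch => Boolean): RDD[ColumnarBatch] =
    input.map { batch =>
      require(
        isGlutenBatch(batch),
        "Cannot cache a vanilla Spark columnar batch with the native serializer; " +
          "the scan likely fell back (see GLUTEN-8497)")
      batch
    }
}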

TransitionGraph.scala

@@ -77,6 +77,7 @@ object TransitionGraph {
     }
   }
 
+  // TODO: Consolidate transition graph's cost model with RAS cost model.
   private object TransitionCostModel extends FloydWarshallGraph.CostModel[Transition] {
     override def zero(): TransitionCost = TransitionCost(0, Nil)
     override def costOf(transition: Transition): TransitionCost = {
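
For background on the TODO: FloydWarshallGraph precomputes the cheapest chain of transitions between any two plan formats, which is the classic all-pairs-shortest-path problem, with TransitionCost as the accumulated weight (zero() above being the empty path). Below is a minimal, self-contained sketch of that idea under simplifying assumptions: hypothetical format names and plain Long costs in place of Gluten's real cost type.

object FloydWarshallSketch {
  type Node = String

  // All-pairs shortest paths over a directed graph with Long edge costs.
  def shortestPaths(
      nodes: Seq[Node],
      edgeCost: Map[(Node, Node), Long]): Map[(Node, Node), Long] = {
    val inf = Long.MaxValue / 4 // avoids overflow when summing two "infinite" costs
    var dist: Map[(Node, Node), Long] =
      (for (a <- nodes; b <- nodes)
        yield (a, b) -> (if (a == b) 0L else edgeCost.getOrElse((a, b), inf))).toMap
    // Classic Floyd-Warshall relaxation; the intermediate node k is the outermost loop.
    for (k <- nodes; i <- nodes; j <- nodes) {
      val viaK = dist((i, k)) + dist((k, j))
      if (viaK < dist((i, j))) dist = dist.updated((i, j), viaK)
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical format names and costs, for illustration only.
    val formats = Seq("Row", "VanillaBatch", "VeloxBatch")
    val edges = Map(
      ("Row", "VeloxBatch") -> 10L,
      ("VeloxBatch", "VanillaBatch") -> 3L,
      ("Row", "VanillaBatch") -> 20L)
    // Prints 13: the two-hop path Row -> VeloxBatch -> VanillaBatch beats the direct edge (20).
    println(shortestPaths(formats, edges)(("Row", "VanillaBatch")))
  }
}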