[VL] Use of columnar table cache causes error when data emitted from the cached plan is in vanilla columnar format #8497

zhztheplayer commented Jan 10, 2025

Edit: Plan to fix the issue:

  1. Add a test case: [GLUTEN-8497][VL] A bad test case that fails columnar table cache query #8498
  2. Add a query planner hook, similar to AdaptiveContext.isAdaptiveContext, to determine whether the Gluten columnar rule is being called to optimize a columnar table cache's inner query plan: [GLUTEN-8497][CORE] A unified CallInfo API to replace AdaptiveContext #8551
  3. Unify the hook API for columnar table cache and AQE: [GLUTEN-8497][CORE] A unified CallInfo API to replace AdaptiveContext #8551
  4. Fix test cases using the new hook API, and add a rule to ensure the output convention of the query plan conforms to vanilla Spark's table cache optimization code (a sketch of such a rule follows this list). Specifically:
    • For a query plan without a topmost ColumnarToRow, keep it as is;
    • For a query plan with a topmost ColumnarToRow (which will be removed by Spark's convertToColumnarIfPossible) whose child is not in Velox columnar format, compare the costs of the following two query plan candidates and choose the cheaper one:
      • The input plan unchanged, with a fake unary node on top that prevents the columnar cache from being used;
      • The Velox columnar plan, plus a fake C2R on top to be removed.
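
For step 4, a minimal sketch of what such a convention-enforcing rule could look like. Only Rule, SparkPlan, and ColumnarToRowExec are Spark classes; isVeloxColumnar, toVeloxColumnar, planCost, wrapToPreventColumnarCache, and wrapWithRemovableC2R are hypothetical placeholders for this sketch, not existing Gluten APIs:

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}

// Sketch only. The private helpers below are illustrative stubs.
object EnforceCacheOutputConvention extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan match {
    // Topmost ColumnarToRow whose child does NOT emit Velox batches:
    // pick the cheaper of the two candidates described above.
    case c2r: ColumnarToRowExec if !isVeloxColumnar(c2r.child) =>
      // Candidate A: keep the plan row-based; a fake unary node on top
      // keeps Spark from using the columnar cache path.
      val keepRowBased = wrapToPreventColumnarCache(c2r)
      // Candidate B: offload the child to Velox columnar format and add a
      // fake C2R on top, which Spark's convertToColumnarIfPossible removes.
      val offloadToVelox = wrapWithRemovableC2R(toVeloxColumnar(c2r.child))
      if (planCost(offloadToVelox) <= planCost(keepRowBased)) offloadToVelox
      else keepRowBased
    // No topmost ColumnarToRow, or the child already emits Velox batches:
    // keep the plan as is.
    case other => other
  }

  // Hypothetical helpers, assumed for this sketch.
  private def isVeloxColumnar(plan: SparkPlan): Boolean = ???
  private def toVeloxColumnar(plan: SparkPlan): SparkPlan = ???
  private def planCost(plan: SparkPlan): Long = ???
  private def wrapToPreventColumnarCache(plan: SparkPlan): SparkPlan = ???
  private def wrapWithRemovableC2R(plan: SparkPlan): SparkPlan = ???
}
```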

The current code for determining whether to use the columnar-based cache doesn't cover all possible cases. For example, it misses the case where a scan has fallen back to vanilla Spark's columnar scan because of an unsupported data type, so the cached plan emits batches in vanilla (non-Velox) columnar format.
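
A hypothetical reproduction sketch of that case. The column type, path, and plugin/off-heap settings are assumptions rather than the actual test in #8498; only the ColumnarCachedBatchSerializer class name is taken from the stack trace below:

```scala
// Hypothetical repro sketch: adjust the column type to whatever actually
// forces the Velox scan to fall back in your environment.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  // Assumed Gluten setup (plugin class and off-heap sizing are assumptions).
  .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "1g")
  // Route table cache through Gluten's columnar cache serializer
  // (class name as it appears in the stack trace below).
  .config("spark.sql.cache.serializer",
    "org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
  .getOrCreate()

// A column type that the Velox scan may not support, so the scan falls back
// to vanilla Spark's columnar scan and emits vanilla columnar batches.
spark.range(10)
  .selectExpr("id", "map(id, id) AS m")
  .write.mode("overwrite").parquet("/tmp/gluten_8497_repro")

val df = spark.read.parquet("/tmp/gluten_8497_repro")
df.cache()
// Materializing the cache hands vanilla columnar batches to the Velox-only
// cache serializer, which is where the error below is raised.
df.count()
```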

When this case is hit, the following error is thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10) (55f8ad28c275 executor driver): java.lang.IllegalStateException: Heavy batch should consist of arrow vectors
	at org.apache.gluten.columnarbatch.ColumnarBatches.identifyBatchType(ColumnarBatches.java:81)
	at org.apache.gluten.columnarbatch.ColumnarBatches.isLightBatch(ColumnarBatches.java:99)
	at org.apache.gluten.columnarbatch.ColumnarBatches.ensureOffloaded(ColumnarBatches.java:134)
	at org.apache.gluten.columnarbatch.VeloxColumnarBatches.ensureVeloxBatch(VeloxColumnarBatches.java:104)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer.$anonfun$convertColumnarBatchToCachedBatch$2(ColumnarCachedBatchSerializer.scala:177)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer$$anon$1.next(ColumnarCachedBatchSerializer.scala:183)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer$$anon$1.next(ColumnarCachedBatchSerializer.scala:179)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1531)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:378)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)