Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from_json function parses a column containing an empty array, throws an exception. #10907

Closed
Feng-Jiang28 opened this issue May 27, 2024 · 4 comments
Assignees
Labels
bug Something isn't working

Comments

@Feng-Jiang28
Copy link
Contributor

Feng-Jiang28 commented May 27, 2024

from_json function parses a column containing an empty array, throws an exception.

CPU:


scala> import org.apache.spark.sql.types.{IntegerType, StructType}
scala> val df = Seq("""[]""").toDS()
scala> df.write.mode("OVERWRITE").parquet("TEMP")                                                                     
scala> val df2 = spark.read.parquet("TEMP")
scala> val schema = new StructType().add("a", IntegerType)
scala> var parsed = df2.select(from_json($"value", schema))
scala> parsed.show()
+----------------+                                                              
|from_json(value)|
+----------------+
|          {null}|
+----------------+

GPU:

$ $SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} 
--conf spark.plugins=com.nvidia.spark.SQLPlugin 
--conf spark.rapids.sql.enabled=true 
--conf spark.rapids.sql.explain=ALL --driver-java-options '-ea -Duser.timezone=UTC ' 
--conf spark.rapids.sql.expression.JsonTuple=true 
--conf spark.rapids.sql.expression.GetJsonObject=true 
--conf spark.rapids.sql.expression.JsonToStructs=true 
--conf spark.rapids.sql.expression.StructsToJson=true
scala> import org.apache.spark.sql.types.{IntegerType, StructType}
scala> val df = Seq("""[]""").toDS()
scala> df.write.mode("OVERWRITE").parquet("TEMP")
24/05/27 10:39:59 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> value#1 could run on GPU


scala> val df2 = spark.read.parquet("TEMP")
scala> val schema = new StructType().add("a", IntegerType)
scala> var parsed = df2.select(from_json($"value", schema))
scala> parsed.show()
24/05/27 10:40:02 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) as string) AS from_json(value)#11 will run on GPU
      *Expression <Cast> cast(from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) as string) will run on GPU
        *Expression <JsonToStructs> from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/05/27 10:40:02 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalStateException: No empty row count provided and the table read has no row count or columns
	at ai.rapids.cudf.Table.gatherJSONColumns(Table.java:1204)
	at ai.rapids.cudf.Table.readJSON(Table.java:1446)
	at ai.rapids.cudf.Table.readJSON(Table.java:1428)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$2(GpuJsonToStructs.scala:179)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$1(GpuJsonToStructs.scala:177)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.GpuJsonToStructs.doColumnar(GpuJsonToStructs.scala:175)
	at com.nvidia.spark.rapids.GpuUnaryExpression.doItColumnar(GpuExpressions.scala:250)
	at com.nvidia.spark.rapids.GpuUnaryExpression.$anonfun$columnarEval$1(GpuExpressions.scala:261)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:221)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:218)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:218)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:253)
	at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:619)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:618)
	at com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:631)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:567)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:567)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$3(basicPhysicalOperators.scala:565)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.next(RmmRapidsRetryIterator.scala:395)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:6
@Feng-Jiang28 Feng-Jiang28 changed the title from_json - input=empty array, schema=struct, output=single row with null from_json function parses a column containing an empty array, throws an exception. May 27, 2024
@revans2 revans2 self-assigned this May 28, 2024
@revans2 revans2 added bug Something isn't working ? - Needs Triage Need team to review and classify labels May 28, 2024
@mattahrens mattahrens assigned Feng-Jiang28 and unassigned revans2 May 28, 2024
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label May 28, 2024
@sameerz
Copy link
Collaborator

sameerz commented Nov 27, 2024

Related to #11717

@nartal1
Copy link
Collaborator

nartal1 commented Dec 9, 2024

@Feng-Jiang28 - I don't see the exception anymore. Fixed with this PR -#11618.
Is it okay to close this?

scala> parsed.show()
24/12/09 19:32:05 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) as string) AS from_json(value)#11 will run on GPU
      *Expression <Cast> cast(from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) as string) will run on GPU
        *Expression <JsonToStructs> from_json(StructField(a,IntegerType,true), value#5, Some(UTC)) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/12/09 19:32:05 WARN MultiFileReaderThreadPool: Configuring the file reader thread pool with a max of 64 threads instead of spark.rapids.sql.multiThreadedRead.numThreads = 20
+----------------+
|from_json(value)|
+----------------+
|          {null}|
+----------------+

@Feng-Jiang28
Copy link
Contributor Author

@nartal1 Thank you!

@Feng-Jiang28
Copy link
Contributor Author

Fixed with this PR -#11618.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

5 participants