
fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime #18258

Open
yihua wants to merge 2 commits into apache:branch-0.x from yihua:fix-databricks-incremental-0x

Conversation

@yihua (Contributor) commented Feb 26, 2026

Describe the issue this Pull Request addresses

Cherrypicks #18003 to branch-0.x

Fixes #18002

Hudi's Spark integration uses certain Spark APIs through Hudi's SparkAdapter framework. The Databricks Spark Runtime has APIs that are incompatible with OSS Spark. In particular, the following Databricks Spark classes cause Hudi's incremental query to fail in the specific case that requires a full scan, which invokes HoodieBaseRelation#listLatestFileSlices.

1. FileStatusWithMetadata#fileStatus does not work in Databricks Runtime

In OSS Spark:

```scala
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
  // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
  def getPath: Path = fileStatus.getPath
  def getLen: Long = fileStatus.getLen
  def getModificationTime: Long = fileStatus.getModificationTime
  def isDirectory: Boolean = fileStatus.isDirectory
}
```

In Databricks Spark, the type of fileStatus has changed from FileStatus to SerializableFileStatus:

```scala
classOf[FileStatusWithMetadata].getDeclaredFields.foreach(println)
// private final org.apache.spark.sql.execution.datasources.SerializableFileStatus org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.fileStatus
// private final scala.collection.immutable.Map org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.metadata

classOf[FileStatus].isAssignableFrom(classOf[SerializableFileStatus])
// res12: Boolean = false
```

Databricks Spark provides this API to get the FileStatus object from the FileStatusWithMetadata:

```scala
classOf[FileStatusWithMetadata].getDeclaredMethods
// ...
// public org.apache.hadoop.fs.FileStatus org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.toFileStatus()
```
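The compatibility technique described above can be sketched as follows. This is a minimal, hypothetical Java illustration (the class names below are stand-ins, not the real Spark or Hadoop types): probe the return type of the accessor at runtime, and fall back to the toFileStatus() method when the field is not a FileStatus.

```java
import java.lang.reflect.Method;

public class FileStatusCompat {
    // Stand-in for org.apache.hadoop.fs.FileStatus (hypothetical, for illustration only).
    public static class FileStatus {}

    // Stand-in for Databricks' SerializableFileStatus: deliberately NOT a FileStatus subtype.
    public static class SerializableFileStatus {
        public FileStatus toFileStatus() { return new FileStatus(); }
    }

    // Stand-in for the Databricks flavor of FileStatusWithMetadata,
    // where the accessor returns SerializableFileStatus instead of FileStatus.
    public static class FileStatusWithMetadata {
        private final SerializableFileStatus status = new SerializableFileStatus();
        public SerializableFileStatus fileStatus() { return status; }
        public FileStatus toFileStatus() { return status.toFileStatus(); }
    }

    // Resolve a FileStatus from either runtime flavor: use fileStatus() directly
    // when its return type is assignable to FileStatus, else fall back to toFileStatus().
    public static FileStatus getFileStatus(Object wrapper) throws Exception {
        Method getter = wrapper.getClass().getMethod("fileStatus");
        if (FileStatus.class.isAssignableFrom(getter.getReturnType())) {
            return (FileStatus) getter.invoke(wrapper);
        }
        return (FileStatus) wrapper.getClass().getMethod("toFileStatus").invoke(wrapper);
    }

    public static void main(String[] args) throws Exception {
        FileStatus fs = getFileStatus(new FileStatusWithMetadata());
        System.out.println(fs != null);
    }
}
```

Because the check is done via reflection on the loaded class, the same adapter code works whether the runtime's accessor returns FileStatus (OSS Spark) or SerializableFileStatus (Databricks).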
2. PartitionedFile(partitionValues, SparkPath.fromUri(filePath.toUri), start, length) does not work in Databricks Runtime

In OSS Spark:

```scala
case class PartitionedFile(
    partitionValues: InternalRow,
    filePath: SparkPath,
    start: Long,
    length: Long,
    @transient locations: Array[String] = Array.empty,
    modificationTime: Long = 0L,
    fileSize: Long = 0L,
    otherConstantMetadataColumnValues: Map[String, Any] = Map.empty)
```

In Databricks Spark, the type of locations has changed from Array to Seq:

```scala
classOf[PartitionedFile].getDeclaredFields.foreach(println)

// private final org.apache.spark.sql.catalyst.InternalRow org.apache.spark.sql.execution.datasources.PartitionedFile.partitionValues
// private final org.apache.spark.paths.SparkPath org.apache.spark.sql.execution.datasources.PartitionedFile.filePath
// private final long org.apache.spark.sql.execution.datasources.PartitionedFile.start
// private final long org.apache.spark.sql.execution.datasources.PartitionedFile.length
// private final transient scala.collection.Seq org.apache.spark.sql.execution.datasources.PartitionedFile.locations
// private final long org.apache.spark.sql.execution.datasources.PartitionedFile.modificationTime
// private final long org.apache.spark.sql.execution.datasources.PartitionedFile.fileSize
// private final scala.collection.immutable.Map org.apache.spark.sql.execution.datasources.PartitionedFile.otherConstantMetadataColumnValues
// private final scala.collection.mutable.Map org.apache.spark.sql.execution.datasources.PartitionedFile.tags
// private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePathKey
// private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePath
// private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$rowIndexFilter
// private boolean org.apache.spark.sql.execution.datasources.PartitionedFile.useSplittableFileScan
```

This causes the following exception:

```
Caused by: java.lang.NoSuchMethodError: 'java.lang.String[] org.apache.spark.sql.execution.datasources.PartitionedFile$.apply$default$5()'
	at org.apache.spark.sql.execution.datasources.HoodieSpark35PartitionedFileUtils$.createPartitionedFile(HoodieSpark35PartitionedFileUtils.scala:45)
	at org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:95)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:162)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:291)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableIterator(HoodieFileGroupReader.java:300)
	at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat.$anonfun$buildReaderWithPartitionValues$4(HoodieFileGroupReaderBasedFileFormat.scala:273)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:737)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:1084)
```
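The NoSuchMethodError arises because the compiled default-argument helper (apply$default$5) returns String[] in OSS Spark but a Seq in Databricks. One general way to sidestep a changed parameter type is to construct the object reflectively, inspecting the constructor's declared parameter types and adapting the argument. The sketch below is a hypothetical Java illustration of that technique with a stand-in class (modeling Seq as java.util.List), not the actual Hudi adapter code.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;

public class PartitionedFileCompat {
    // Hypothetical stand-in mimicking the Databricks PartitionedFile, whose
    // `locations` parameter is a Seq rather than an Array (modeled as List here).
    public static class PartitionedFile {
        public final String path;
        public final List<String> locations;
        public PartitionedFile(String path, List<String> locations) {
            this.path = path;
            this.locations = locations;
        }
    }

    // Construct the file descriptor without hard-coding the `locations` type:
    // inspect the constructor's parameter type and adapt the argument to match,
    // instead of relying on the compiled-in default-argument signature.
    public static Object create(Class<?> cls, String path, String[] locations) throws Exception {
        Constructor<?> ctor = cls.getConstructors()[0];
        Class<?> locType = ctor.getParameterTypes()[1];
        Object locArg = locType.isArray() ? locations : Arrays.asList(locations);
        return ctor.newInstance(path, locArg);
    }

    public static void main(String[] args) throws Exception {
        PartitionedFile pf =
            (PartitionedFile) create(PartitionedFile.class, "/tmp/file.parquet", new String[]{"host1"});
        System.out.println(pf.path + " " + pf.locations.get(0));
    }
}
```

Since the argument type is decided against the loaded class rather than at compile time, the same call site would accept either the Array-based (OSS) or Seq-based (Databricks) shape of the constructor.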

Summary and Changelog

This PR adapts the Hudi Spark integration for Databricks Runtime based on the above findings, to fix the incremental query with full scan mode.

Impact

Fixes incremental query with full scan mode.

Risk Level

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

…ricks Runtime (apache#18003)

* fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime

* Fix Spark 4.0
@yihua yihua added this to the release-0.15.1 milestone Feb 26, 2026
@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Feb 26, 2026
@yihua yihua changed the title fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime (#18003) fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime Feb 26, 2026
@hudi-bot (Collaborator)
CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build
