
Commit 369e04d

Enable V2 streaming for DV tables
This PR adds V2 connector support for streaming reads on tables with deletion vectors:

1. Fix getSnapshotFiles() to include all files in the initial snapshot
   - Use Scan.getScanFiles() directly instead of StreamingHelper.getDataChangeAdd()
   - Initial snapshot should include all files regardless of the dataChange flag
2. Fix DV file path issue by using Path.toString() instead of toUri().toString()
   - Avoids URL encoding issues that caused FileNotFoundException
3. Add DeltaSourceV2DeletionVectorsSuite for V2 streaming DV tests
   - Tests use loadStreamWithOptions() for V2 connector routing
   - Override executeSql() to use V1 for write operations (DELETE not yet supported in V2)
4. Update DeltaSourceDeletionVectorTests to support both V1 and V2 connectors
   - Add DeltaSourceConnectorTrait for connector abstraction
   - Add executeSql() hook for the V2 write operation workaround
1 parent c9c61a2 commit 369e04d
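
Note on item 4: DeltaSourceConnectorTrait itself is not among the diffs shown below; only its call sites are. A minimal sketch of the abstraction, assuming only the loadStreamWithOptions and useDsv2 names that the updated tests call (everything else here is an assumption, not the actual trait):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.test.SharedSparkSession

trait DeltaSourceConnectorTrait { self: SharedSparkSession =>

  /** Whether streaming reads should go through the V2 (DSv2) connector. */
  protected def useDsv2: Boolean = false

  /** Build a streaming DataFrame for `path`, routed through V1 or V2. */
  protected def loadStreamWithOptions(
      path: String,
      options: Map[String, String]): DataFrame = {
    if (useDsv2) {
      // Placeholder: the real trait presumably flips whatever switch enables the
      // V2 read path (for example a DeltaSQLConf setting); that mechanism is not
      // shown in this commit.
    }
    spark.readStream.format("delta").options(options).load(path)
  }
}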

File tree

5 files changed (+141, -36 lines)
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.delta.{DeltaSourceDeletionVectorTests, DeltaSourceSuiteBase, PersistentDVEnabled}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+
+/**
+ * Test suite that runs DeltaSourceDeletionVectorTests using the V2 connector.
+ */
+class DeltaSourceV2DeletionVectorsSuite extends DeltaSourceSuiteBase
+  with DeltaSQLCommandTest
+  with DeltaSourceDeletionVectorTests
+  with PersistentDVEnabled
+  with V2ForceTest {
+
+  override protected def useDsv2: Boolean = true
+
+  // Override executeSql to use V1 connector for write operations (DELETE/INSERT)
+  // V2 connector doesn't support write operations yet
+  override protected def executeSql(sqlText: String): Unit = {
+    withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "NONE") {
+      sql(sqlText)
+    }
+  }
+
+  private lazy val shouldPassTests = Set(
+    "allow to delete files before starting a streaming query",
+    "allow to delete files before staring a streaming query without checkpoint",
+    "multiple deletion vectors per file with initial snapshot"
+  )
+
+  private lazy val shouldFailTests = Set(
+    // These tests use ignoreDeletes/ignoreChanges options not yet supported in V2
+    "deleting files fails query if ignoreDeletes = false",
+    "allow to delete files after staring a streaming query when ignoreFileDeletion is true",
+    "allow to delete files after staring a streaming query when ignoreDeletes is true",
+    "updating the source table causes failure when ignoreChanges = false - using DELETE",
+    "allow to update the source table when ignoreChanges = true - using DELETE",
+    "deleting files when ignoreChanges = true doesn't fail the query",
+    "updating source table when ignoreDeletes = true fails the query - using DELETE",
+    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE - List()",
+    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
+      " - List((ignoreDeletes,true))",
+    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
+      " - List((ignoreChanges,true))",
+    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
+      " - List((skipChangeCommits,true))",
+    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE - List()",
+    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
+      " - List((ignoreDeletes,true))",
+    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
+      " - List((ignoreChanges,true))",
+    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
+      " - List((skipChangeCommits,true))",
+    "multiple deletion vectors per file - List((ignoreFileDeletion,true))",
+    "multiple deletion vectors per file - List((ignoreChanges,true))"
+  )
+
+  override protected def shouldFail(testName: String): Boolean = {
+    val inPassList = shouldPassTests.contains(testName)
+    val inFailList = shouldFailTests.contains(testName)
+
+    assert(inPassList || inFailList, s"Test '$testName' not in shouldPassTests or shouldFailTests")
+    assert(!(inPassList && inFailList),
+      s"Test '$testName' in both shouldPassTests and shouldFailTests")
+
+    inFailList
+  }
+}
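
The diffs in this commit do not show how the shared trait consumes shouldFail(). One plausible shape, assuming a hypothetical testWithConnector wrapper (a sketch only, not the actual implementation), is:

  // Hypothetical sketch: how a shared trait *could* route test registration
  // through the shouldFail() hook. None of this wrapper appears in the diff.
  protected def testWithConnector(testName: String)(body: => Unit): Unit = {
    test(testName) {
      if (shouldFail(testName)) {
        // Expected to fail on V2 until ignoreDeletes/ignoreChanges are supported there.
        intercept[Throwable] { body }
      } else {
        body
      }
    }
  }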

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala

Lines changed: 23 additions & 27 deletions
@@ -21,7 +21,7 @@ import java.io.File
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.delta.Relocated.StreamExecution
-import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
 import org.apache.hadoop.fs.Path
 import org.scalatest.concurrent.Eventually
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -30,28 +30,31 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 
 trait DeltaSourceDeletionVectorTests extends StreamTest
-  with DeletionVectorsTestUtils {
+  with DeletionVectorsTestUtils
+  with DeltaSourceConnectorTrait {
+  self: DeltaSQLTestUtils =>
 
   import testImplicits._
 
+  /** Execute SQL statement. Override in V2 tests to use V1 connector for write operations. */
+  protected def executeSql(sqlText: String): Unit = sql(sqlText)
+
   test("allow to delete files before starting a streaming query") {
     withTempDir { inputDir =>
       val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
       (0 until 5).foreach { i =>
         val v = Seq(i.toString).toDF
         v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
       }
-      sql(s"DELETE FROM delta.`$inputDir`")
+      executeSql(s"DELETE FROM delta.`$inputDir`")
       (5 until 10).foreach { i =>
         val v = Seq(i.toString).toDF
         v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
       }
       deltaLog.checkpoint()
       assert(deltaLog.readLastCheckpointFile().nonEmpty, "this test requires a checkpoint")
 
-      val df = spark.readStream
-        .format("delta")
-        .load(inputDir.getCanonicalPath)
+      val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
 
       testStream(df)(
         AssertOnQuery { q =>
@@ -69,16 +72,14 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
         val v = Seq(i.toString).toDF
         v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
       }
-      sql(s"DELETE FROM delta.`$inputDir`")
+      executeSql(s"DELETE FROM delta.`$inputDir`")
       (5 until 7).foreach { i =>
         val v = Seq(i.toString).toDF
         v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
       }
       assert(deltaLog.readLastCheckpointFile().isEmpty, "this test requires no checkpoint")
 
-      val df = spark.readStream
-        .format("delta")
-        .load(inputDir.getCanonicalPath)
+      val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
 
       testStream(df)(
         AssertOnQuery { q =>
@@ -115,7 +116,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
      Seq(i, i + 1).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir)
    }
 
-    val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
+    val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
    val expectDVs = commandShouldProduceDVs.getOrElse(
      sqlCommand.toUpperCase().startsWith("DELETE"))
 
@@ -126,7 +127,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
      },
      CheckAnswer((0 until 10): _*),
      AssertOnQuery { q =>
-        sql(sqlCommand)
+        executeSql(sqlCommand)
        deletionVectorsPresentIfExpected(inputDir, expectDVs)
      })
 
@@ -148,7 +149,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
    }
    val log = DeltaLog.forTable(spark, inputDir)
    val commitVersionBeforeDML = log.update().version
-    val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
+    val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
    def expectDVsInCommand(shouldProduceDVs: Option[Boolean], command: String): Boolean = {
      shouldProduceDVs.getOrElse(command.toUpperCase().startsWith("DELETE"))
    }
@@ -177,11 +178,11 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
        true
      },
      AssertOnQuery { q =>
-        sql(sqlCommand1)
+        executeSql(sqlCommand1)
        deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1)
      },
      AssertOnQuery { q =>
-        sql(sqlCommand2)
+        executeSql(sqlCommand2)
        deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2)
      },
      AssertOnQuery { q =>
@@ -416,21 +417,19 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
      (0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
 
      // V1: Delete row 0
-      sql(s"DELETE FROM delta.`$path` WHERE value = 0")
+      executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
 
      // V2: Delete row 1
-      sql(s"DELETE FROM delta.`$path` WHERE value = 1")
+      executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
 
      // V3: Delete row 2
-      sql(s"DELETE FROM delta.`$path` WHERE value = 2")
+      executeSql(s"DELETE FROM delta.`$path` WHERE value = 2")
 
      // Verify DVs are present
      assert(getFilesWithDeletionVectors(deltaLog).nonEmpty,
        "This test requires deletion vectors to be present")
 
-      val df = spark.readStream
-        .format("delta")
-        .load(path)
+      val df = loadStreamWithOptions(path, Map.empty)
 
      testStream(df)(
        // Process the initial snapshot
@@ -457,10 +456,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
      // V0: 10 rows in a single file
      (0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
 
-      val df = spark.readStream
-        .format("delta")
-        .options(sourceOptions.toMap)
-        .load(path)
+      val df = loadStreamWithOptions(path, sourceOptions.toMap)
 
      testStream(df)(
        AssertOnQuery { q =>
@@ -470,12 +466,12 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
      CheckAnswer((0 until 10): _*),
      AssertOnQuery { q =>
        // V1: Delete row 0 - creates first DV (version 1)
-        sql(s"DELETE FROM delta.`$path` WHERE value = 0")
+        executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
        true
      },
      AssertOnQuery { q =>
        // V2: Delete row 1 - updates DV (version 2). DV is cumulative: {0, 1}
-        sql(s"DELETE FROM delta.`$path` WHERE value = 1")
+        executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
        true
      },
      AssertOnQuery { q =>

spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java

Lines changed: 13 additions & 4 deletions
@@ -24,6 +24,7 @@
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
@@ -439,7 +440,9 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
     Seq<FilePartition> filePartitions =
         FilePartition$.MODULE$.getFilePartitions(
             spark, JavaConverters.asScalaBuffer(partitionedFiles).toSeq(), maxSplitBytes);
-    return JavaConverters.seqAsJavaList(filePartitions).toArray(new InputPartition[0]);
+    InputPartition[] result =
+        JavaConverters.seqAsJavaList(filePartitions).toArray(new InputPartition[0]);
+    return result;
   }
 
   @Override
@@ -982,9 +985,15 @@ private CloseableIterator<IndexedFile> getSnapshotFiles(long version) {
     try (CloseableIterator<FilteredColumnarBatch> filesIter = scan.getScanFiles(engine)) {
       while (filesIter.hasNext()) {
         FilteredColumnarBatch filteredBatch = filesIter.next();
-        ColumnarBatch batch = filteredBatch.getData();
-        for (int rowId = 0; rowId < batch.getSize(); rowId++) {
-          StreamingHelper.getDataChangeAdd(batch, rowId).ifPresent(addFiles::add);
+        // getScanFiles returns rows with schema {add: struct, tableRoot: string}
+        // Extract AddFile directly from each row
+        try (CloseableIterator<Row> rowIter = filteredBatch.getRows()) {
+          while (rowIter.hasNext()) {
+            Row scanFileRow = rowIter.next();
+            // addFile struct is at index 0 in scan file schema
+            Row addFileRow = scanFileRow.getStruct(0);
+            addFiles.add(new AddFile(addFileRow));
+          }
         }
       }
     } catch (IOException e) {
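
For context on the getSnapshotFiles() change above: the commit message notes that the initial snapshot must include all files regardless of the dataChange flag. A minimal, hedged illustration of why (Scala, assuming a Spark session with Delta enabled; the path is hypothetical): OPTIMIZE re-adds surviving files with dataChange = false, so a dataChange-filtered listing would silently drop them.

import org.apache.spark.sql.functions.col

val path = "/tmp/dv_streaming_demo"  // hypothetical location
spark.range(0, 100).repartition(4).write.format("delta").save(path)
spark.sql(s"OPTIMIZE delta.`$path`")

// Inspect the AddFile actions of the OPTIMIZE commit (version 1 of this table):
spark.read.json(s"$path/_delta_log/00000000000000000001.json")
  .where(col("add").isNotNull)
  .select("add.path", "add.dataChange")
  .show(truncate = false)  // dataChange is false for the compacted file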

spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java

Lines changed: 3 additions & 1 deletion
@@ -193,7 +193,9 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
     SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
     Protocol protocol = snapshotImpl.getProtocol();
     Metadata metadata = snapshotImpl.getMetadata();
-    String tablePath = snapshotImpl.getDataPath().toUri().toString();
+    // Use Path.toString() instead of toUri().toString() to avoid URL encoding issues
+    // This matches V1 connector behavior in PreprocessTableWithDVs.scala
+    String tablePath = snapshotImpl.getDataPath().toString();
 
     // Create DV schema context if table supports deletion vectors
     boolean isTableSupportDv =
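
A quick illustration of the encoding difference the comment refers to (Scala, using org.apache.hadoop.fs.Path; the path with a space is just an example):

import org.apache.hadoop.fs.Path

val p = new Path("/warehouse/my table/part-00000.parquet")
p.toString        // /warehouse/my table/part-00000.parquet
p.toUri.toString  // /warehouse/my%20table/part-00000.parquet
// If the URI-encoded form is later treated as a literal filesystem path, "%20" never
// matches the real directory name, which is how the FileNotFoundException mentioned
// in the commit message can surface.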

spark/v2/src/test/java/io/delta/spark/internal/v2/read/deletionvector/DvSchemaContextTest.java

Lines changed: 18 additions & 4 deletions
@@ -20,13 +20,15 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.spark.sql.delta.DeltaParquetFileFormat;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class DvSchemaContextTest {
 
@@ -36,8 +38,12 @@ public class DvSchemaContextTest {
   private static final StructType PARTITION_SCHEMA =
       new StructType().add("date", DataTypes.StringType);
 
+  static Stream<Arguments> schemaWithDvColumnArgs() {
+    return Stream.of(Arguments.of(false, 3, 2), Arguments.of(true, 4, 3));
+  }
+
   @ParameterizedTest(name = "useMetadataRowIndex={0}")
-  @CsvSource({"false, 3, 2", "true, 4, 3"})
+  @MethodSource("schemaWithDvColumnArgs")
   void testSchemaWithDvColumn(
       boolean useMetadataRowIndex, int expectedFieldCount, int expectedDvIndex) {
     DvSchemaContext context =
@@ -57,16 +63,24 @@ void testSchemaWithDvColumn(
         schemaWithDv.fields()[expectedDvIndex].name());
   }
 
+  static Stream<Arguments> inputColumnCountArgs() {
+    return Stream.of(Arguments.of(false, 4), Arguments.of(true, 5));
+  }
+
   @ParameterizedTest(name = "useMetadataRowIndex={0}")
-  @CsvSource({"false, 4", "true, 5"})
+  @MethodSource("inputColumnCountArgs")
   void testInputColumnCount(boolean useMetadataRowIndex, int expectedCount) {
     DvSchemaContext context =
         new DvSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, useMetadataRowIndex);
     assertEquals(expectedCount, context.getInputColumnCount());
   }
 
+  static Stream<Arguments> outputColumnOrdinalsArgs() {
+    return Stream.of(Arguments.of(false, "0,1,3"), Arguments.of(true, "0,1,4"));
+  }
+
   @ParameterizedTest(name = "useMetadataRowIndex={0}")
-  @CsvSource({"false, '0,1,3'", "true, '0,1,4'"})
+  @MethodSource("outputColumnOrdinalsArgs")
   void testOutputColumnOrdinals(boolean useMetadataRowIndex, String expectedOrdinalsStr) {
     DvSchemaContext context =
         new DvSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, useMetadataRowIndex);
