@huan233usc commented Jan 19, 2026

🥞 Stacked PR

Use this link to review incremental changes.


This PR adds V2 connector support for streaming reads on tables with
deletion vectors:

  1. Fix `getSnapshotFiles()` to include all files in the initial snapshot
     - Use `Scan.getScanFiles()` directly instead of `StreamingHelper.getDataChangeAdd()`
     - The initial snapshot should include all files regardless of the `dataChange` flag
  2. Fix the DV file path issue by using `Path.toString()` instead of `toUri().toString()`
     - Avoids URL-encoding issues that caused `FileNotFoundException`
  3. Add `DeltaSourceV2DeletionVectorsSuite` for V2 streaming DV tests
     - Tests use `loadStreamWithOptions()` for V2 connector routing
     - Override `executeSql()` to use V1 for write operations (DELETE is not yet supported in V2)
  4. Update `DeltaSourceDeletionVectorTests` to support both the V1 and V2 connectors
     - Add `DeltaSourceConnectorTrait` for connector abstraction
     - Add an `executeSql()` hook for the V2 write-operation workaround
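The class of bug fixed in item 2 can be reproduced with plain stdlib types. The connector uses Hadoop's `Path`, but `java.nio.file.Path` shows the same behavior: `toUri().toString()` percent-encodes characters such as spaces, so the resulting string no longer matches the literal on-disk path and a naive lookup fails. The file name below is made up for illustration.

```java
import java.net.URI;
import java.nio.file.Paths;

public class PathEncodingDemo {
    public static void main(String[] args) {
        // A hypothetical deletion-vector file name containing a space.
        String raw = "/table/_delta_log/dv file.bin";

        URI uri = Paths.get(raw).toUri();
        // toUri() percent-encodes the space ("%20"), so the string form of the
        // URI no longer equals the literal path that exists on disk.
        System.out.println(uri);                       // contains %20
        System.out.println(Paths.get(raw).toString()); // the literal path, unchanged
    }
}
```

Using `toString()` keeps the path byte-for-byte identical to what the filesystem expects, which is why the fix avoids the `FileNotFoundException`.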

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

How was this patch tested?

Does this PR introduce any user-facing changes?

@huan233usc force-pushed the stack/dv_pr5_streaming_support branch 12 times, most recently from 7ef322f to b61ab49 on January 21, 2026 01:15
huan233usc added a commit that referenced this pull request Jan 21, 2026
…t useMetadataRowIndex control (#5773)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5773/files) to
review incremental changes.
- [**stack/dv_pr1_refactor_parquet_format**](#5773) [[Files changed](https://github.com/delta-io/delta/pull/5773/files)]
- [stack/dv_pr2_phase1_basic_read](#5774) [[Files changed](https://github.com/delta-io/delta/pull/5774/files/13e4002784c6078d00c6532055a65859439c815b..46571bc81d22af165f6dc4e6e9298b7f45a3d4a8)]
- [stack/dv_pr3_phase2_vectorized](#5775) [[Files changed](https://github.com/delta-io/delta/pull/5775/files/46571bc81d22af165f6dc4e6e9298b7f45a3d4a8..cefbfb57e45c4dc396de6eacd05f490293032fdf)]
- [stack/dv_pr4_phase3_file_splitting](#5776) [[Files changed](https://github.com/delta-io/delta/pull/5776/files/cefbfb57e45c4dc396de6eacd05f490293032fdf..68bf7c96e58ee479d40d222baae02c8dceb8548d)]
- [stack/dv_pr5_streaming_support](#5877) [[Files changed](https://github.com/delta-io/delta/pull/5877/files/68bf7c96e58ee479d40d222baae02c8dceb8548d..b61ab4940d6375165fee5597ee8d6ce78702f6e0)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

Refactor `DeltaParquetFileFormat` to support explicit control of
`useMetadataRowIndex` for the V2 connector.

### Changes:
- Add `useMetadataRowIndexOpt: Option[Boolean]` parameter to
`DeltaParquetFileFormatBase`
- V1 connector: Captures config value at construction time
- V2 connector: Explicitly passes the value per-scan via
`DeltaParquetFileFormatV2`

This is a pure refactor with no behavioral changes for V1. It enables
the V2 connector to control `_metadata.row_index` usage independently.
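The `Option`-based override described above can be sketched with stdlib types. An explicit value (the V2 per-scan path) takes precedence; an empty option falls back to the value the V1 path captured from session config at construction time. Apart from `useMetadataRowIndexOpt`, the class and method names below are illustrative, not Delta's.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch of the refactor pattern: an Optional parameter lets the V2 connector
// pass an explicit per-scan value, while an empty Optional defers to the
// config value captured at construction time (the V1 behavior).
class FileFormatBase {
    private final Optional<Boolean> useMetadataRowIndexOpt;
    private final Supplier<Boolean> capturedConfDefault;

    FileFormatBase(Optional<Boolean> useMetadataRowIndexOpt,
                   Supplier<Boolean> capturedConfDefault) {
        this.useMetadataRowIndexOpt = useMetadataRowIndexOpt;
        this.capturedConfDefault = capturedConfDefault;
    }

    boolean useMetadataRowIndex() {
        // Explicit value (V2) wins; otherwise use the captured default (V1).
        return useMetadataRowIndexOpt.orElseGet(capturedConfDefault);
    }
}

public class Demo {
    public static void main(String[] args) {
        FileFormatBase v1 = new FileFormatBase(Optional.empty(), () -> true);
        FileFormatBase v2 = new FileFormatBase(Optional.of(false), () -> true);
        System.out.println(v1.useMetadataRowIndex()); // true  (config default)
        System.out.println(v2.useMetadataRowIndex()); // false (explicit per-scan)
    }
}
```

Because V1 always constructs with the empty option, it resolves to the same captured config value as before, which is why the refactor has no V1 behavior change.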

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
Existing V1 tests pass (no behavior change)
## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
vitaliili-db and others added 13 commits January 21, 2026 18:21
…Utils (delta-io#5818)

This PR consolidates the V2 connector configuration and catalog utility
classes:

1. Move V2_ENABLE_MODE from DeltaSQLConfV2 to DeltaSQLConf
   - Deleted spark/v2/.../DeltaSQLConfV2.scala
   - Added V2_ENABLE_MODE config to DeltaSQLConf
   - Updated all references in DeltaCatalog and tests

2. Move CatalogTableUtils from v2 module to spark module
   - Created spark/.../util/CatalogTableUtils.scala (Scala version)
   - Created spark/.../util/CatalogTableUtilsSuite.scala (Scala tests)
   - Uses hardcoded feature keys (no Kernel dependency in spark module)
   - All 10 tests pass

This reduces module coupling by centralizing the config in the main
spark module where it's primarily used.


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


## How was this patch tested?


## Does this PR introduce _any_ user-facing changes?

…ta-io#5651)


- [x] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

Implements initial snapshot support for Delta streaming in Kernel-Spark
(DSv2). Users can now start streaming queries without specifying a
`startingVersion` option, and the stream will:
  1. Read all existing data from the current snapshot
  2. Continue processing new commits as they arrive

  This brings DSv2 closer to feature parity with DSv1 DeltaSource.

The focus of this PR is correctness; not performance or scalability. Key
follow-up TODOs:
1. Memory Protection (TODO delta-io#5318) for tables with a large number of
files.
2. Caching (TODO delta-io#5318)
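The batch semantics described above can be modeled with a toy in-memory log: with no `startingVersion`, the first batch serves every file visible in the latest snapshot, and subsequent batches serve only files added by commits after that version. All names and data below are illustrative, not the `SparkMicroBatchStream` implementation.

```java
import java.util.*;

public class StreamDemo {
    // Toy commit log: version -> files added by that commit.
    static final Map<Long, List<String>> COMMITS = new TreeMap<>(Map.of(
        0L, List.of("a.parquet"),
        1L, List.of("b.parquet"),
        2L, List.of("c.parquet")));

    // All files visible in the snapshot at `version`.
    static List<String> snapshotFiles(long version) {
        List<String> files = new ArrayList<>();
        COMMITS.forEach((v, fs) -> { if (v <= version) files.addAll(fs); });
        return files;
    }

    // Initial batch reads the whole snapshot; later batches read only new commits.
    static List<String> nextBatch(long fromExclusive, long toInclusive, boolean initialSnapshot) {
        if (initialSnapshot) return snapshotFiles(toInclusive);
        List<String> files = new ArrayList<>();
        COMMITS.forEach((v, fs) -> {
            if (v > fromExclusive && v <= toInclusive) files.addAll(fs);
        });
        return files;
    }

    public static void main(String[] args) {
        long latest = 1L; // table version when the stream starts
        System.out.println(nextBatch(-1, latest, true));  // [a.parquet, b.parquet]
        System.out.println(nextBatch(latest, 2L, false)); // [c.parquet]
    }
}
```

The memory-protection TODO follows directly from this model: the initial batch materializes every snapshot file at once, which does not scale to very large tables.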


Unit tests (`SparkMicroBatchStreamTest`) and E2E tests
(`DeltaSourceDSv2Suite`).


No.
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Since PR delta-io#5843 has already been merged, we can now re-add the Spark `4.2.0-SNAPSHOT` master branch to Delta.

Let's see whether any more CI jobs break.

## How was this patch tested?

Everything is okay if GitHub CI passes.

## Does this PR introduce _any_ user-facing changes?

No.

---------

Signed-off-by: openinx <openinx@gmail.com>
…-io#5813)


#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [x] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This PR introduces basic build support in `build.sbt` for the Flink connector.
It also introduces multiple interface classes that the Flink connector can use
to access Delta Kernel.

## How was this patch tested?
Unit tests.

## Does this PR introduce _any_ user-facing changes?
No
delta-io#5813)" (delta-io#5868)

This reverts commit 52bc9d2.

#### Which Delta project/connector is this regarding?
- [ ] Spark
- [ ] Standalone
- [X] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Reverting 52bc9d2. We want it re-merged with all the right approvals.

## How was this patch tested?

Just a revert.

## Does this PR introduce _any_ user-facing changes?

No.
This PR implements Phase 1 of DV support in the V2 connector with the following features:

1. DV metadata in PartitionedFile:
   - Convert Kernel DeletionVectorDescriptor to Spark format
   - Store serialized DV in otherConstantMetadataColumnValues

2. Schema augmentation:
   - Add __delta_internal_is_row_deleted column when table has DVs
   - DeltaParquetFileFormat generates this column using DV bitmap

3. DV filtering in connector:
   - DVFilteringIterator filters rows where is_row_deleted != 0
   - Projects out the DV column from output (clean data to Spark)

4. Phase 1 limitations (to be addressed in later phases):
   - Vectorized reader disabled (Phase 2 will add ColumnarBatch support)
   - File splitting disabled (Phase 3 will use _metadata.row_index)

The connector returns clean, already-filtered data - Spark plans never see DV columns.
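The filtering step in item 3 can be sketched as a plain iterator over generic rows: drop rows whose `is_row_deleted` column is non-zero, then project that column out so the consumer only sees clean data. Rows are modeled as `Object[]` here; names other than the DV-filtering idea itself are illustrative, not the V2 connector's actual classes.

```java
import java.util.*;

// Minimal sketch of DV filtering: skip deleted rows, project out the DV column.
class DVFilteringIterator implements Iterator<Object[]> {
    private final Iterator<Object[]> input;
    private final int isRowDeletedOrdinal;
    private Object[] next;

    DVFilteringIterator(Iterator<Object[]> input, int isRowDeletedOrdinal) {
        this.input = input;
        this.isRowDeletedOrdinal = isRowDeletedOrdinal;
        advance();
    }

    private void advance() {
        next = null;
        while (input.hasNext()) {
            Object[] row = input.next();
            if (((Number) row[isRowDeletedOrdinal]).intValue() == 0) {
                // Keep the row, but project out the is_row_deleted column.
                Object[] projected = new Object[row.length - 1];
                for (int i = 0, j = 0; i < row.length; i++) {
                    if (i != isRowDeletedOrdinal) projected[j++] = row[i];
                }
                next = projected;
                return;
            }
        }
    }

    @Override public boolean hasNext() { return next != null; }

    @Override public Object[] next() {
        Object[] out = next;
        advance();
        return out;
    }
}

public class DVDemo {
    public static void main(String[] args) {
        List<Object[]> rows = List.of(
            new Object[]{"r1", 0}, new Object[]{"r2", 1}, new Object[]{"r3", 0});
        Iterator<Object[]> it = new DVFilteringIterator(rows.iterator(), 1);
        while (it.hasNext()) System.out.println(Arrays.toString(it.next()));
        // prints [r1] then [r3]; r2 is deleted and the DV column is gone
    }
}
```

Projecting the column out inside the connector is what keeps Spark plans unaware of DV internals, as the last sentence above notes.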
@huan233usc force-pushed the stack/dv_pr5_streaming_support branch 8 times, most recently from 59c1402 to 4253ce1 on January 22, 2026 01:41
@huan233usc force-pushed the stack/dv_pr5_streaming_support branch from 4253ce1 to 369e04d on January 22, 2026 03:54