Skip to content

Commit

Permalink
API, Core: Enable dropping rewritten delete files in RowDelta
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Sep 18, 2024
1 parent bbeadea commit da1af21
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 0 deletions.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
*/
RowDelta addDeletes(DeleteFile deletes);

/**
* Drop a rewritten {@link DeleteFile} from the table.
*
* @param deletes a rewritten delete file that can be removed from the table
* @return this for method chaining
*/
default RowDelta dropRewrittenDeletes(DeleteFile deletes) {
throw new UnsupportedOperationException(
getClass().getName() + " does not implement dropRewrittenDeletes");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
Expand Down
130 changes: 130 additions & 0 deletions core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.types.Types.NestedField.required;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Timeout;
import org.openjdk.jmh.annotations.Warmup;

/**
* A benchmark that evaluates the performance of replacing delete files in the table.
*
* <p>To run this benchmark: <code>
* ./gradlew :iceberg-core:jmh
* -PjmhIncludeRegex=ReplaceDeleteFilesBenchmark
* -PjmhOutputPath=benchmark/replace-delete-files-benchmark.txt
* </code>
*/
@Fork(1)
@State(Scope.Benchmark)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
public class ReplaceDeleteFilesBenchmark {

private static final String TABLE_IDENT = "tbl";
private static final Schema SCHEMA =
new Schema(
required(1, "int_col", Types.IntegerType.get()),
required(2, "long_col", Types.LongType.get()),
required(3, "decimal_col", Types.DecimalType.of(10, 10)),
required(4, "date_col", Types.DateType.get()),
required(5, "timestamp_col", Types.TimestampType.withoutZone()),
required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
required(7, "str_col", Types.StringType.get()));
private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
private static final HadoopTables TABLES = new HadoopTables();

private Table table;
private List<DeleteFile> deleteFiles;
private List<DeleteFile> pendingDeleteFiles;

@Param({"50000", "100000", "500000", "1000000", "2500000"})
private int numFiles;

@Setup
public void setupBenchmark() {
initTable();
initFiles();
}

@TearDown
public void tearDownBenchmark() {
dropTable();
}

@Benchmark
@Threads(1)
public void replaceDeleteFiles() {
RowDelta rowDelta = table.newRowDelta();
deleteFiles.forEach(rowDelta::dropRewrittenDeletes);
pendingDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.commit();
}

private void initTable() {
this.table = TABLES.create(SCHEMA, SPEC, TABLE_IDENT);
}

private void dropTable() {
TABLES.dropTable(TABLE_IDENT);
}

private void initFiles() {
List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);

RowDelta rowDelta = table.newRowDelta();

for (int ordinal = 0; ordinal < numFiles; ordinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
rowDelta.addRows(dataFile);

DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);

DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
}

rowDelta.commit();

this.deleteFiles = generatedDeleteFiles;
this.pendingDeleteFiles = generatedPendingDeleteFiles;
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public RowDelta addDeletes(DeleteFile deletes) {
return this;
}

@Override
public RowDelta dropRewrittenDeletes(DeleteFile deletes) {
delete(deletes);
return this;
}

@Override
public RowDelta validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
Expand Down
147 changes: 147 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -1409,4 +1409,151 @@ public void testRowDeltaCaseSensitivity() {
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Found new conflicting delete files");
}

@TestTemplate
public void testRewrittenDeleteFiles() {
DataFile dataFile = newDataFile("data_bucket=0");
DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);

DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta rowDelta =
table
.newRowDelta()
.dropRewrittenDeletes(deleteFile)
.addDeletes(newDeleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId());
Snapshot snapshot = commit(table, rowDelta, branch);
assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE);

List<ManifestFile> dataManifests = snapshot.dataManifests(table.io());
assertThat(dataManifests).hasSize(1);
validateManifest(
dataManifests.get(0),
dataSeqs(1L),
fileSeqs(1L),
ids(baseSnapshot.snapshotId()),
files(dataFile),
statuses(Status.ADDED));

List<ManifestFile> deleteManifests = snapshot.deleteManifests(table.io());
assertThat(deleteManifests).hasSize(2);
validateDeleteManifest(
deleteManifests.get(0),
dataSeqs(2L),
fileSeqs(2L),
ids(snapshot.snapshotId()),
files(newDeleteFile),
statuses(Status.ADDED));
validateDeleteManifest(
deleteManifests.get(1),
dataSeqs(1L),
fileSeqs(1L),
ids(snapshot.snapshotId()),
files(deleteFile),
statuses(Status.DELETED));
}

@TestTemplate
public void testConcurrentDeletesRewriteSameDeleteFile() {
DataFile dataFile = newDataFile("data_bucket=0");
DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);

// commit the first DELETE operation that replaces `deleteFile`
DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta delete1 =
table
.newRowDelta()
.addDeletes(newDeleteFile1)
.dropRewrittenDeletes(deleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingDataFiles();
Snapshot snapshot1 = commit(table, delete1, branch);
assertThat(snapshot1.operation()).isEqualTo(DataOperations.DELETE);
assertThat(snapshot1.sequenceNumber()).isEqualTo(2L);

// commit the second DELETE operation that replaces `deleteFile`
DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta delete2 =
table
.newRowDelta()
.addDeletes(newDeleteFile2)
.dropRewrittenDeletes(deleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingDataFiles();
Snapshot snapshot2 = commit(table, delete2, branch);
assertThat(snapshot2.operation()).isEqualTo(DataOperations.DELETE);
assertThat(snapshot2.sequenceNumber()).isEqualTo(3L);

List<ManifestFile> dataManifests = snapshot2.dataManifests(table.io());
assertThat(dataManifests).hasSize(1);
validateManifest(
dataManifests.get(0),
dataSeqs(1L),
fileSeqs(1L),
ids(baseSnapshot.snapshotId()),
files(dataFile),
statuses(Status.ADDED));

// verify both new delete files have been added
List<ManifestFile> deleteManifests = snapshot2.deleteManifests(table.io());
assertThat(deleteManifests).hasSize(2);
validateDeleteManifest(
deleteManifests.get(0),
dataSeqs(3L),
fileSeqs(3L),
ids(snapshot2.snapshotId()),
files(newDeleteFile2),
statuses(Status.ADDED));
validateDeleteManifest(
deleteManifests.get(1),
dataSeqs(2L),
fileSeqs(2L),
ids(snapshot1.snapshotId()),
files(newDeleteFile1),
statuses(Status.ADDED));
}

@TestTemplate
public void testConcurrentMergeRewriteSameDeleteFile() {
DataFile dataFile = newDataFile("data_bucket=0");
DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);

// commit a DELETE operation that replaces `deleteFile`
DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta delete =
table
.newRowDelta()
.addDeletes(newDeleteFile1)
.dropRewrittenDeletes(deleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingDataFiles();
commit(table, delete, branch);

// attempt to commit a MERGE operation that replaces `deleteFile`
DataFile newDataFile2 = newDataFile("data_bucket=0");
DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta merge =
table
.newRowDelta()
.addRows(newDataFile2)
.addDeletes(newDeleteFile2)
.dropRewrittenDeletes(deleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingDataFiles()
.validateNoConflictingDeleteFiles();

// MERGE must fail as DELETE could have deleted more positions
assertThatThrownBy(() -> commit(table, merge, branch))
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Found new conflicting delete files that can apply");
}
}

0 comments on commit da1af21

Please sign in to comment.