diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index 624f6c15d20b..a5e3fa477ba9 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -46,6 +46,17 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
    */
   RowDelta addDeletes(DeleteFile deletes);
 
+  /**
+   * Removes a rewritten {@link DeleteFile} from the table.
+   *
+   * @param deletes a delete file that can be removed from the table
+   * @return this for method chaining
+   */
+  default RowDelta removeDeletes(DeleteFile deletes) {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not implement removeDeletes");
+  }
+
   /**
    * Set the snapshot ID used in any reads for this operation.
    *
diff --git a/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
new file mode 100644
index 000000000000..a899b870a90c
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of replacing delete files in the table.
+ *
+ * <p>To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-core:jmh
+ *       -PjmhIncludeRegex=ReplaceDeleteFilesBenchmark
+ *       -PjmhOutputPath=benchmark/replace-delete-files-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ReplaceDeleteFilesBenchmark {
+
+  private static final String TABLE_IDENT = "tbl";
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "int_col", Types.IntegerType.get()),
+          required(2, "long_col", Types.LongType.get()),
+          required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+          required(4, "date_col", Types.DateType.get()),
+          required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+          required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+          required(7, "str_col", Types.StringType.get()));
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private Table table;
+  private List<DeleteFile> deleteFiles;
+  private List<DeleteFile> pendingDeleteFiles;
+
+  @Param({"50000", "100000", "500000", "1000000", "2500000"})
+  private int numFiles;
+
+  @Setup
+  public void setupBenchmark() {
+    initTable();
+    initFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void replaceDeleteFiles() {
+    RowDelta rowDelta = table.newRowDelta();
+    deleteFiles.forEach(rowDelta::removeDeletes);
+    pendingDeleteFiles.forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+  }
+
+  private void initTable() {
+    this.table = TABLES.create(SCHEMA, SPEC, TABLE_IDENT);
+  }
+
+  private void dropTable() {
+    TABLES.dropTable(TABLE_IDENT);
+  }
+
+  private void initFiles() {
+    List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+    List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+    RowDelta rowDelta = table.newRowDelta();
+
+    for
 (int ordinal = 0; ordinal < numFiles; ordinal++) {
+      DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
+      rowDelta.addRows(dataFile);
+
+      DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+      rowDelta.addDeletes(deleteFile);
+      generatedDeleteFiles.add(deleteFile);
+
+      DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+      generatedPendingDeleteFiles.add(pendingDeleteFile);
+    }
+
+    rowDelta.commit();
+
+    this.deleteFiles = generatedDeleteFiles;
+    this.pendingDeleteFiles = generatedPendingDeleteFiles;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 42fd17f0320b..85c2269ee526 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -62,6 +62,12 @@ public RowDelta addDeletes(DeleteFile deletes) {
     return this;
   }
 
+  @Override
+  public RowDelta removeDeletes(DeleteFile deletes) {
+    delete(deletes);
+    return this;
+  }
+
   @Override
   public RowDelta validateFromSnapshot(long snapshotId) {
     this.startingSnapshotId = snapshotId;
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index a2a043e630bb..1d67e48a2ce2 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -1409,4 +1409,151 @@ public void testRowDeltaCaseSensitivity() {
         .isInstanceOf(ValidationException.class)
         .hasMessageStartingWith("Found new conflicting delete files");
   }
+
+  @TestTemplate
+  public void testRewrittenDeleteFiles() {
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .removeDeletes(deleteFile)
+            .addDeletes(newDeleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId());
+    Snapshot snapshot = commit(table, rowDelta, branch);
+    assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE);
+
+    List<ManifestFile> dataManifests = snapshot.dataManifests(table.io());
+    assertThat(dataManifests).hasSize(1);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(baseSnapshot.snapshotId()),
+        files(dataFile),
+        statuses(Status.ADDED));
+
+    List<ManifestFile> deleteManifests = snapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(2L),
+        fileSeqs(2L),
+        ids(snapshot.snapshotId()),
+        files(newDeleteFile),
+        statuses(Status.ADDED));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(snapshot.snapshotId()),
+        files(deleteFile),
+        statuses(Status.DELETED));
+  }
+
+  @TestTemplate
+  public void testConcurrentDeletesRewriteSameDeleteFile() {
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // commit the first DELETE operation that replaces `deleteFile`
+    DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete1 =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile1)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    Snapshot snapshot1 = commit(table, delete1, branch);
+    assertThat(snapshot1.operation()).isEqualTo(DataOperations.DELETE);
+    assertThat(snapshot1.sequenceNumber()).isEqualTo(2L);
+
+    // commit the second DELETE operation that replaces `deleteFile`
+    DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete2 =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile2)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    Snapshot snapshot2 = commit(table, delete2, branch);
+    assertThat(snapshot2.operation()).isEqualTo(DataOperations.DELETE);
+    assertThat(snapshot2.sequenceNumber()).isEqualTo(3L);
+
+    List<ManifestFile> dataManifests = snapshot2.dataManifests(table.io());
+    assertThat(dataManifests).hasSize(1);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(baseSnapshot.snapshotId()),
+        files(dataFile),
+        statuses(Status.ADDED));
+
+    // verify both new delete files have been added
+    List<ManifestFile> deleteManifests = snapshot2.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(3L),
+        fileSeqs(3L),
+        ids(snapshot2.snapshotId()),
+        files(newDeleteFile2),
+        statuses(Status.ADDED));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(2L),
+        fileSeqs(2L),
+        ids(snapshot1.snapshotId()),
+        files(newDeleteFile1),
+        statuses(Status.ADDED));
+  }
+
+  @TestTemplate
+  public void testConcurrentMergeRewriteSameDeleteFile() {
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // commit a DELETE operation that replaces `deleteFile`
+    DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile1)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    commit(table, delete, branch);
+
+    // attempt to commit a MERGE operation that replaces `deleteFile`
+    DataFile newDataFile2 = newDataFile("data_bucket=0");
+    DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta merge =
+        table
+            .newRowDelta()
+            .addRows(newDataFile2)
+            .addDeletes(newDeleteFile2)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles()
+            .validateNoConflictingDeleteFiles();
+
+    // MERGE must fail as DELETE could have deleted more positions
+    assertThatThrownBy(() -> commit(table, merge, branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Found new conflicting delete files that can apply");
+  }
+}