@@ -226,6 +226,22 @@ private static boolean validateExpirationField(
*/
@VisibleForTesting
public static OptimizingConfig parseOptimizingConfig(Map<String, String> properties) {
boolean rewriteAllAvro =
CompatiblePropertyUtil.propertyAsBoolean(
properties,
TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO,
TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO_DEFAULT);
String defaultFileFormat =
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
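// Ignore the flag when the table's default write format is already Avro; rewriting would only
// produce Avro files again.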
if (TableProperties.FILE_FORMAT_AVRO.equalsIgnoreCase(defaultFileFormat) && rewriteAllAvro) {
LOG.warn(
"Table output format is avro, {} will be ignored.",
TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO);
rewriteAllAvro = false;
}
return new OptimizingConfig()
.setEnabled(
CompatiblePropertyUtil.propertyAsBoolean(
@@ -307,6 +323,7 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
properties,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT))
.setRewriteAllAvro(rewriteAllAvro)
.setFilter(
CompatiblePropertyUtil.propertyAsString(
properties,
@@ -39,9 +39,12 @@
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Transaction;
@@ -67,6 +70,7 @@
public abstract class MixedTablePlanTestBase extends TableTestBase {

protected DefaultTableRuntime tableRuntime;
private long avroFileSeq = 0;

public MixedTablePlanTestBase(
CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
@@ -413,6 +417,24 @@ protected void updateTableProperty(String key, String value) {
getMixedTable().updateProperties().set(key, value).commit();
}

protected DataFile appendAvroDataFile(long fileSizeBytes) {
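// Register a synthetic Avro data file entry of the given size (metadata only, no records are
// written), so plan tests can control file format and size precisely.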
DataFile dataFile =
DataFiles.builder(getMixedTable().spec())
.withPath(String.format("avro-data-%s.avro", avroFileSeq++))
.withFileSizeInBytes(fileSizeBytes)
.withPartitionPath(getPartitionPath())
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
AppendFiles appendFiles =
getMixedTable().isKeyedTable()
? getMixedTable().asKeyedTable().baseTable().newAppend()
: getMixedTable().asUnkeyedTable().newAppend();
appendFiles.appendFile(dataFile);
appendFiles.commit();
return dataFile;
}

protected void updatePartitionProperty(StructLike partition, String key, String value) {
UnkeyedTable table;
if (getMixedTable().isKeyedTable()) {
@@ -24,6 +24,7 @@
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.optimizing.plan.IcebergPartitionPlan;
@@ -32,10 +33,15 @@
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableProperties;
import org.apache.iceberg.DataFile;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
@@ -73,6 +79,56 @@ public void testOnlyOneFragmentFiles() {
testOnlyOneFragmentFileBase();
}

@Test
public void testRewriteAllAvroWithFragmentFile() {
testRewriteAllAvro(
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
/ TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT);
}

@Test
public void testRewriteAllAvroWithUndersizedSegmentFile() {
testRewriteAllAvro(
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
/ TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT
+ 1);
}

@Test
public void testRewriteAllAvroWithTargetSizeReachedFile() {
testRewriteAllAvro(
(long)
(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
* TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT)
+ 1);
}

@Test
public void testRewriteAllAvroWithTargetSizeReachedFile2() {
testRewriteAllAvro(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + 1);
}

private void testRewriteAllAvro(long fileSizeBytes) {
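// With the flag disabled, a lone Avro file of this size should not produce any rewrite task.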
closeFullOptimizingInterval();
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "false");
DataFile avroFile = appendAvroDataFile(fileSizeBytes);
Assert.assertTrue(planWithCurrentFiles().isEmpty());

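// With the flag enabled, the Avro file alone is planned into a single rewrite task.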
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true");
List<RewriteStageTask> tasks = planWithCurrentFiles();
Assert.assertEquals(1, tasks.size());
assertTask(
tasks.get(0),
Collections.singletonList(avroFile),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

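// When the table's default file format is Avro, the flag is ignored and nothing is planned.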
updateTableProperty(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.FILE_FORMAT_AVRO);
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true");
Assert.assertTrue(planWithCurrentFiles().isEmpty());
}

@Override
protected AbstractPartitionPlan getPartitionPlan() {
DefaultTableRuntime tableRuntime = getTableRuntime();
@@ -24,18 +24,24 @@
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
import org.apache.amoro.optimizing.scan.TableFileScanHelper;
import org.apache.amoro.optimizing.scan.UnkeyedTableFileScanHelper;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.iceberg.DataFile;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
@@ -79,6 +85,56 @@ public void testOnlyOneFragmentFiles() {
testOnlyOneFragmentFileBase();
}

@Test
public void testRewriteAllAvroWithFragmentFile() {
testRewriteAllAvro(
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
/ TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT);
}

@Test
public void testRewriteAllAvroWithUndersizedSegmentFile() {
testRewriteAllAvro(
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
/ TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT
+ 1);
}

@Test
public void testRewriteAllAvroWithTargetSizeReachedFile() {
testRewriteAllAvro(
(long)
(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT
* TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT)
+ 1);
}

@Test
public void testRewriteAllAvroWithTargetSizeReachedFile2() {
testRewriteAllAvro(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + 1);
}

private void testRewriteAllAvro(long fileSizeBytes) {
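// With the flag disabled, a lone Avro file of this size should not produce any rewrite task.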
closeFullOptimizingInterval();
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "false");
DataFile avroFile = appendAvroDataFile(fileSizeBytes);
Assert.assertTrue(planWithCurrentFiles().isEmpty());

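// With the flag enabled, the Avro file alone is planned into a single rewrite task.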
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true");
List<RewriteStageTask> tasks = planWithCurrentFiles();
Assert.assertEquals(1, tasks.size());
assertTask(
tasks.get(0),
Collections.singletonList(avroFile),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

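// When the table's default file format is Avro, the flag is ignored and nothing is planned.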
updateTableProperty(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.FILE_FORMAT_AVRO);
updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true");
Assert.assertTrue(planWithCurrentFiles().isEmpty());
}

@Override
protected AbstractPartitionPlan getPartitionPlan() {
return new MixedIcebergPartitionPlan(
@@ -76,6 +76,9 @@ public class OptimizingConfig {
// self-optimizing.full.rewrite-all-files
private boolean fullRewriteAllFiles;

// self-optimizing.rewrite-all-avro
private boolean rewriteAllAvro;

// self-optimizing.filter
private String filter;

@@ -260,6 +263,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
return this;
}

public boolean isRewriteAllAvro() {
return rewriteAllAvro;
}

public OptimizingConfig setRewriteAllAvro(boolean rewriteAllAvro) {
this.rewriteAllAvro = rewriteAllAvro;
return this;
}

public OptimizingConfig setFilter(String filter) {
this.filter = filter;
return this;
@@ -343,6 +355,7 @@ public boolean equals(Object o) {
&& Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
&& fullTriggerInterval == that.fullTriggerInterval
&& fullRewriteAllFiles == that.fullRewriteAllFiles
&& rewriteAllAvro == that.rewriteAllAvro
&& Objects.equal(filter, that.filter)
&& baseHashBucket == that.baseHashBucket
&& baseRefreshInterval == that.baseRefreshInterval
@@ -373,6 +386,7 @@ public int hashCode() {
majorDuplicateRatio,
fullTriggerInterval,
fullRewriteAllFiles,
rewriteAllAvro,
filter,
baseHashBucket,
baseRefreshInterval,
@@ -401,6 +415,7 @@ public String toString() {
.add("majorDuplicateRatio", majorDuplicateRatio)
.add("fullTriggerInterval", fullTriggerInterval)
.add("fullRewriteAllFiles", fullRewriteAllFiles)
.add("rewriteAllAvro", rewriteAllAvro)
.add("filter", filter)
.add("baseHashBucket", baseHashBucket)
.add("baseRefreshInterval", baseRefreshInterval)
@@ -26,6 +26,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -55,6 +56,8 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {

private final boolean reachFullInterval;

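// Set when a fragment-sized Avro file that must be rewritten is seen; lets a partition with
// only one small Avro file still trigger minor optimizing.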
protected boolean needRewriteAvroFile = false;

// fragment files
protected int fragmentFileCount = 0;
protected long fragmentFileSize = 0;
@@ -161,6 +164,7 @@ private boolean addFragmentFile(DataFile dataFile, List<ContentFile<?>> deletes)
fragmentFileSize += dataFile.fileSizeInBytes();
fragmentFileCount++;
fragmentFileRecords += dataFile.recordCount();
needRewriteAvroFile = needRewriteAvroFile || isAvroFileAndNeedRewrite(dataFile);

for (ContentFile<?> delete : deletes) {
addDelete(delete);
@@ -250,7 +254,10 @@ protected boolean fileShouldFullOptimizing(DataFile dataFile, List<ContentFile<?
return true;
}
// If a file is associated with any delete files or is not big enough, it should be fully optimized
return !deleteFiles.isEmpty() || isFragmentFile(dataFile) || isUndersizedSegmentFile(dataFile);
return !deleteFiles.isEmpty()
|| isFragmentFile(dataFile)
|| isUndersizedSegmentFile(dataFile)
|| isAvroFileAndNeedRewrite(dataFile);
}

public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes) {
@@ -260,6 +267,9 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes
if (isFragmentFile(dataFile)) {
return true;
}
if (isAvroFileAndNeedRewrite(dataFile)) {
return true;
}
// When Upsert writing is enabled in the Flink engine, both INSERT and UPDATE_AFTER will
// generate delete files (most are eq-deletes), and an eq-delete file will be associated
// with the data file before the current snapshot.
@@ -271,6 +281,10 @@
> dataFile.recordCount() * config.getMajorDuplicateRatio();
}

private boolean isAvroFileAndNeedRewrite(DataFile dataFile) {
return config.isRewriteAllAvro() && ContentFiles.isAvroFile(dataFile);
}

public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file.");
long equalDeleteFileCount = 0;
@@ -407,7 +421,7 @@ public boolean isMajorNecessary() {
public boolean isMinorNecessary() {
int smallFileCount = fragmentFileCount + equalityDeleteFileCount;
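// A single Avro file flagged for rewrite is enough to trigger minor optimizing once the minor
// trigger interval is reached.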
return smallFileCount >= config.getMinorLeastFileCount()
|| (smallFileCount > 1 && reachMinorInterval())
|| ((smallFileCount > 1 || needRewriteAvroFile) && reachMinorInterval())
|| combinePosSegmentFileCount > 0;
}

@@ -24,6 +24,7 @@
import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Pair;

@@ -75,7 +76,9 @@ protected boolean enoughInputFiles(SplitTask splitTask) {
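// A task with a single data file and no delete files is normally dropped, unless that file is
// an Avro file that must be rewritten under the rewrite-all-avro flag.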
boolean only1DataFile =
splitTask.getRewriteDataFiles().size() == 1
&& splitTask.getRewritePosDataFiles().size() == 0
&& splitTask.getDeleteFiles().size() == 0;
&& splitTask.getDeleteFiles().size() == 0
&& !(config.isRewriteAllAvro()
&& ContentFiles.isAvroFile(splitTask.getRewriteDataFiles().iterator().next()));
return !only1DataFile;
}
}
@@ -126,6 +126,9 @@ private TableProperties() {}
"self-optimizing.full.rewrite-all-files";
public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true;

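// Whether self-optimizing should rewrite all Avro data files; ignored when the table's default
// file format is already Avro.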
public static final String SELF_OPTIMIZING_REWRITE_ALL_AVRO = "self-optimizing.rewrite-all-avro";
public static final boolean SELF_OPTIMIZING_REWRITE_ALL_AVRO_DEFAULT = false;

public static final String SELF_OPTIMIZING_FILTER = "self-optimizing.filter";
public static final String SELF_OPTIMIZING_FILTER_DEFAULT = null;

@@ -250,6 +253,8 @@ private TableProperties() {}

public static final String FILE_FORMAT_ORC = "orc";

public static final String FILE_FORMAT_AVRO = "avro";

public static final String BASE_FILE_FORMAT = "base.write.format";
public static final String BASE_FILE_FORMAT_DEFAULT = FILE_FORMAT_PARQUET;
