diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index c891a24f4d..220fb0bce1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -226,6 +226,22 @@ private static boolean validateExpirationField( */ @VisibleForTesting public static OptimizingConfig parseOptimizingConfig(Map properties) { + boolean rewriteAllAvro = + CompatiblePropertyUtil.propertyAsBoolean( + properties, + TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, + TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO_DEFAULT); + String defaultFileFormat = + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + if (TableProperties.FILE_FORMAT_AVRO.equalsIgnoreCase(defaultFileFormat) && rewriteAllAvro) { + LOG.warn( + "Table output format is avro, {} will be ignored.", + TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO); + rewriteAllAvro = false; + } return new OptimizingConfig() .setEnabled( CompatiblePropertyUtil.propertyAsBoolean( @@ -307,6 +323,7 @@ public static OptimizingConfig parseOptimizingConfig(Map propert properties, TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES, TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT)) + .setRewriteAllAvro(rewriteAllAvro) .setFilter( CompatiblePropertyUtil.propertyAsString( properties, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/MixedTablePlanTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/MixedTablePlanTestBase.java index 679aa46122..a050aa12af 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/MixedTablePlanTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/MixedTablePlanTestBase.java @@ -39,9 +39,12 @@ import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.UnkeyedTable; import org.apache.amoro.utils.MixedDataFiles; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionData; import org.apache.iceberg.StructLike; import org.apache.iceberg.Transaction; @@ -67,6 +70,7 @@ public abstract class MixedTablePlanTestBase extends TableTestBase { protected DefaultTableRuntime tableRuntime; + private long avroFileSeq = 0; public MixedTablePlanTestBase( CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { @@ -413,6 +417,24 @@ protected void updateTableProperty(String key, String value) { getMixedTable().updateProperties().set(key, value).commit(); } + protected DataFile appendAvroDataFile(long fileSizeBytes) { + DataFile dataFile = + DataFiles.builder(getMixedTable().spec()) + .withPath(String.format("avro-data-%s.avro", avroFileSeq++)) + .withFileSizeInBytes(fileSizeBytes) + .withPartitionPath(getPartitionPath()) + .withRecordCount(1) + .withFormat(FileFormat.AVRO) + .build(); + AppendFiles appendFiles = + getMixedTable().isKeyedTable() + ? getMixedTable().asKeyedTable().baseTable().newAppend() + : getMixedTable().asUnkeyedTable().newAppend(); + appendFiles.appendFile(dataFile); + appendFiles.commit(); + return dataFile; + } + protected void updatePartitionProperty(StructLike partition, String key, String value) { UnkeyedTable table; if (getMixedTable().isKeyedTable()) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java index 7ad399709b..b387ad8da0 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java @@ -24,6 +24,7 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.RewriteStageTask; import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.optimizing.plan.AbstractPartitionPlan; import org.apache.amoro.optimizing.plan.IcebergPartitionPlan; @@ -32,10 +33,15 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableProperties; +import org.apache.iceberg.DataFile; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Collections; +import java.util.List; import java.util.Map; @RunWith(Parameterized.class) @@ -73,6 +79,56 @@ public void testOnlyOneFragmentFiles() { testOnlyOneFragmentFileBase(); } + @Test + public void testRewriteAllAvroWithFragmentFile() { + testRewriteAllAvro( + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + / TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT); + } + + @Test + public void testRewriteAllAvroWithUndersizedSegmentFile() { + testRewriteAllAvro( + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + / TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT + + 1); + } + + @Test + public void testRewriteAllAvroWithTargetSizeReachedFile() { + testRewriteAllAvro( + (long) + (TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + * TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT) + + 1); + } + + @Test + public void testRewriteAllAvroWithTargetSizeReachedFile2() { + testRewriteAllAvro(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + 1); + } + + private void testRewriteAllAvro(long fileSizeBytes) { + closeFullOptimizingInterval(); + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "false"); + DataFile avroFile = appendAvroDataFile(fileSizeBytes); + Assert.assertTrue(planWithCurrentFiles().isEmpty()); + + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true"); + List tasks = planWithCurrentFiles(); + Assert.assertEquals(1, tasks.size()); + assertTask( + tasks.get(0), + Collections.singletonList(avroFile), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); + + updateTableProperty(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.FILE_FORMAT_AVRO); + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true"); + Assert.assertTrue(planWithCurrentFiles().isEmpty()); + } + @Override protected AbstractPartitionPlan getPartitionPlan() { DefaultTableRuntime tableRuntime = getTableRuntime(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java index 0ad80622c3..0aa4986e2c 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java @@ -24,6 +24,7 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.RewriteStageTask; import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.optimizing.plan.AbstractPartitionPlan; import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan; @@ -31,11 +32,16 @@ import org.apache.amoro.optimizing.scan.UnkeyedTableFileScanHelper; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.UnkeyedTable; +import org.apache.iceberg.DataFile; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Collections; +import java.util.List; import java.util.Map; @RunWith(Parameterized.class) @@ -79,6 +85,56 @@ public void testOnlyOneFragmentFiles() { testOnlyOneFragmentFileBase(); } + @Test + public void testRewriteAllAvroWithFragmentFile() { + testRewriteAllAvro( + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + / TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT); + } + + @Test + public void testRewriteAllAvroWithUndersizedSegmentFile() { + testRewriteAllAvro( + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + / TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT + + 1); + } + + @Test + public void testRewriteAllAvroWithTargetSizeReachedFile() { + testRewriteAllAvro( + (long) + (TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + * TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT) + + 1); + } + + @Test + public void testRewriteAllAvroWithTargetSizeReachedFile2() { + testRewriteAllAvro(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + 1); + } + + private void testRewriteAllAvro(long fileSizeBytes) { + closeFullOptimizingInterval(); + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "false"); + DataFile avroFile = appendAvroDataFile(fileSizeBytes); + Assert.assertTrue(planWithCurrentFiles().isEmpty()); + + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true"); + List tasks = planWithCurrentFiles(); + Assert.assertEquals(1, tasks.size()); + assertTask( + tasks.get(0), + Collections.singletonList(avroFile), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); + + updateTableProperty(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.FILE_FORMAT_AVRO); + updateTableProperty(TableProperties.SELF_OPTIMIZING_REWRITE_ALL_AVRO, "true"); + Assert.assertTrue(planWithCurrentFiles().isEmpty()); + } + @Override protected AbstractPartitionPlan getPartitionPlan() { return new MixedIcebergPartitionPlan( diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index 0c743ac6bd..1fde6872df 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -76,6 +76,9 @@ public class OptimizingConfig { // self-optimizing.full.rewrite-all-files private boolean fullRewriteAllFiles; + // self-optimizing.rewrite-all-avro + private boolean rewriteAllAvro; + // self-optimizing.filter private String filter; @@ -260,6 +263,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) { return this; } + public boolean isRewriteAllAvro() { + return rewriteAllAvro; + } + + public OptimizingConfig setRewriteAllAvro(boolean rewriteAllAvro) { + this.rewriteAllAvro = rewriteAllAvro; + return this; + } + public OptimizingConfig setFilter(String filter) { this.filter = filter; return this; @@ -343,6 +355,7 @@ public boolean equals(Object o) { && Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0 && fullTriggerInterval == that.fullTriggerInterval && fullRewriteAllFiles == that.fullRewriteAllFiles + && rewriteAllAvro == that.rewriteAllAvro && Objects.equal(filter, that.filter) && baseHashBucket == that.baseHashBucket && baseRefreshInterval == that.baseRefreshInterval @@ -373,6 +386,7 @@ public int hashCode() { majorDuplicateRatio, fullTriggerInterval, fullRewriteAllFiles, + rewriteAllAvro, filter, baseHashBucket, baseRefreshInterval, @@ -401,6 +415,7 @@ public String toString() { .add("majorDuplicateRatio", majorDuplicateRatio) .add("fullTriggerInterval", fullTriggerInterval) .add("fullRewriteAllFiles", fullRewriteAllFiles) + .add("rewriteAllAvro", rewriteAllAvro) .add("filter", filter) .add("baseHashBucket", baseHashBucket) .add("baseRefreshInterval", baseRefreshInterval) diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java index 2cd11cbd51..3ece7013fb 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java @@ -26,6 +26,7 @@ import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.utils.ContentFiles; import org.apache.amoro.utils.TableFileUtil; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; @@ -55,6 +56,8 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private final boolean reachFullInterval; + protected boolean needRewriteAvroFile = false; + // fragment files protected int fragmentFileCount = 0; protected long fragmentFileSize = 0; @@ -161,6 +164,7 @@ private boolean addFragmentFile(DataFile dataFile, List> deletes) fragmentFileSize += dataFile.fileSizeInBytes(); fragmentFileCount++; fragmentFileRecords += dataFile.recordCount(); + needRewriteAvroFile = needRewriteAvroFile || isAvroFileAndNeedRewrite(dataFile); for (ContentFile delete : deletes) { addDelete(delete); @@ -250,7 +254,10 @@ protected boolean fileShouldFullOptimizing(DataFile dataFile, List> deletes) { @@ -260,6 +267,9 @@ public boolean fileShouldRewrite(DataFile dataFile, List> deletes if (isFragmentFile(dataFile)) { return true; } + if (isAvroFileAndNeedRewrite(dataFile)) { + return true; + } // When Upsert writing is enabled in the Flink engine, both INSERT and UPDATE_AFTER will // generate deletes files (Most are eq-delete), and eq-delete file will be associated // with the data file before the current snapshot. @@ -271,6 +281,10 @@ public boolean fileShouldRewrite(DataFile dataFile, List> deletes > dataFile.recordCount() * config.getMajorDuplicateRatio(); } + private boolean isAvroFileAndNeedRewrite(DataFile dataFile) { + return config.isRewriteAllAvro() && ContentFiles.isAvroFile(dataFile); + } + public boolean segmentShouldRewritePos(DataFile dataFile, List> deletes) { Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file."); long equalDeleteFileCount = 0; @@ -407,7 +421,7 @@ public boolean isMajorNecessary() { public boolean isMinorNecessary() { int smallFileCount = fragmentFileCount + equalityDeleteFileCount; return smallFileCount >= config.getMinorLeastFileCount() - || (smallFileCount > 1 && reachMinorInterval()) + || ((smallFileCount > 1 || needRewriteAvroFile) && reachMinorInterval()) || combinePosSegmentFileCount > 0; } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java index f1eb92104d..dc9e0f7598 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java @@ -24,6 +24,7 @@ import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.utils.ContentFiles; import org.apache.iceberg.StructLike; import org.apache.iceberg.util.Pair; @@ -75,7 +76,9 @@ protected boolean enoughInputFiles(SplitTask splitTask) { boolean only1DataFile = splitTask.getRewriteDataFiles().size() == 1 && splitTask.getRewritePosDataFiles().size() == 0 - && splitTask.getDeleteFiles().size() == 0; + && splitTask.getDeleteFiles().size() == 0 + && !(config.isRewriteAllAvro() + && ContentFiles.isAvroFile(splitTask.getRewriteDataFiles().iterator().next())); return !only1DataFile; } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 0d804cdec7..d334c074a3 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -126,6 +126,9 @@ private TableProperties() {} "self-optimizing.full.rewrite-all-files"; public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true; + public static final String SELF_OPTIMIZING_REWRITE_ALL_AVRO = "self-optimizing.rewrite-all-avro"; + public static final boolean SELF_OPTIMIZING_REWRITE_ALL_AVRO_DEFAULT = false; + public static final String SELF_OPTIMIZING_FILTER = "self-optimizing.filter"; public static final String SELF_OPTIMIZING_FILTER_DEFAULT = null; @@ -250,6 +253,8 @@ private TableProperties() {} public static final String FILE_FORMAT_ORC = "orc"; + public static final String FILE_FORMAT_AVRO = "avro"; + public static final String BASE_FILE_FORMAT = "base.write.format"; public static final String BASE_FILE_FORMAT_DEFAULT = FILE_FORMAT_PARQUET; diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/ContentFiles.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/ContentFiles.java index e4fc035676..93ec61d190 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/ContentFiles.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/ContentFiles.java @@ -23,6 +23,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; public class ContentFiles { @@ -38,6 +39,10 @@ public static boolean isDataFile(ContentFile contentFile) { return contentFile.content() == FileContent.DATA; } + public static boolean isAvroFile(DataFile dataFile) { + return FileFormat.AVRO.equals(dataFile.format()); + } + public static DataFile asDataFile(ContentFile contentFile) { Preconditions.checkArgument(isDataFile(contentFile), "Not a data file"); return (DataFile) contentFile; diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 1895812cea..a91a20814a 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -62,6 +62,8 @@ Self-optimizing configurations are applicable to both Iceberg Format and Mixed s | self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | | self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | | self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. | +| self-optimizing.rewrite-all-avro | false | Whether to rewrite all avro files during optimizing (Ignored when write.format.default is avro). Mainly for high-throughput write scenarios. Writing with avro greatly improves write performance; enabling this option will rewrite all avro files to the file format defined by write.format.default(parquet or orc), ensuring read performance is not affected. | + ## Data-cleaning configurations