Description
What happened?
We have a Flink job that performs CDC synchronization from an upstream database to Iceberg. This job generates many equality delete files, many of which become dangling delete files. When Amoro runs the clean-dangling-delete-files operation on this table, it fails with the following error.
2026-01-14 02:49:39,401 WARN [async-dangling-delete-files-cleaning-executor-7] [org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer] [] - Failed to commit dangling delete file for table $catalog.$database.$table, but ignore it
org.apache.iceberg.exceptions.ValidationException: Missing required files to delete: oss://omit-for-security-concern/data/_id_bucket_20=12/00000-0-5a087250-6137-4365-b5f9-9b04ceae2e47-305883.parquet,...dangling-delete-file-list-omitted
at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49) ~[iceberg-api-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:237) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:199) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:837) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:236) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:386) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:384) ~[iceberg-core-lilith-trino-1.6.1.1.jar:?]
at org.apache.amoro.op.MixedUpdate.commit(MixedUpdate.java:143) ~[amoro-format-iceberg-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.clearInternalTableDanglingDeleteFiles(IcebergTableMaintainer.java:477) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:336) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.cleanDanglingDeleteFiles(IcebergTableMaintainer.java:162) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.table.executor.DanglingDeleteFilesCleaningExecutor.execute(DanglingDeleteFilesCleaningExecutor.java:62) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.table.executor.BaseTableExecutor.executeTask(BaseTableExecutor.java:82) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at org.apache.amoro.server.table.executor.BaseTableExecutor.lambda$null$0(BaseTableExecutor.java:72) ~[amoro-ams-0.8.0-incubating.jar:0.8.0-incubating]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_452]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_452]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_452]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_452]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_452]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_452]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_452]
Affects Versions
0.8.1-incubating
What table formats are you seeing the problem on?
Iceberg
What engines are you seeing the problem on?
Spark, Flink, AMS
How to reproduce
- Create an Iceberg table using Spark.
CREATE TABLE iceberg.db.t (
  id BIGINT NOT NULL,
  data STRING)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
  'format-version' = '2',
  'write.upsert.enabled' = 'true',
  'identifier-fields' = '[id]'
);
ALTER TABLE iceberg.db.t SET IDENTIFIER FIELDS id;
- Upsert a row in Flink.
At this point, Flink will create an equality delete file and a data file containing the same id.
INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ VALUES (5, 'spec=0');
- Modify the partition column using Spark, then call the rewrite_data_files procedure.
ALTER TABLE iceberg.db.t
REPLACE PARTITION FIELD bucket(5, id) WITH bucket(10, id);
CALL iceberg.system.rewrite_data_files(table => 'db.t', options => MAP('rewrite-all', 'true'));
- Upsert a row in Flink. The rows with id=5 and id=8 are hashed into the same bucket.
INSERT INTO iceberg.db.t /*+ OPTIONS('upsert-enabled'='true') */ VALUES (8, 'spec=1');
- Run clean-dangling-delete-files on this table in Amoro.
Restarting Amoro triggers the clean-dangling-delete-files operation; as soon as it starts, it throws the following error.
org.apache.iceberg.exceptions.ValidationException: Missing required files to delete: oss://omit-for-security-concern/db.db/t/data/id_bucket_10=3/00000-0-28473f67-0f9e-4e68-969a-91903fb026e6-00002.parquet
at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
at org.apache.iceberg.ManifestFilterManager.validateRequiredDeletes(ManifestFilterManager.java:237)
at org.apache.iceberg.ManifestFilterManager.filterManifests(ManifestFilterManager.java:199)
at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:837)
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:226)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:376)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:374)
...
Relevant log output
Anything else
The root cause is that in org.apache.amoro.server.utils.IcebergTableUtil#getDanglingDeleteFiles, the call to org.apache.amoro.scan.TableEntriesScan#entries returns IcebergFileEntry instances whose file().partition() values are incorrect.
The underlying issue is in the implementation of org.apache.amoro.scan.TableEntriesScan#buildDeleteFile:
fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME), StructLike.class) returns a StructLike containing the partition columns of every PartitionSpec the table has ever had, but the code assumes it contains only the columns of the current/latest PartitionSpec, so it reads the wrong positions after a spec change.
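The indexing mismatch can be illustrated with a minimal, self-contained sketch (no Iceberg dependency; all column names and helper methods here are hypothetical, chosen to mirror the bucket(5, id) → bucket(10, id) evolution from the reproduce steps):

```java
import java.util.Arrays;
import java.util.List;

public class PartitionStructSketch {
    // Union of partition columns across all specs the table has ever had:
    // spec 0 contributed id_bucket_5, spec 1 contributed id_bucket_10.
    static final List<String> UNION_COLUMNS = Arrays.asList("id_bucket_5", "id_bucket_10");
    // Columns of the latest spec only.
    static final List<String> LATEST_SPEC_COLUMNS = Arrays.asList("id_bucket_10");

    // A delete file written under spec 1: the manifest row carries the union
    // struct, with a null slot for the retired spec-0 column.
    static final Object[] unionPartitionValues = {null, 3}; // id_bucket_5=null, id_bucket_10=3

    // Buggy read: treats position 0 of the union struct as the first column
    // of the latest spec, so it returns null instead of 3.
    static Object buggyRead() {
        int pos = LATEST_SPEC_COLUMNS.indexOf("id_bucket_10"); // 0
        return unionPartitionValues[pos];
    }

    // Correct read: resolves the column's position within the union struct.
    static Object correctRead() {
        int pos = UNION_COLUMNS.indexOf("id_bucket_10"); // 1
        return unionPartitionValues[pos];
    }

    public static void main(String[] args) {
        System.out.println("buggy=" + buggyRead() + " correct=" + correctRead());
        // prints: buggy=null correct=3
    }
}
```

With the wrong partition value, getDanglingDeleteFiles matches delete files against data files in the wrong partition, and the subsequent commit fails validation with "Missing required files to delete".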
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct