Skip to content

Commit 79154d4

Browse files
authored
[core] Remove Catalog.fileio method (#4973)
1 parent cfb0075 commit 79154d4

File tree

20 files changed

+126
-162
lines changed

20 files changed

+126
-162
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public Map<String, String> options() {
8686
return catalogOptions.toMap();
8787
}
8888

89-
@Override
89+
public abstract String warehouse();
90+
9091
public FileIO fileIO() {
9192
return fileIO;
9293
}

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.paimon.catalog;
2020

2121
import org.apache.paimon.annotation.Public;
22-
import org.apache.paimon.fs.FileIO;
2322
import org.apache.paimon.partition.Partition;
2423
import org.apache.paimon.schema.Schema;
2524
import org.apache.paimon.schema.SchemaChange;
@@ -368,12 +367,6 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
368367

369368
// ==================== Catalog Information ==========================
370369

371-
/** Warehouse root path for creating new databases. */
372-
String warehouse();
373-
374-
/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
375-
FileIO fileIO();
376-
377370
/** Catalog options for re-creating this catalog. */
378371
Map<String, String> options();
379372

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

+7-11
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.catalog;
2020

21-
import org.apache.paimon.fs.FileIO;
2221
import org.apache.paimon.partition.Partition;
2322
import org.apache.paimon.schema.Schema;
2423
import org.apache.paimon.schema.SchemaChange;
@@ -46,21 +45,11 @@ public boolean caseSensitive() {
4645
return wrapped.caseSensitive();
4746
}
4847

49-
@Override
50-
public String warehouse() {
51-
return wrapped.warehouse();
52-
}
53-
5448
@Override
5549
public Map<String, String> options() {
5650
return wrapped.options();
5751
}
5852

59-
@Override
60-
public FileIO fileIO() {
61-
return wrapped.fileIO();
62-
}
63-
6453
@Override
6554
public List<String> listDatabases() {
6655
return wrapped.listDatabases();
@@ -200,4 +189,11 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
200189
public void close() throws Exception {
201190
wrapped.close();
202191
}
192+
193+
public static Catalog rootCatalog(Catalog catalog) {
194+
while (catalog instanceof DelegateCatalog) {
195+
catalog = ((DelegateCatalog) catalog).wrapped();
196+
}
197+
return catalog;
198+
}
203199
}

paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.iceberg.migrate;
2020

21+
import org.apache.paimon.catalog.CatalogContext;
2122
import org.apache.paimon.catalog.Identifier;
2223
import org.apache.paimon.fs.FileIO;
2324
import org.apache.paimon.fs.Path;
@@ -38,16 +39,14 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
3839
private static final String VERSION_HINT_FILENAME = "version-hint.text";
3940
private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
4041

41-
private final FileIO fileIO;
4242
private final Identifier icebergIdentifier;
4343
private final Options icebergOptions;
4444

4545
private Path icebergLatestMetaVersionPath;
4646
private IcebergPathFactory icebergMetaPathFactory;
47+
private FileIO fileIO;
4748

48-
public IcebergMigrateHadoopMetadata(
49-
Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
50-
this.fileIO = fileIO;
49+
public IcebergMigrateHadoopMetadata(Identifier icebergIdentifier, Options icebergOptions) {
5150
this.icebergIdentifier = icebergIdentifier;
5251
this.icebergOptions = icebergOptions;
5352
}
@@ -58,15 +57,19 @@ public IcebergMetadata icebergMetadata() {
5857
icebergOptions.get(ICEBERG_WAREHOUSE) != null,
5958
"'iceberg_warehouse' is null. "
6059
+ "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
60+
Path path =
61+
new Path(
62+
String.format(
63+
"%s/%s/metadata",
64+
icebergIdentifier.getDatabaseName(),
65+
icebergIdentifier.getTableName()));
66+
try {
67+
fileIO = FileIO.get(path, CatalogContext.create(icebergOptions));
68+
} catch (IOException e) {
69+
throw new RuntimeException(e);
70+
}
6171
this.icebergMetaPathFactory =
62-
new IcebergPathFactory(
63-
new Path(
64-
icebergOptions.get(ICEBERG_WAREHOUSE),
65-
new Path(
66-
String.format(
67-
"%s/%s/metadata",
68-
icebergIdentifier.getDatabaseName(),
69-
icebergIdentifier.getTableName()))));
72+
new IcebergPathFactory(new Path(icebergOptions.get(ICEBERG_WAREHOUSE), path));
7073
long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
7174

7275
this.icebergLatestMetaVersionPath =

paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.paimon.iceberg.migrate;
2020

2121
import org.apache.paimon.catalog.Identifier;
22-
import org.apache.paimon.fs.FileIO;
2322
import org.apache.paimon.iceberg.IcebergOptions;
2423
import org.apache.paimon.options.Options;
2524

@@ -28,12 +27,12 @@ public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetada
2827

2928
@Override
3029
public String identifier() {
31-
return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
30+
return IcebergOptions.StorageType.HADOOP_CATALOG + "_migrate";
3231
}
3332

3433
@Override
3534
public IcebergMigrateHadoopMetadata create(
36-
Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
37-
return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
35+
Identifier icebergIdentifier, Options icebergOptions) {
36+
return new IcebergMigrateHadoopMetadata(icebergIdentifier, icebergOptions);
3837
}
3938
}

paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.factories.Factory;
23-
import org.apache.paimon.fs.FileIO;
2423
import org.apache.paimon.options.Options;
2524

2625
/** Factory to create {@link IcebergMigrateMetadata}. */
2726
public interface IcebergMigrateMetadataFactory extends Factory {
2827

29-
IcebergMigrateMetadata create(
30-
Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
28+
IcebergMigrateMetadata create(Identifier icebergIdentifier, Options icebergOptions);
3129
}

paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java

+7-19
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public class IcebergMigrator implements Migrator {
7373
private final ThreadPoolExecutor executor;
7474

7575
private final Catalog paimonCatalog;
76-
private final FileIO paimonFileIO;
7776
private final String paimonDatabaseName;
7877
private final String paimonTableName;
7978

@@ -100,7 +99,6 @@ public IcebergMigrator(
10099
Options icebergOptions,
101100
Integer parallelism) {
102101
this.paimonCatalog = paimonCatalog;
103-
this.paimonFileIO = paimonCatalog.fileIO();
104102
this.paimonDatabaseName = paimonDatabaseName;
105103
this.paimonTableName = paimonTableName;
106104

@@ -126,9 +124,7 @@ public IcebergMigrator(
126124

127125
icebergMigrateMetadata =
128126
icebergMigrateMetadataFactory.create(
129-
Identifier.create(icebergDatabaseName, icebergTableName),
130-
paimonFileIO,
131-
icebergOptions);
127+
Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
132128

133129
this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
134130
this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
@@ -148,6 +144,7 @@ public void executeMigrate() throws Exception {
148144

149145
try {
150146
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
147+
FileIO fileIO = paimonTable.fileIO();
151148

152149
IcebergManifestFile manifestFile =
153150
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -214,8 +211,8 @@ public void executeMigrate() throws Exception {
214211
for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
215212
Path newPath = entry.getKey();
216213
Path origin = entry.getValue();
217-
if (paimonFileIO.exists(newPath)) {
218-
paimonFileIO.rename(newPath, origin);
214+
if (fileIO.exists(newPath)) {
215+
fileIO.rename(newPath, origin);
219216
}
220217
}
221218

@@ -331,8 +328,7 @@ private MigrateTask importUnPartitionedTable(
331328
BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
332329
Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
333330

334-
return new MigrateTask(
335-
icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
331+
return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
336332
}
337333

338334
private List<MigrateTask> importPartitionedTable(
@@ -347,13 +343,7 @@ private List<MigrateTask> importPartitionedTable(
347343
BinaryRow partitionRow = entry.getKey();
348344
Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
349345
migrateTasks.add(
350-
new MigrateTask(
351-
entry.getValue(),
352-
paimonFileIO,
353-
paimonTable,
354-
partitionRow,
355-
newDir,
356-
rollback));
346+
new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
357347
}
358348
return migrateTasks;
359349
}
@@ -362,21 +352,18 @@ private List<MigrateTask> importPartitionedTable(
362352
public static class MigrateTask implements Callable<CommitMessage> {
363353

364354
private final List<IcebergDataFileMeta> icebergDataFileMetas;
365-
private final FileIO fileIO;
366355
private final FileStoreTable paimonTable;
367356
private final BinaryRow partitionRow;
368357
private final Path newDir;
369358
private final Map<Path, Path> rollback;
370359

371360
public MigrateTask(
372361
List<IcebergDataFileMeta> icebergDataFileMetas,
373-
FileIO fileIO,
374362
FileStoreTable paimonTable,
375363
BinaryRow partitionRow,
376364
Path newDir,
377365
Map<Path, Path> rollback) {
378366
this.icebergDataFileMetas = icebergDataFileMetas;
379-
this.fileIO = fileIO;
380367
this.paimonTable = paimonTable;
381368
this.partitionRow = partitionRow;
382369
this.newDir = newDir;
@@ -385,6 +372,7 @@ public MigrateTask(
385372

386373
@Override
387374
public CommitMessage call() throws Exception {
375+
FileIO fileIO = paimonTable.fileIO();
388376
if (!fileIO.exists(newDir)) {
389377
fileIO.mkdirs(newDir);
390378
}

paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java

+6-20
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.paimon.table.FileStoreTable;
2828
import org.apache.paimon.table.Table;
2929
import org.apache.paimon.utils.Pair;
30-
import org.apache.paimon.utils.SerializableConsumer;
3130

3231
import javax.annotation.Nullable;
3332

@@ -81,15 +80,11 @@ public LocalOrphanFilesClean(FileStoreTable table) {
8180
}
8281

8382
public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
84-
this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
83+
this(table, olderThanMillis, false);
8584
}
8685

87-
public LocalOrphanFilesClean(
88-
FileStoreTable table,
89-
long olderThanMillis,
90-
SerializableConsumer<Path> fileCleaner,
91-
boolean dryRun) {
92-
super(table, olderThanMillis, fileCleaner);
86+
public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
87+
super(table, olderThanMillis, dryRun);
9388
this.deleteFiles = new ArrayList<>();
9489
this.executor =
9590
createCachedThreadPool(
@@ -125,7 +120,7 @@ public CleanOrphanFilesResult clean()
125120
.forEach(
126121
deleteFileInfo -> {
127122
deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
128-
fileCleaner.accept(deleteFileInfo.getLeft());
123+
cleanFile(deleteFileInfo.getLeft());
129124
});
130125
deleteFiles.addAll(
131126
candidateDeletes.stream()
@@ -239,7 +234,6 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
239234
String databaseName,
240235
@Nullable String tableName,
241236
long olderThanMillis,
242-
SerializableConsumer<Path> fileCleaner,
243237
@Nullable Integer parallelism,
244238
boolean dryRun)
245239
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
@@ -269,8 +263,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
269263
table.getClass().getName());
270264

271265
orphanFilesCleans.add(
272-
new LocalOrphanFilesClean(
273-
(FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
266+
new LocalOrphanFilesClean((FileStoreTable) table, olderThanMillis, dryRun));
274267
}
275268

276269
return orphanFilesCleans;
@@ -281,19 +274,12 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
281274
String databaseName,
282275
@Nullable String tableName,
283276
long olderThanMillis,
284-
SerializableConsumer<Path> fileCleaner,
285277
@Nullable Integer parallelism,
286278
boolean dryRun)
287279
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
288280
List<LocalOrphanFilesClean> tableCleans =
289281
createOrphanFilesCleans(
290-
catalog,
291-
databaseName,
292-
tableName,
293-
olderThanMillis,
294-
fileCleaner,
295-
parallelism,
296-
dryRun);
282+
catalog, databaseName, tableName, olderThanMillis, parallelism, dryRun);
297283

298284
ExecutorService executorService =
299285
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

0 commit comments

Comments
 (0)