Skip to content

Commit

Permalink
[core] Introduce catalog loader in Catalog to replace MetastoreClient
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 9, 2025
1 parent 4edc05b commit 24c703a
Show file tree
Hide file tree
Showing 49 changed files with 899 additions and 825 deletions.
23 changes: 11 additions & 12 deletions paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.ManifestsReader;
Expand All @@ -45,6 +44,7 @@
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
Expand Down Expand Up @@ -345,11 +345,9 @@ public PartitionExpire newPartitionExpire(String commitUser) {
return null;
}

MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
MetastoreClient metastoreClient = null;
if (options.partitionedTableInMetastore() && metastoreClientFactory != null) {
metastoreClient = metastoreClientFactory.create();
PartitionHandler partitionHandler = null;
if (options.partitionedTableInMetastore()) {
partitionHandler = catalogEnvironment.partitionHandler();
}

return new PartitionExpire(
Expand All @@ -358,7 +356,7 @@ public PartitionExpire newPartitionExpire(String commitUser) {
PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()),
newScan(),
newCommit(commitUser),
metastoreClient,
partitionHandler,
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
}
Expand All @@ -377,11 +375,12 @@ public TagAutoManager newTagCreationManager() {
public List<TagCallback> createTagCallbacks() {
List<TagCallback> callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options));
String partitionField = options.tagToPartitionField();
MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
if (partitionField != null && metastoreClientFactory != null) {
callbacks.add(
new AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));

if (partitionField != null) {
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
if (partitionHandler != null) {
callbacks.add(new AddPartitionTagCallback(partitionHandler, partitionField));
}
}
if (options.tagCreateSuccessFile()) {
callbacks.add(new SuccessFileTagCallback(fileIO, newTagManager().tagDirectory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -410,7 +409,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier).orElse(null)));
catalogLoader()));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
Expand Down Expand Up @@ -477,11 +476,6 @@ protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExist
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;

/** Get metastore client factory for the table specified by {@code identifier}. */
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
return Optional.empty();
}

/**
 * Computes the location of the table named by {@code identifier}.
 *
 * <p>The result is a {@link Path} constructed from the value {@code newDatabasePath} returns for
 * the identifier's database name (used as the parent) and the identifier's table name (used as
 * the child).
 *
 * @param identifier identifier supplying the database name and the table name
 * @return the path built from the database path and the table name
 */
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -53,60 +54,45 @@
/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {

private final Options options;

private final Duration expirationInterval;
private final int snapshotMaxNumPerTable;
private final long cachedPartitionMaxNum;

protected final Cache<String, Database> databaseCache;
protected final Cache<Identifier, Table> tableCache;
protected Cache<String, Database> databaseCache;
protected Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;

// partition cache will affect data latency
@Nullable protected final Cache<Identifier, List<Partition>> partitionCache;

public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CACHE_PARTITION_MAX_NUM.defaultValue(),
CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue());
}

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
cachedPartitionMaxNum,
snapshotMaxNumPerTable,
Ticker.systemTicker());
}
@Nullable protected Cache<Identifier, List<Partition>> partitionCache;

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable,
Ticker ticker) {
public CachingCatalog(Catalog wrapped, Options options) {
super(wrapped);
this.options = options;
MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
long manifestCacheThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
Optional<MemorySize> maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) {
// cache all manifest files
manifestMaxMemory = maxMemory.get();
manifestCacheThreshold = Long.MAX_VALUE;
}

this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
}
this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);

this.expirationInterval = expirationInterval;
this.snapshotMaxNumPerTable = snapshotMaxNumPerTable;
this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
init(Ticker.systemTicker());
}

@VisibleForTesting
void init(Ticker ticker) {
this.databaseCache =
Caffeine.newBuilder()
.softValues()
Expand All @@ -121,7 +107,6 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.partitionCache =
cachedPartitionMaxNum == 0
? null
Expand All @@ -142,21 +127,12 @@ public static Catalog tryToCreate(Catalog catalog, Options options) {
return catalog;
}

MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
long manifestThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
Optional<MemorySize> maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) {
// cache all manifest files
manifestMaxMemory = maxMemory.get();
manifestThreshold = Long.MAX_VALUE;
}
return new CachingCatalog(
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
manifestThreshold,
options.get(CACHE_PARTITION_MAX_NUM),
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE));
return new CachingCatalog(catalog, options);
}

/**
 * Builds the loader for this caching catalog.
 *
 * <p>The wrapped catalog's own loader is obtained first and then combined with this catalog's
 * {@code options} in a {@link CachingCatalogLoader}.
 *
 * @return a {@link CachingCatalogLoader} around the wrapped catalog's loader
 */
@Override
public CatalogLoader catalogLoader() {
    CatalogLoader delegateLoader = wrapped.catalogLoader();
    return new CachingCatalogLoader(delegateLoader, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.options.Options;

/**
 * A {@link CatalogLoader} that layers {@link CachingCatalog} creation on top of another loader.
 *
 * <p>Every {@link #load()} call loads a catalog from the wrapped loader and passes it, together
 * with the stored {@link Options}, to {@link CachingCatalog#tryToCreate(Catalog, Options)}.
 */
public class CachingCatalogLoader implements CatalogLoader {

    private static final long serialVersionUID = 1L;

    /** Loader that supplies the underlying catalog. */
    private final CatalogLoader catalogLoader;

    /** Options forwarded to {@link CachingCatalog#tryToCreate(Catalog, Options)} on each load. */
    private final Options options;

    /**
     * Creates a loader that delegates to {@code catalogLoader}.
     *
     * @param catalogLoader loader for the underlying catalog
     * @param options options passed to {@link CachingCatalog#tryToCreate(Catalog, Options)}
     */
    public CachingCatalogLoader(CatalogLoader catalogLoader, Options options) {
        this.catalogLoader = catalogLoader;
        this.options = options;
    }

    /**
     * Loads the underlying catalog and hands it to {@link CachingCatalog#tryToCreate(Catalog,
     * Options)}.
     *
     * @return the result of {@code tryToCreate}, which decides from the options whether the
     *     underlying catalog is returned as-is or wrapped in a {@link CachingCatalog}
     */
    @Override
    public Catalog load() {
        Catalog underlying = catalogLoader.load();
        return CachingCatalog.tryToCreate(underlying, options);
    }
}
75 changes: 42 additions & 33 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,6 @@
@Public
public interface Catalog extends AutoCloseable {

// constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";

// constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";

// constants for database
String DEFAULT_DATABASE = "default";
String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";

// constants for table
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";

/** Warehouse root path for creating new databases. */
String warehouse();

/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();

// ======================= database methods ===============================

/**
Expand Down Expand Up @@ -399,6 +366,48 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}

// ==================== Catalog Information ==========================

/** Warehouse root path for creating new databases. */
String warehouse();

/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

/** Serializable loader to create catalog. */
CatalogLoader catalogLoader();

/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();

// ======================= Constants ===============================

// constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";

// constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";

// constants for database
String DEFAULT_DATABASE = "default";
String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";

// constants for table
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";

// ======================= Exceptions ===============================

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@

package org.apache.paimon.catalog;

import org.apache.paimon.annotation.Public;

import java.io.Serializable;

/** Loader for creating a {@link Catalog}. */
/**
* Loader for creating a {@link Catalog}.
*
* @since 1.1.0
*/
@Public
@FunctionalInterface
public interface CatalogLoader extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;

/** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
public class DelegateCatalog implements Catalog {
public abstract class DelegateCatalog implements Catalog {

protected final Catalog wrapped;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public String warehouse() {
return warehouse.toString();
}

/**
 * Returns the serializable loader for this catalog: a {@link FileSystemCatalogLoader}
 * constructed from this catalog's {@code fileIO}, {@code warehouse} and {@code catalogOptions}.
 */
@Override
public CatalogLoader catalogLoader() {
return new FileSystemCatalogLoader(fileIO, warehouse, catalogOptions);
}

@Override
public boolean caseSensitive() {
return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
Expand Down
Loading

0 comments on commit 24c703a

Please sign in to comment.