Skip to content

Commit

Permalink
fixup! Add support for case-insensitive object names in Iceberg REST …
Browse files Browse the repository at this point in the history
…catalog - use single cache
  • Loading branch information
mayankvadariya committed Nov 4, 2024
1 parent 09873f4 commit 72f3ba7
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ public class TrinoIcebergRestCatalogFactory
private final boolean uniqueTableLocation;
private final TypeManager typeManager;
private final boolean caseInsensitiveNameMatching;
private final Cache<Namespace, Namespace> remoteNamespaceMappingCache;
private final Cache<TableIdentifier, TableIdentifier> remoteTableMappingCache;
private final Cache<TableIdentifier, TableIdentifier> remoteViewMappingCache;
private final Optional<Cache<Namespace, Namespace>> remoteNamespaceMappingCache;
private final Optional<Cache<TableIdentifier, TableIdentifier>> remoteTableMappingCache;

@GuardedBy("this")
private RESTSessionCatalog icebergCatalog;
Expand Down Expand Up @@ -93,18 +92,20 @@ public TrinoIcebergRestCatalogFactory(
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.caseInsensitiveNameMatching = restConfig.isCaseInsensitiveNameMatching();
this.remoteNamespaceMappingCache = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build();
this.remoteTableMappingCache = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build();
this.remoteViewMappingCache = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build();
this.remoteNamespaceMappingCache =
caseInsensitiveNameMatching
? Optional.of(EvictableCacheBuilder.newBuilder()
.expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build())
: Optional.empty();
this.remoteTableMappingCache =
caseInsensitiveNameMatching
? Optional.of(EvictableCacheBuilder.newBuilder()
.expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build())
: Optional.empty();
}

@Override
Expand Down Expand Up @@ -152,7 +153,6 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
uniqueTableLocation,
caseInsensitiveNameMatching,
remoteNamespaceMappingCache,
remoteTableMappingCache,
remoteViewMappingCache);
remoteTableMappingCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ public class TrinoRestCatalog
private final String trinoVersion;
private final boolean useUniqueTableLocation;
private final boolean caseInsensitiveNameMatching;
private final Cache<Namespace, Namespace> remoteNamespaceMappingCache;
private final Cache<TableIdentifier, TableIdentifier> remoteTableMappingCache;
private final Cache<TableIdentifier, TableIdentifier> remoteViewMappingCache;
private final Optional<Cache<Namespace, Namespace>> remoteNamespaceMappingCache;
private final Optional<Cache<TableIdentifier, TableIdentifier>> remoteTableMappingCache;

private final Cache<SchemaTableName, Table> tableCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
Expand All @@ -136,9 +135,8 @@ public TrinoRestCatalog(
TypeManager typeManager,
boolean useUniqueTableLocation,
boolean caseInsensitiveNameMatching,
Cache<Namespace, Namespace> remoteNamespaceMappingCache,
Cache<TableIdentifier, TableIdentifier> remoteTableMappingCache,
Cache<TableIdentifier, TableIdentifier> remoteViewMappingCache)
Optional<Cache<Namespace, Namespace>> remoteNamespaceMappingCache,
Optional<Cache<TableIdentifier, TableIdentifier>> remoteTableMappingCache)
{
this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
Expand All @@ -151,7 +149,8 @@ public TrinoRestCatalog(
this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
this.remoteNamespaceMappingCache = requireNonNull(remoteNamespaceMappingCache, "remoteNamespaceMappingCache is null");
this.remoteTableMappingCache = requireNonNull(remoteTableMappingCache, "remoteTableMappingCache is null");
this.remoteViewMappingCache = requireNonNull(remoteViewMappingCache, "remoteViewMappingCache is null");
checkArgument(caseInsensitiveNameMatching == remoteNamespaceMappingCache.isPresent());
checkArgument(caseInsensitiveNameMatching == remoteTableMappingCache.isPresent());
}

@Override
Expand Down Expand Up @@ -194,7 +193,7 @@ public void dropNamespace(ConnectorSession session, String namespace)
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop namespace: %s", namespace), e);
}
if (caseInsensitiveNameMatching) {
remoteNamespaceMappingCache.invalidate(toNamespace(namespace));
remoteNamespaceMappingCache.get().invalidate(toNamespace(namespace));
}
}

Expand Down Expand Up @@ -364,7 +363,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
}
invalidateTableCache(tableName);
if (caseInsensitiveNameMatching) {
remoteTableMappingCache.invalidate(toIdentifier(tableName));
remoteTableMappingCache.get().invalidate(toIdentifier(tableName));
}
}

Expand All @@ -376,7 +375,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
}
invalidateTableCache(schemaTableName);
if (caseInsensitiveNameMatching) {
remoteTableMappingCache.invalidate(toIdentifier(schemaTableName));
remoteTableMappingCache.get().invalidate(toIdentifier(schemaTableName));
}
}

Expand All @@ -399,7 +398,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}
invalidateTableCache(from);
if (caseInsensitiveNameMatching) {
remoteTableMappingCache.invalidate(toIdentifier(from));
remoteTableMappingCache.get().invalidate(toIdentifier(from));
}
}

Expand Down Expand Up @@ -427,16 +426,17 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName

private TableIdentifier toRemoteObject(ConnectorSession session, SchemaTableName schemaTableName)
{
TableIdentifier liveTable = toLiveRemoteTable(session, schemaTableName);
TableIdentifier liveView = toLiveRemoteView(session, schemaTableName);
if (!liveTable.name().equals(schemaTableName.getTableName())) {
return liveTable;
TableIdentifier remoteTable = toLiveRemoteTable(session, schemaTableName);
if (!remoteTable.name().equals(schemaTableName.getTableName())) {
return remoteTable;
}
else if (!liveView.name().equals(schemaTableName.getTableName())) {
return liveView;

TableIdentifier remoteView = toLiveRemoteView(session, schemaTableName);
if (!remoteView.name().equals(schemaTableName.getTableName())) {
return remoteView;
}
else if (liveView.name().equals(schemaTableName.getTableName()) && liveTable.name().equals(schemaTableName.getTableName())) {
return liveTable; // table name and view name are same
else if (remoteView.name().equals(schemaTableName.getTableName()) && remoteTable.name().equals(schemaTableName.getTableName())) {
return remoteTable; // table name and view name are same
}
throw new RuntimeException("Unable to find remote object");
}
Expand Down Expand Up @@ -515,7 +515,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT
{
restSessionCatalog.renameView(convert(session), toRemoteView(session, source), toRemoteView(session, target));
if (caseInsensitiveNameMatching) {
remoteViewMappingCache.invalidate(toIdentifier(source));
remoteTableMappingCache.get().invalidate(toIdentifier(source));
}
}

Expand All @@ -530,7 +530,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
{
restSessionCatalog.dropView(convert(session), toRemoteView(session, schemaViewName));
if (caseInsensitiveNameMatching) {
remoteViewMappingCache.invalidate(toIdentifier(schemaViewName));
remoteTableMappingCache.get().invalidate(toIdentifier(schemaViewName));
}
}

Expand Down Expand Up @@ -584,10 +584,8 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
private Optional<View> getIcebergView(ConnectorSession session, SchemaTableName viewName, boolean findLiveView)
{
try {
if (caseInsensitiveNameMatching) {
return Optional.of(restSessionCatalog.loadView(convert(session), findLiveView ? toLiveRemoteView(session, viewName) : toRemoteView(session, viewName)));
}
return Optional.of(restSessionCatalog.loadView(convert(session), toRemoteView(session, viewName)));
return Optional.of(
restSessionCatalog.loadView(convert(session), caseInsensitiveNameMatching && findLiveView ? toLiveRemoteView(session, viewName) : toRemoteView(session, viewName)));
}
catch (NoSuchViewException e) {
return Optional.empty();
Expand Down Expand Up @@ -766,7 +764,7 @@ private TableIdentifier toRemoteTable(ConnectorSession session, SchemaTableName
TableIdentifier tableIdentifier = toIdentifier(schemaTableName);
if (caseInsensitiveNameMatching) {
try {
return remoteTableMappingCache.get(tableIdentifier, () -> findRemoteTable(session, tableIdentifier));
return remoteTableMappingCache.get().get(tableIdentifier, () -> findRemoteTable(session, tableIdentifier));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -806,7 +804,7 @@ private TableIdentifier toRemoteView(ConnectorSession session, SchemaTableName s
TableIdentifier tableIdentifier = toIdentifier(schemaViewName);
if (caseInsensitiveNameMatching) {
try {
return remoteViewMappingCache.get(tableIdentifier, () -> findRemoteView(session, tableIdentifier));
return remoteTableMappingCache.get().get(tableIdentifier, () -> findRemoteView(session, tableIdentifier));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -845,7 +843,7 @@ private Namespace toRemoteNamespace(ConnectorSession session, Namespace trinoNam
{
if (caseInsensitiveNameMatching) {
try {
return remoteNamespaceMappingCache.get(trinoNamespace, () -> findRemoteNamespace(session, trinoNamespace));
return remoteNamespaceMappingCache.get().get(trinoNamespace, () -> findRemoteNamespace(session, trinoNamespace));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.cache.EvictableCacheBuilder;
import io.trino.metastore.TableInfo;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.iceberg.CommitTaskData;
Expand Down Expand Up @@ -54,7 +53,6 @@
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -92,9 +90,8 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc
new TestingTypeManager(),
useUniqueTableLocations,
false,
EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build(),
EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build(),
EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build());
Optional.empty(),
Optional.empty());
}

@Test
Expand Down

0 comments on commit 72f3ba7

Please sign in to comment.