diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md index 476052f1ad3fd4..65ea5cbeb43375 100644 --- a/docs/src/main/sphinx/object-storage/metastores.md +++ b/docs/src/main/sphinx/object-storage/metastores.md @@ -497,7 +497,12 @@ following properties: * - `iceberg.rest-catalog.vended-credentials-enabled` - Use credentials provided by the REST backend for file system access. Defaults to `false`. - ::: +* - `iceberg.rest-catalog.case-insensitive-name-matching` + - Match namespace, table, and view names case insensitively. Defaults to `false`. +* - `iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl` + - [Duration](prop-type-duration) for which case-insensitive namespace, table, and view + names are cached. Defaults to `1m`. +::: The following example shows a minimal catalog configuration using an Iceberg REST metadata catalog: diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java index 2241abfb446de6..77377c59b84942 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java @@ -15,12 +15,16 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; import jakarta.validation.constraints.NotNull; import org.apache.iceberg.catalog.Namespace; import java.net.URI; import java.util.Optional; +import static java.util.concurrent.TimeUnit.MINUTES; + public class IcebergRestCatalogConfig { public enum Security @@ -42,6 +46,8 @@ public enum SessionType private Security security = Security.NONE; private SessionType sessionType = SessionType.NONE; private boolean vendedCredentialsEnabled; + private boolean caseInsensitiveNameMatching; + private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES); @NotNull public URI getBaseUri() @@ -138,4 +144,32 @@ public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCreden this.vendedCredentialsEnabled = vendedCredentialsEnabled; return this; } + + public boolean isCaseInsensitiveNameMatching() + { + return caseInsensitiveNameMatching; + } + + @Config("iceberg.rest-catalog.case-insensitive-name-matching") + @ConfigDescription("Match object names case-insensitively") + public IcebergRestCatalogConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching) + { + this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getCaseInsensitiveNameMatchingCacheTtl() + { + return caseInsensitiveNameMatchingCacheTtl; + } + + @Config("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl") + @ConfigDescription("Duration to keep case insensitive object mapping prior to eviction") + public IcebergRestCatalogConfig setCaseInsensitiveNameMatchingCacheTtl(Duration caseInsensitiveNameMatchingCacheTtl) + { + this.caseInsensitiveNameMatchingCacheTtl = caseInsensitiveNameMatchingCacheTtl; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index 9af5ffbd703690..c81e9e0123b54f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -13,10 +13,12 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileSystemFactory; @@ -29,6 +31,7 @@ import io.trino.spi.type.TypeManager; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTSessionCatalog; @@ -38,6 +41,7 @@ import java.util.Set; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; @@ -56,6 +60,10 @@ public class TrinoIcebergRestCatalogFactory private final SecurityProperties securityProperties; private final boolean uniqueTableLocation; private final TypeManager typeManager; + private final boolean caseInsensitiveNameMatching; + private final Cache remoteNamespaceMappingCache; + private final Cache remoteTableMappingCache; + private final Cache remoteViewMappingCache; @GuardedBy("this") private RESTSessionCatalog icebergCatalog; @@ -84,6 +92,19 @@ public TrinoIcebergRestCatalogFactory( requireNonNull(icebergConfig, "icebergConfig is null"); this.uniqueTableLocation = icebergConfig.isUniqueTableLocation(); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.caseInsensitiveNameMatching = restConfig.isCaseInsensitiveNameMatching(); + this.remoteNamespaceMappingCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() + .build(); + this.remoteTableMappingCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() + .build(); + this.remoteViewMappingCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() + .build(); } @Override @@ -128,6 +149,10 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) parentNamespace, trinoVersion, typeManager, - uniqueTableLocation); + uniqueTableLocation, + caseInsensitiveNameMatching, + remoteNamespaceMappingCache, + remoteTableMappingCache, + remoteViewMappingCache); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 11fcf055ca4770..71c39cd5a6fa07 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; @@ -77,6 +78,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -93,6 +95,7 @@ import static io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog.ICEBERG_VIEW_RUN_AS_OWNER; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static java.util.stream.Collectors.joining; @@ -114,6 +117,10 @@ public class TrinoRestCatalog private final Namespace parentNamespace; private final String trinoVersion; private final boolean useUniqueTableLocation; + private final boolean caseInsensitiveNameMatching; + private final Cache remoteNamespaceMappingCache; + private final Cache remoteTableMappingCache; + private final Cache remoteViewMappingCache; private final Cache tableCache = EvictableCacheBuilder.newBuilder() .maximumSize(PER_QUERY_CACHE_SIZE) @@ -127,7 +134,11 @@ public TrinoRestCatalog( Namespace parentNamespace, String trinoVersion, TypeManager typeManager, - boolean useUniqueTableLocation) + boolean useUniqueTableLocation, + boolean caseInsensitiveNameMatching, + Cache remoteNamespaceMappingCache, + Cache remoteTableMappingCache, + Cache remoteViewMappingCache) { this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); this.catalogName = requireNonNull(catalogName, "catalogName is null"); @@ -137,6 +148,10 @@ public TrinoRestCatalog( this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.useUniqueTableLocation = useUniqueTableLocation; + this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; + this.remoteNamespaceMappingCache = requireNonNull(remoteNamespaceMappingCache, "remoteNamespaceMappingCache is null"); + this.remoteTableMappingCache = requireNonNull(remoteTableMappingCache, "remoteTableMappingCache is null"); + this.remoteViewMappingCache = requireNonNull(remoteViewMappingCache, "remoteViewMappingCache is null"); } @Override @@ -148,7 +163,7 @@ public Optional getNamespaceSeparator() @Override public boolean namespaceExists(ConnectorSession session, String namespace) { - return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace)); + return restSessionCatalog.namespaceExists(convert(session), toRemoteNamespace(session, toNamespace(namespace))); } @Override @@ -170,7 +185,7 @@ private List collectNamespaces(ConnectorSession session, Namespace paren public void dropNamespace(ConnectorSession session, String namespace) { try { - restSessionCatalog.dropNamespace(convert(session), toNamespace(namespace)); + restSessionCatalog.dropNamespace(convert(session), toRemoteNamespace(session, toNamespace(namespace))); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -178,6 +193,9 @@ public void dropNamespace(ConnectorSession session, String namespace) catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop namespace: %s", namespace), e); } + if (caseInsensitiveNameMatching) { + remoteNamespaceMappingCache.invalidate(toNamespace(namespace)); + } } @Override @@ -185,7 +203,7 @@ public Map loadNamespaceMetadata(ConnectorSession session, Strin { try { // Return immutable metadata as direct modifications will not be reflected on the namespace - return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace))); + return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toRemoteNamespace(session, toNamespace(namespace)))); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -233,10 +251,10 @@ public List listTables(ConnectorSession session, Optional nam ImmutableList.Builder tables = ImmutableList.builder(); for (Namespace restNamespace : namespaces) { - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> new TableInfo(SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name()), TableInfo.ExtendedRelationType.TABLE)) .forEach(tables::add); - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> new TableInfo(SchemaTableName.schemaTableName(toSchemaName(id.namespace()), id.name()), TableInfo.ExtendedRelationType.OTHER_VIEW)) .forEach(tables::add); } @@ -251,7 +269,7 @@ public List listViews(ConnectorSession session, Optional viewNames = ImmutableList.builder(); for (Namespace restNamespace : namespaces) { - listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, restNamespace)).stream() + listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))).stream() .map(id -> SchemaTableName.schemaTableName(id.namespace().toString(), id.name())) .forEach(viewNames::add); } @@ -305,7 +323,7 @@ public Transaction newCreateTableTransaction( String location, Map properties) { - return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName), schema) .withPartitionSpec(partitionSpec) .withSortOrder(sortOrder) .withLocation(location) @@ -323,7 +341,7 @@ public Transaction newCreateOrReplaceTableTransaction( String location, Map properties) { - return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName), schema) .withPartitionSpec(partitionSpec) .withSortOrder(sortOrder) .withLocation(location) @@ -334,25 +352,32 @@ public Transaction newCreateOrReplaceTableTransaction( @Override public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata) { - restSessionCatalog.registerTable(convert(session), toIdentifier(tableName), tableMetadata.metadataFileLocation()); + TableIdentifier tableIdentifier = TableIdentifier.of(toRemoteNamespace(session, toNamespace(tableName.getSchemaName())), tableName.getTableName()); + restSessionCatalog.registerTable(convert(session), tableIdentifier, tableMetadata.metadataFileLocation()); } @Override public void unregisterTable(ConnectorSession session, SchemaTableName tableName) { - if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) { + if (!restSessionCatalog.dropTable(convert(session), toRemoteTable(session, tableName))) { throw new TableNotFoundException(tableName); } invalidateTableCache(tableName); + if (caseInsensitiveNameMatching) { + remoteTableMappingCache.invalidate(toIdentifier(tableName)); + } } @Override public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { - if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) { + if (!restSessionCatalog.purgeTable(convert(session), toRemoteTable(session, schemaTableName))) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); } invalidateTableCache(schemaTableName); + if (caseInsensitiveNameMatching) { + remoteTableMappingCache.invalidate(toIdentifier(schemaTableName)); + } } @Override @@ -367,12 +392,15 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { try { - restSessionCatalog.renameTable(convert(session), toIdentifier(from), toIdentifier(to)); + restSessionCatalog.renameTable(convert(session), toRemoteTable(session, from), toRemoteTable(session, to)); } catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); } invalidateTableCache(from); + if (caseInsensitiveNameMatching) { + remoteTableMappingCache.invalidate(toIdentifier(from)); + } } @Override @@ -384,9 +412,7 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName tableCache, schemaTableName, () -> { - TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName()); - - BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier); + BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toRemoteTable(session, schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); }); @@ -408,7 +434,7 @@ public Map> tryGetColumnMetadata(Connector @Override public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) { - Table icebergTable = restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); + Table icebergTable = restSessionCatalog.loadTable(convert(session), toRemoteTable(session, schemaTableName)); if (comment.isEmpty()) { icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); } @@ -452,10 +478,10 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, definition.getOwner().ifPresent(owner -> properties.put(ICEBERG_VIEW_RUN_AS_OWNER, owner)); definition.getComment().ifPresent(comment -> properties.put(COMMENT, comment)); Schema schema = IcebergUtil.schemaFromViewColumns(typeManager, definition.getColumns()); - ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toIdentifier(schemaViewName)); + ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toRemoteView(session, schemaViewName)); viewBuilder = viewBuilder.withSchema(schema) .withQuery("trino", definition.getOriginalSql()) - .withDefaultNamespace(toNamespace(schemaViewName.getSchemaName())) + .withDefaultNamespace(toRemoteNamespace(session, toNamespace(schemaViewName.getSchemaName()))) .withDefaultCatalog(definition.getCatalog().orElse(null)) .withProperties(properties.buildOrThrow()) .withLocation(defaultTableLocation(session, schemaViewName)); @@ -471,7 +497,10 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, @Override public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { - restSessionCatalog.renameView(convert(session), toIdentifier(source), toIdentifier(target)); + restSessionCatalog.renameView(convert(session), toRemoteView(session, source), toRemoteView(session, target)); + if (caseInsensitiveNameMatching) { + remoteViewMappingCache.invalidate(toIdentifier(source)); + } } @Override @@ -483,7 +512,10 @@ public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaVie @Override public void dropView(ConnectorSession session, SchemaTableName schemaViewName) { - restSessionCatalog.dropView(convert(session), toIdentifier(schemaViewName)); + restSessionCatalog.dropView(convert(session), toRemoteView(session, schemaViewName)); + if (caseInsensitiveNameMatching) { + remoteViewMappingCache.invalidate(toIdentifier(schemaViewName)); + } } @Override @@ -492,7 +524,7 @@ public Map getViews(ConnectorSession s SessionContext sessionContext = convert(session); ImmutableMap.Builder views = ImmutableMap.builder(); for (Namespace restNamespace : listNamespaces(session, namespace)) { - for (TableIdentifier restView : restSessionCatalog.listViews(sessionContext, restNamespace)) { + for (TableIdentifier restView : restSessionCatalog.listViews(sessionContext, toRemoteNamespace(session, restNamespace))) { SchemaTableName schemaTableName = SchemaTableName.schemaTableName(restView.namespace().toString(), restView.name()); try { getView(session, schemaTableName).ifPresent(view -> views.put(schemaTableName, view)); @@ -536,7 +568,7 @@ public Optional getView(ConnectorSession session, Schem private Optional getIcebergView(ConnectorSession session, SchemaTableName viewName) { try { - return Optional.of(restSessionCatalog.loadView(convert(session), toIdentifier(viewName))); + return Optional.of(restSessionCatalog.loadView(convert(session), toRemoteView(session, viewName))); } catch (NoSuchViewException e) { return Optional.empty(); @@ -709,4 +741,100 @@ private List listNamespaces(ConnectorSession session, Optional findRemoteTable(session, tableIdentifier)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + return tableIdentifier; + } + + private TableIdentifier findRemoteTable(ConnectorSession session, TableIdentifier tableIdentifier) + { + Namespace remoteNamespace = toRemoteNamespace(session, tableIdentifier.namespace()); + List tableIdentifiers = restSessionCatalog.listTables(convert(session), remoteNamespace); + TableIdentifier matchingTable = null; + for (TableIdentifier identifier : tableIdentifiers) { + if (identifier.name().equalsIgnoreCase(tableIdentifier.name())) { + if (matchingTable != null) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate table name is not supported with Iceberg REST catalog: " + Joiner.on(", ").join(matchingTable, identifier.name())); + } + matchingTable = identifier; + } + } + return matchingTable == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingTable; + } + + private TableIdentifier toRemoteView(ConnectorSession session, SchemaTableName schemaViewName) + { + TableIdentifier tableIdentifier = toIdentifier(schemaViewName); + if (caseInsensitiveNameMatching) { + try { + return remoteViewMappingCache.get(tableIdentifier, () -> findRemoteView(session, tableIdentifier)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + return tableIdentifier; + } + + private TableIdentifier findRemoteView(ConnectorSession session, TableIdentifier tableIdentifier) + { + Namespace remoteNamespace = toRemoteNamespace(session, tableIdentifier.namespace()); + List tableIdentifiers = restSessionCatalog.listViews(convert(session), remoteNamespace); + TableIdentifier matchingView = null; + for (TableIdentifier identifier : tableIdentifiers) { + if (identifier.name().equalsIgnoreCase(tableIdentifier.name())) { + if (matchingView != null) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate view name is not supported with Iceberg REST catalog: " + + Joiner.on(", ").join(matchingView.name(), identifier.name())); + } + matchingView = identifier; + } + } + return matchingView == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingView; + } + + private Namespace toRemoteNamespace(ConnectorSession session, Namespace trinoNamespace) + { + if (caseInsensitiveNameMatching) { + try { + return remoteNamespaceMappingCache.get(trinoNamespace, () -> findRemoteNamespace(session, trinoNamespace)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + return trinoNamespace; + } + + private Namespace findRemoteNamespace(ConnectorSession session, Namespace trinoNamespace) + { + List matchingRemoteNamespaces = listNamespaces(session, Namespace.empty()).stream() + .filter(ns -> toTrinoNamespace(ns).equals(trinoNamespace)) + .collect(toImmutableList()); + if (matchingRemoteNamespaces.size() > 1) { + throw new TrinoException(NOT_SUPPORTED, "Duplicate namespace name is not supported with Iceberg REST catalog"); + } + return matchingRemoteNamespaces.isEmpty() ? trinoNamespace : matchingRemoteNamespaces.getFirst(); + } + + private List listNamespaces(ConnectorSession session, Namespace parentNamespace) + { + List childNamespaces = restSessionCatalog.listNamespaces(convert(session), parentNamespace); + return childNamespaces.stream().flatMap(childNamespace -> Stream.concat(Stream.of(childNamespace), listNamespaces(session, childNamespace).stream())).toList(); + } + + private static Namespace toTrinoNamespace(Namespace namespace) + { + return Namespace.of(Arrays.stream(namespace.levels()).map(level -> level.toLowerCase(ENGLISH)).toArray(String[]::new)); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java new file mode 100644 index 00000000000000..beec5043d841a9 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogCaseInsensitiveMapping.java @@ -0,0 +1,299 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog.SessionContext; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.ViewBuilder; +import org.assertj.core.util.Files; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog.WAREHOUSE; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; +import static java.nio.file.Files.createDirectories; +import static java.util.Locale.ENGLISH; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.SCOPE; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestIcebergPolarisCatalogCaseInsensitiveMapping + extends AbstractTestQueryFramework +{ + private static final SessionContext SESSION_CONTEXT = new SessionContext("dummy", null, null, ImmutableMap.of(), null); + private static final String NAMESPACE_LEVEL1 = "LeVeL1_" + randomNameSuffix(); + private static final String NAMESPACE_LEVEL2 = "LeVeL2_" + randomNameSuffix(); + private static final String NAMESPACE_LEVEL3 = "LeVeL3" + randomNameSuffix(); + private static final String LOWERCASE_NAMESPACE_LEVEL1 = NAMESPACE_LEVEL1.toLowerCase(ENGLISH); + private static final String ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA = "%s.%s.%s".formatted(LOWERCASE_NAMESPACE_LEVEL1, NAMESPACE_LEVEL2.toLowerCase(ENGLISH), NAMESPACE_LEVEL3.toLowerCase(ENGLISH)); + private static final Namespace ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3 = Namespace.of(NAMESPACE_LEVEL1, NAMESPACE_LEVEL2, NAMESPACE_LEVEL3); + + private RESTSessionCatalog icebergCatalog; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + File warehouseLocation = Files.newTemporaryFolder(); + TestingPolarisCatalog polarisCatalog = closeAfterClass(new TestingPolarisCatalog(warehouseLocation.getPath())); + + Map properties = ImmutableMap.builder() + .put(CatalogProperties.URI, polarisCatalog.restUri() + "/api/catalog") + .put(WAREHOUSE_LOCATION, WAREHOUSE) + .put(CREDENTIAL, polarisCatalog.oauth2Credentials()) + .put(SCOPE, "PRINCIPAL_ROLE:ALL") + .buildOrThrow(); + + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( + config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(), null); + icebergCatalogInstance.initialize("test_catalog", properties); + + icebergCatalog = icebergCatalogInstance; + closeAfterClass(icebergCatalog); + + return IcebergQueryRunner.builder(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA) + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .addIcebergProperty("iceberg.catalog.type", "rest") + .addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog") + .addIcebergProperty("iceberg.rest-catalog.warehouse", WAREHOUSE) + .addIcebergProperty("iceberg.rest-catalog.security", "OAUTH2") + .addIcebergProperty("iceberg.rest-catalog.oauth2.credential", polarisCatalog.oauth2Credentials()) + .addIcebergProperty("iceberg.rest-catalog.oauth2.scope", "PRINCIPAL_ROLE:ALL") + .addIcebergProperty("iceberg.register-table-procedure.enabled", "true") + .addIcebergProperty("iceberg.rest-catalog.case-insensitive-name-matching", "true") + .build(); + } + + @BeforeAll + void setup() + { + icebergCatalog.createNamespace(SESSION_CONTEXT, Namespace.of(NAMESPACE_LEVEL1)); + icebergCatalog.createNamespace(SESSION_CONTEXT, Namespace.of(NAMESPACE_LEVEL1, NAMESPACE_LEVEL2)); + icebergCatalog.createNamespace(SESSION_CONTEXT, ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3); + + List namespaces = icebergCatalog.listNamespaces(SESSION_CONTEXT, Namespace.of(NAMESPACE_LEVEL1, NAMESPACE_LEVEL2)); + assertThat(getOnlyElement(namespaces)).isEqualTo(ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3); + + String absoluteLowercaseLevel2Namespace = "%s.%s".formatted(LOWERCASE_NAMESPACE_LEVEL1, NAMESPACE_LEVEL2.toLowerCase(ENGLISH)); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()) + .containsExactlyInAnyOrder( + "information_schema", + "tpch", + LOWERCASE_NAMESPACE_LEVEL1, + "%s".formatted(absoluteLowercaseLevel2Namespace), + "%s".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA)); + + assertThat(computeActual("SHOW SCHEMAS LIKE 'level%'").getOnlyColumnAsSet()) + .containsExactlyInAnyOrder( + LOWERCASE_NAMESPACE_LEVEL1, + "%s".formatted(absoluteLowercaseLevel2Namespace), + "%s".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA)); + + assertQuery("SELECT * FROM information_schema.schemata", + """ + VALUES + ('iceberg', 'information_schema'), + ('iceberg', '%s'), + ('iceberg', '%s'), + ('iceberg', '%s'), + ('iceberg', 'tpch') + """.formatted(LOWERCASE_NAMESPACE_LEVEL1, absoluteLowercaseLevel2Namespace, ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA)); + } + + @Test + void testCaseInsensitiveMatchingForTable() + { + Map level3NamespaceMetadata = icebergCatalog.loadNamespaceMetadata(SESSION_CONTEXT, ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3); + String level3NamespaceLocation = level3NamespaceMetadata.get(LOCATION_PROPERTY); + + // Create and query a mixed case letter table from Trino + String tableName1 = "MiXed_CaSe_TaBlE1_" + randomNameSuffix(); + String lowercaseTableName1 = tableName1.toLowerCase(ENGLISH); + String table1Location = level3NamespaceLocation + "/" + lowercaseTableName1; + assertUpdate("CREATE TABLE " + tableName1 + " WITH (location = '" + table1Location + "') AS SELECT BIGINT '42' a, DOUBLE '-38.5' b", 1); + assertQuery("SELECT * FROM " + tableName1, "VALUES (42, -38.5)"); + + // Create a mixed case letter table directly using rest catalog and query from Trino + String tableName2 = "mIxEd_cAsE_tAbLe2_" + randomNameSuffix(); + String lowercaseTableName2 = tableName2.toLowerCase(ENGLISH); + String table2Location = level3NamespaceLocation + "/" + lowercaseTableName2; + createDir(table2Location); + createDir(table2Location + "/data"); + createDir(table2Location + "/metadata"); + icebergCatalog + .buildTable(SESSION_CONTEXT, TableIdentifier.of(ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3, tableName2), new Schema(required(1, "x", Types.LongType.get()))) + .withLocation(table2Location) + .createTransaction() + .commitTransaction(); + assertUpdate("INSERT INTO " + tableName2 + " VALUES (78)", 1); + assertQuery("SELECT * FROM " + tableName2, "VALUES (78)"); + + // Test register/unregister table. Re-register for further testing. + assertThat(icebergCatalog.dropTable(SESSION_CONTEXT, TableIdentifier.of(ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3, lowercaseTableName1))).isTrue(); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseTableName1)); + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName1 + "', '" + table1Location + "')"); + assertQuery("SELECT * FROM " + tableName1, "VALUES (42, -38.5)"); + assertUpdate("CALL system.unregister_table (CURRENT_SCHEMA, '" + tableName1 + "')"); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseTableName1)); + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName1 + "', '" + table1Location + "')"); + + // Query information_schema and list objects + assertThat(computeActual("SHOW TABLES IN \"" + ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA + "\"").getOnlyColumnAsSet()).contains(lowercaseTableName1, lowercaseTableName2); + assertThat(computeActual("SHOW TABLES IN \"" + ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA + "\" LIKE 'mixed_case_table%'").getOnlyColumnAsSet()).isEqualTo(Set.of(lowercaseTableName1, lowercaseTableName2)); + assertQuery("SELECT * FROM information_schema.tables WHERE table_schema != 'information_schema' AND table_type = 'BASE TABLE'", + """ + VALUES + ('iceberg', '%1$s', '%2$s', 'BASE TABLE'), + ('iceberg', '%1$s', '%3$s', 'BASE TABLE') + """.formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseTableName1, lowercaseTableName2)); + + // Add table comment + assertUpdate("COMMENT ON TABLE " + tableName1 + " IS 'test comment' "); + assertThat(getTableComment(lowercaseTableName1)).isEqualTo("test comment"); + + // Add table column comment + assertUpdate("COMMENT ON COLUMN " + tableName1 + ".a IS 'test column comment'"); + assertThat(getColumnComment(lowercaseTableName1, "a")).isEqualTo("test column comment"); + + // Rename table + String renamedTableName1 = tableName1 + "_renamed"; + assertUpdate("ALTER TABLE " + lowercaseTableName1 + " RENAME TO " + renamedTableName1); + assertQueryFails("SELECT * FROM " + tableName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseTableName1)); + assertQuery("SELECT * FROM " + renamedTableName1, "VALUES (42, -38.5)"); + + // Drop tables + assertUpdate("DROP TABLE " + renamedTableName1); + assertUpdate("DROP TABLE " + tableName2); + + // Query dropped tables + assertQueryFails("SELECT * FROM " + renamedTableName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, renamedTableName1.toLowerCase(ENGLISH))); + assertQueryFails("SELECT * FROM " + tableName2, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseTableName2)); + } + + @Test + void testCaseInsensitiveMatchingForView() + { + Map level3NamespaceMetadata = icebergCatalog.loadNamespaceMetadata(SESSION_CONTEXT, ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3); + String level3NamespaceLocation = level3NamespaceMetadata.get(LOCATION_PROPERTY); + + // Create and query a mixed case letter view from Trino + String viewName1 = "MiXed_CaSe_vIeW_1" + randomNameSuffix(); + String lowercaseViewName1 = viewName1.toLowerCase(ENGLISH); + assertUpdate("CREATE VIEW " + viewName1 + " AS SELECT BIGINT '25' a, DOUBLE '99.4' b"); + assertQuery("SELECT * FROM " + viewName1, "VALUES (25, 99.4)"); + + // Create a mixed case letter view directly using rest catalog and query from Trino + String viewName2 = "mIxEd_cAsE_ViEw_2" + randomNameSuffix(); + String lowercaseViewName2 = viewName2.toLowerCase(ENGLISH); + String view2Location = level3NamespaceLocation + "/" + lowercaseViewName2; + createDir(view2Location); + createDir(view2Location + "/data"); + createDir(view2Location + "/metadata"); + ViewBuilder viewBuilder = icebergCatalog.buildView(SESSION_CONTEXT, TableIdentifier.of(ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3, viewName2)); + viewBuilder + .withQuery("trino", "SELECT BIGINT '34' y") + .withSchema(new Schema(required(1, "y", Types.LongType.get()))) + .withDefaultNamespace(ICEBERG_ABSOLUTE_NAMESPACE_LEVEL3) + .withLocation(view2Location) + .createOrReplace(); + assertQuery("SELECT * FROM " + viewName2, "VALUES (34)"); + + // Query information_schema and list objects + assertThat(computeActual("SHOW TABLES IN \"" + ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA + "\"").getOnlyColumnAsSet()).contains(lowercaseViewName1, lowercaseViewName2); + assertThat(computeActual("SHOW TABLES IN \"" + ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA + "\" LIKE 'mixed_case_view%'").getOnlyColumnAsSet()).contains(lowercaseViewName1, lowercaseViewName2); + assertQuery("SELECT * FROM information_schema.tables WHERE table_schema != 'information_schema' AND table_type = 'VIEW'", + """ + VALUES + ('iceberg', '%1$s', '%2$s', 'VIEW'), + ('iceberg', '%1$s', '%3$s', 'VIEW') + """.formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseViewName1, lowercaseViewName2)); + + // Add view comment + assertUpdate("COMMENT ON VIEW " + viewName1 + " IS 'test comment' "); + assertThat(getTableComment(lowercaseViewName1)).isEqualTo("test comment"); + + // Add view column comment + assertUpdate("COMMENT ON COLUMN " + viewName1 + ".a IS 'test column comment'"); + assertThat(getColumnComment(lowercaseViewName1, "a")).isEqualTo("test column comment"); + + // Rename view + String renamedViewName1 = viewName1 + "_renamed"; + assertUpdate("ALTER VIEW " + lowercaseViewName1 + " RENAME TO " + renamedViewName1); + assertQueryFails("SELECT * FROM " + viewName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseViewName1)); + assertQuery("SELECT * FROM " + renamedViewName1, "VALUES (25, 99.4)"); + + // Drop views + assertUpdate("DROP VIEW " + renamedViewName1); + assertUpdate("DROP VIEW " + viewName2); + + // Query dropped views + assertQueryFails("SELECT * FROM " + renamedViewName1, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, renamedViewName1.toLowerCase(ENGLISH))); + assertQueryFails("SELECT * FROM " + viewName2, ".*'iceberg.\"%s\".%s' does not exist".formatted(ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, lowercaseViewName2)); + } + + private String getTableComment(String tableName) + { + String sql = format("SELECT comment FROM system.metadata.table_comments WHERE schema_name = '%s' AND table_name = '%s'", ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, tableName); + return (String) computeScalar(sql); + } + + private String getColumnComment(String tableName, String columnName) + { + return (String) computeScalar(format( + "SELECT comment FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND column_name = '%s'", + ABSOLUTE_LOWERCASE_LEVEL3_SCHEMA, + tableName, + columnName)); + } + + private static void createDir(String absoluteDirPath) + { + Path path = Paths.get(URI.create(absoluteDirPath).getPath()); + try { + createDirectories(path); + } + catch (IOException e) { + throw new UncheckedIOException("Cannot create %s directory".formatted(absoluteDirPath), e); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java index 82132f3b214895..af52a43f4026e6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.rest; import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; import org.junit.jupiter.api.Test; import java.util.Map; @@ -21,6 +22,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static java.util.concurrent.TimeUnit.MINUTES; public class TestIcebergRestCatalogConfig { @@ -34,7 +36,9 @@ public void testDefaults() .setParentNamespace(null) .setSessionType(IcebergRestCatalogConfig.SessionType.NONE) .setSecurity(IcebergRestCatalogConfig.Security.NONE) - .setVendedCredentialsEnabled(false)); + .setVendedCredentialsEnabled(false) + .setCaseInsensitiveNameMatching(false) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(1, MINUTES))); } @Test @@ -48,6 +52,8 @@ public void testExplicitPropertyMappings() .put("iceberg.rest-catalog.security", "OAUTH2") .put("iceberg.rest-catalog.session", "USER") .put("iceberg.rest-catalog.vended-credentials-enabled", "true") + .put("iceberg.rest-catalog.case-insensitive-name-matching", "true") + .put("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl", "3m") .buildOrThrow(); IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig() @@ -57,7 +63,9 @@ public void testExplicitPropertyMappings() .setParentNamespace("main") .setSessionType(IcebergRestCatalogConfig.SessionType.USER) .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2) - .setVendedCredentialsEnabled(true); + .setVendedCredentialsEnabled(true) + .setCaseInsensitiveNameMatching(true) + .setCaseInsensitiveNameMatchingCacheTtl(new Duration(3, MINUTES)); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index cfe8c363f2c2c8..389cc43a7d555b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.metastore.TableInfo; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.CommitTaskData; @@ -53,6 +54,7 @@ import static io.trino.testing.TestingConnectorSession.SESSION; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.util.Locale.ENGLISH; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -88,7 +90,11 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc Namespace.empty(), "test", new TestingTypeManager(), - useUniqueTableLocations); + useUniqueTableLocations, + false, + EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build(), + EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build(), + EvictableCacheBuilder.newBuilder().expireAfterWrite(1, MILLISECONDS).shareNothingWhenDisabled().build()); } @Test