Skip to content

Commit

Permalink
Move nested namespace support in REST catalog behind a config
Browse files Browse the repository at this point in the history
  • Loading branch information
mayankvadariya committed Nov 6, 2024
1 parent b00dc00 commit e53faa1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 17 deletions.
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ following properties:
* - `iceberg.rest-catalog.vended-credentials-enabled`
- Use credentials provided by the REST backend for file system access.
Defaults to `false`.
* - `iceberg.rest-catalog.nested-namespace-enabled`
- Support querying objects under nested namespace.
Defaults to `false`.
:::

The following example shows a minimal catalog configuration using an Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum SessionType
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
private boolean nestedNamespaceEnabled;

@NotNull
public URI getBaseUri()
Expand Down Expand Up @@ -138,4 +139,17 @@ public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCreden
this.vendedCredentialsEnabled = vendedCredentialsEnabled;
return this;
}

public boolean isNestedNamespaceEnabled()
{
return nestedNamespaceEnabled;
}

@Config("iceberg.rest-catalog.nested-namespace-enabled")
@ConfigDescription("Support querying objects under nested namespace")
public IcebergRestCatalogConfig setNestedNamespaceEnabled(boolean nestedNamespaceEnabled)
{
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class TrinoIcebergRestCatalogFactory
private final SecurityProperties securityProperties;
private final boolean uniqueTableLocation;
private final TypeManager typeManager;
private final boolean nestedNamespaceEnabled;

@GuardedBy("this")
private RESTSessionCatalog icebergCatalog;
Expand Down Expand Up @@ -84,6 +85,7 @@ public TrinoIcebergRestCatalogFactory(
requireNonNull(icebergConfig, "icebergConfig is null");
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.nestedNamespaceEnabled = restConfig.isNestedNamespaceEnabled();
}

@Override
Expand Down Expand Up @@ -128,6 +130,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
parentNamespace,
trinoVersion,
typeManager,
uniqueTableLocation);
uniqueTableLocation,
nestedNamespaceEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class TrinoRestCatalog
private final Namespace parentNamespace;
private final String trinoVersion;
private final boolean useUniqueTableLocation;
private final boolean nestedNamespaceEnabled;

private final Cache<SchemaTableName, Table> tableCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
Expand All @@ -127,7 +128,8 @@ public TrinoRestCatalog(
Namespace parentNamespace,
String trinoVersion,
TypeManager typeManager,
boolean useUniqueTableLocation)
boolean useUniqueTableLocation,
boolean nestedNamespaceEnabled)
{
this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
Expand All @@ -137,6 +139,7 @@ public TrinoRestCatalog(
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
}

@Override
Expand All @@ -154,7 +157,12 @@ public boolean namespaceExists(ConnectorSession session, String namespace)
@Override
public List<String> listNamespaces(ConnectorSession session)
{
return collectNamespaces(session, parentNamespace);
if (nestedNamespaceEnabled) {
return collectNamespaces(session, parentNamespace);
}
return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream()
.map(this::toSchemaName)
.collect(toImmutableList());
}

private List<String> collectNamespaces(ConnectorSession session, Namespace parentNamespace)
Expand Down Expand Up @@ -679,6 +687,9 @@ private void invalidateTableCache(SchemaTableName schemaTableName)

private Namespace toNamespace(String schemaName)
{
if (!nestedNamespaceEnabled && schemaName.contains(NAMESPACE_SEPARATOR)) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
if (!parentNamespace.isEmpty()) {
schemaName = parentNamespace + NAMESPACE_SEPARATOR + schemaName;
}
Expand All @@ -688,8 +699,14 @@ private Namespace toNamespace(String schemaName)
private String toSchemaName(Namespace namespace)
{
if (this.parentNamespace.isEmpty()) {
if (!nestedNamespaceEnabled && namespace.length() != 1) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
return namespace.toString();
}
if (!nestedNamespaceEnabled && ((namespace.length() - parentNamespace.length()) > 1)) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
return Arrays.stream(namespace.levels(), this.parentNamespace.length(), namespace.length())
.collect(joining(NAMESPACE_SEPARATOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public void testDefaults()
.setParentNamespace(null)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
.setVendedCredentialsEnabled(false)
.setNestedNamespaceEnabled(false));
}

@Test
Expand All @@ -48,6 +49,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
.put("iceberg.rest-catalog.nested-namespace-enabled", "true")
.buildOrThrow();

IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig()
Expand All @@ -57,7 +59,8 @@ public void testExplicitPropertyMappings()
.setParentNamespace("main")
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
.setVendedCredentialsEnabled(true)
.setNestedNamespaceEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;
import io.airlift.http.server.testing.TestingHttpServer;
import io.trino.filesystem.Location;
import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergQueryRunner;
import io.trino.plugin.iceberg.SchemaInitializer;
import io.trino.plugin.iceberg.TestingIcebergPlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import org.apache.iceberg.BaseTable;
Expand All @@ -32,14 +35,18 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -48,6 +55,9 @@
final class TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private static final String NESTED_NAMESPACE_ENABLED = ICEBERG_CATALOG;
private static final String NESTED_NAMESPACE_DISABLED = "nested_namespace_disabled";

private File warehouseLocation;
private JdbcCatalog backend;

Expand Down Expand Up @@ -82,18 +92,85 @@ protected QueryRunner createQueryRunner()
testServer.start();
closeAfterClass(testServer::stop);

return IcebergQueryRunner.builder("level_1.level_2")
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.addIcebergProperty("iceberg.file-format", format.name())
.addIcebergProperty("iceberg.catalog.type", "rest")
.addIcebergProperty("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.addIcebergProperty("iceberg.register-table-procedure.enabled", "true")
.addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
.setSchemaInitializer(SchemaInitializer.builder()
.withSchemaName("level_1.level_2")
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
String nestedSchema = "level_1.level_2";
QueryRunner queryRunner = DistributedQueryRunner.builder(testSessionBuilder()
.setCatalog(NESTED_NAMESPACE_ENABLED)
.setSchema(nestedSchema)
.build())
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.build();

Map<String, String> nestedNamespaceDisabled = ImmutableMap.<String, String>builder()
.put("fs.hadoop.enabled", "true")
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "rest")
.put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.buildOrThrow();

Map<String, String> nestedNamespaceEnabled = ImmutableMap.<String, String>builder()
.putAll(nestedNamespaceDisabled)
.put("iceberg.rest-catalog.nested-namespace-enabled", "true")
.buildOrThrow();
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
queryRunner.installPlugin(new TestingIcebergPlugin(dataDir));

queryRunner.createCatalog(NESTED_NAMESPACE_ENABLED, "iceberg", nestedNamespaceEnabled);
queryRunner.createCatalog(NESTED_NAMESPACE_DISABLED, "iceberg", nestedNamespaceDisabled);

SchemaInitializer.builder()
.withSchemaName(nestedSchema)
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
.build()
.accept(queryRunner);

return queryRunner;
}
catch (Exception e) {
closeAllSuppress(e, queryRunner);
throw e;
}
}

@Test
void testNestedNamespace()
{
// Test create nested namespace
String createNestedSchemaQuery = "CREATE SCHEMA %s.\"level_1.level_2.level_3\"";
assertQueryFails(createNestedSchemaQuery.formatted(NESTED_NAMESPACE_DISABLED), "Nested namespace is not enabled for this catalog");
assertUpdate(createNestedSchemaQuery.formatted(NESTED_NAMESPACE_ENABLED));

// Test show nested namespace
String showSchemasQuery = "SHOW SCHEMAS FROM %s";
assertThat(query(showSchemasQuery.formatted(NESTED_NAMESPACE_ENABLED)))
.skippingTypesCheck()
.containsAll("VALUES ('level_1'), ('level_1.level_2'), ('level_1.level_2.level_3')");
assertThat(query(showSchemasQuery.formatted(NESTED_NAMESPACE_DISABLED)))
.skippingTypesCheck()
.containsAll("VALUES ('level_1')");

// Test create and querying a table under nested namespace
assertUpdate("CREATE TABLE " + NESTED_NAMESPACE_ENABLED + ".\"level_1.level_2.level_3\".table1 as SELECT 'value' col", 1);
String selectQuery1 = "SELECT * FROM %s.\"level_1.level_2.level_3\".table1";
assertThat(query(selectQuery1.formatted(NESTED_NAMESPACE_ENABLED)))
.skippingTypesCheck()
.matches("VALUES ('value')");
assertQueryFails(selectQuery1.formatted(NESTED_NAMESPACE_DISABLED), "Nested namespace is not enabled for this catalog");

// Test create and querying a table under un-nested namespace
assertUpdate("CREATE TABLE " + NESTED_NAMESPACE_ENABLED + ".\"level_1\".table1 as SELECT 'value' col", 1);
String selectQuery2 = "SELECT * FROM %s.\"level_1\".table1";
assertThat(query(selectQuery2.formatted(NESTED_NAMESPACE_ENABLED)))
.skippingTypesCheck()
.matches("VALUES ('value')");
assertThat(query(selectQuery2.formatted(NESTED_NAMESPACE_DISABLED)))
.skippingTypesCheck()
.matches("VALUES ('value')");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc
Namespace.empty(),
"test",
new TestingTypeManager(),
useUniqueTableLocations);
useUniqueTableLocations,
false);
}

@Test
Expand Down

0 comments on commit e53faa1

Please sign in to comment.