Skip to content

Commit

Permalink
[apache#6133] Improvement(core): Supports get Fileset schema location…
Browse files Browse the repository at this point in the history
… in the AuthorizationUtils

Supports get Fileset schema location in the AuthorizationUtils.
  • Loading branch information
Abyss-lord committed Jan 13, 2025
1 parent 9815cc3 commit accfc59
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public class Constants {

public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
public static final String LOCATION = "location";
public static final String SLASH = "/";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
Expand All @@ -39,8 +41,10 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.catalog.CatalogManager;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.catalog.hadoop.Constants;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
import org.apache.gravitino.dto.authorization.PrivilegeDTO;
import org.apache.gravitino.dto.util.DTOConverters;
Expand All @@ -51,10 +55,12 @@
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchUserException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.utils.MetadataObjectUtil;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -465,30 +471,72 @@ public static List<String> getMetadataObjectLocation(
}
break;
case SCHEMA:
{
Catalog catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
LOG.info("Catalog provider is %s", catalogObj.provider());
if (catalogObj.provider().equals("hive")) {
Schema schema = GravitinoEnv.getInstance().schemaDispatcher().loadSchema(ident);
if (schema.properties().containsKey(HiveConstants.LOCATION)) {
Catalog catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
Schema schema = GravitinoEnv.getInstance().schemaDispatcher().loadSchema(ident);

switch (catalogObj.type()) {
case RELATIONAL:
LOG.info("Catalog provider is {}", catalogObj.provider());
if ("hive".equals(catalogObj.provider())
&& schema.properties().containsKey(HiveConstants.LOCATION)) {
String schemaLocation = schema.properties().get(HiveConstants.LOCATION);
if (schemaLocation != null && schemaLocation.isEmpty()) {
if (StringUtils.isNotBlank(schemaLocation)) {
locations.add(schemaLocation);
} else {
LOG.warn("Schema %s location is not found", ident);
LOG.warn("Schema {} location is not found", ident);
}
}
}
// TODO: [#6133] Supports get Fileset schema location in the AuthorizationUtils
break;

case FILESET:
if (catalogObj instanceof HasPropertyMetadata) {
HasPropertyMetadata catalogObjWithProperties = (HasPropertyMetadata) catalogObj;
Map<String, String> properties = schema.properties();
String schemaLocation =
(String)
catalogObjWithProperties
.schemaPropertiesMetadata()
.getOrDefault(properties, Constants.LOCATION);
if (StringUtils.isNotBlank(schemaLocation)) {
locations.add(normalizeFilesetLocation(schemaLocation));
}
} else {
FilesetCatalog filesetCatalog = catalogObj.asFilesetCatalog();
String catalogObjLocation = catalogObj.properties().get(Constants.LOCATION);
Namespace namespace = NamespaceUtil.toFileset(ident);
NameIdentifier[] nameIdentifiers = filesetCatalog.listFilesets(namespace);
if (nameIdentifiers.length == 0) {
LOG.warn(
"{} is empty, use catalog location {} as schema location.",
ident.toString(),
catalogObjLocation);
locations.add(catalogObjLocation);
} else {
Arrays.stream(nameIdentifiers)
.forEach(
nameIdentifier -> {
Fileset fileset = filesetCatalog.loadFileset(nameIdentifier);
String filesetLocation = fileset.storageLocation();
if (filesetLocation != null && !filesetLocation.isEmpty())
locations.add(filesetLocation);
});
}
}
break;

default:
LOG.warn("Unsupported catalog type {}", catalogObj.type());
break;
}
break;

case TABLE:
{
Catalog catalogObj =
catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
Expand Down Expand Up @@ -531,4 +579,17 @@ private static NameIdentifier getObjectNameIdentifier(
String metalake, MetadataObject metadataObject) {
return NameIdentifier.parse(String.format("%s.%s", metalake, metadataObject.fullName()));
}

/**
* Normalize the fileset location to end with a slash.
*
* @param location the hadoop schema location
* @return if the location ends with a slash, return the location; otherwise, return the location
* with a slash at the end.
*/
public static String normalizeFilesetLocation(String location) {
Preconditions.checkArgument(StringUtils.isNotBlank(location), "Location is blank");
location = location.endsWith(Constants.SLASH) ? location : location + Constants.SLASH;
return location;
}
}
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.utils;

import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import org.apache.gravitino.Entity;
Expand Down Expand Up @@ -107,6 +108,17 @@ public static Namespace ofFileset(String metalake, String catalog, String schema
return Namespace.of(metalake, catalog, schema);
}

/**
* Create a namespace for fileset from a schema name identifier.
*
* @param ident The schema name identifier.
* @return A namespace for fileset.
*/
public static Namespace toFileset(NameIdentifier ident) {
Preconditions.checkArgument(ident.namespace().length() == 2, "Invalid namespace length");
return ofFileset(ident.namespace().level(0), ident.namespace().level(1), ident.name());
}

/**
* Create a namespace for topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,33 @@
*/
package org.apache.gravitino.utils;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.IllegalNamespaceException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestNamespaceUtil {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

@BeforeEach
void setUp() {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
}

@AfterEach
public void restoreStreams() {
System.setOut(originalOut);
System.setErr(originalErr);
}

@Test
public void testCheckNamespace() {
Expand Down Expand Up @@ -82,4 +103,23 @@ public void testCheckNamespace() {
Assertions.assertTrue(
excep5.getMessage().contains("Model version namespace must be non-null and have 4 levels"));
}

@Test
void testToFileset() {
NameIdentifier ident = NameIdentifier.of("metalake_demo", "catalog", "schema");
Namespace filesetNamespace = NamespaceUtil.toFileset(ident);
Assertions.assertEquals(3, filesetNamespace.levels().length);
Assertions.assertEquals("metalake_demo", filesetNamespace.level(0));
Assertions.assertEquals("catalog", filesetNamespace.level(1));
Assertions.assertEquals("schema", filesetNamespace.level(2));
}

@Test
void testToFilesetWithIncorrectLevel() {
NameIdentifier ident1 = NameIdentifier.of("metalake_demo", "catalog", "schema", "table");
Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceUtil.toFileset(ident1));

NameIdentifier ident2 = NameIdentifier.of("metalake_demo", "catalog");
Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceUtil.toFileset(ident2));
}
}

0 comments on commit accfc59

Please sign in to comment.