Skip to content

Commit

Permalink
Added method to validate specified path in storage api + other changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrambohra committed Nov 26, 2024
1 parent 01ee894 commit ea7a771
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ public interface Storage {
*/
String allocateTableLocation(
String databaseId, String tableId, String tableUUID, String tableCreator);

/**
 * Validates that the given path belongs to this storage type. The default implementation accepts
 * a path only when it starts with the endpoint (scheme) returned by this storage's client, i.e.
 * the endpoint specified in cluster.yaml.
 *
 * @param path path to a file/object
 * @return true if the path starts with the client's endpoint, else false
 */
default boolean isPathValid(String path) {
  final String endpoint = getClient().getEndpoint();
  return path.startsWith(endpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ public interface StorageClient<T> {
String getRootPrefix();

/**
* Checks if the path exists on the backend storage. Path is the absolute path to file which may
* or may not include scheme. Scheme is not specified in the path for local and hdfs storage. See:
* https://github.com/linkedin/openhouse/issues/121 Example: For Hdfs and local file system, the
* path would be /rootPath/db/table/file. For S3, the path would be s3://rootPath/db/table/file
* Checks if the path exists on the backend storage. Path is the absolute path to file/object
* which may or may not include scheme. Scheme is not specified in the path for local and hdfs
* storage. See: https://github.com/linkedin/openhouse/issues/121 Example: For Hdfs and local file
* system, the path would be /rootPath/db/table/file. For S3, the path would be
* s3://rootPath/db/table/file
*
* @param path path to a file
* @return true if path exists else false
*/
boolean fileExists(String path);
boolean exists(String path);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Optional;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand All @@ -17,6 +18,7 @@
* The StorageManager class is responsible for managing the storage types and providing the
* appropriate storage implementation based on the configuration.
*/
@Slf4j
@Component
public class StorageManager {

Expand Down Expand Up @@ -107,9 +109,11 @@ public Storage getStorage(StorageType.Type storageType) {
public Storage getStorageFromPath(String path) {
for (Storage storage : storages) {
if (storage.isConfigured()) {
if (StorageType.LOCAL.equals(storage.getType())) {
if (storage.isPathValid(path)) {
log.info("Resolved to {} storage for path {}", storage.getType().toString(), path);
return storage;
} else if (path.startsWith(storage.getClient().getEndpoint())) {
} else if (StorageType.LOCAL.equals(storage.getType())) {
log.info("Resolved to local storage for path {}", path);
return storage;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public StorageType.Type getStorageType() {
}

@Override
public boolean fileExists(String path) {
public boolean exists(String path) {
// TODO: Support pathExists on ADLS
throw new UnsupportedOperationException("Path existence check not implemented yet");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,18 @@ public String allocateTableLocation(
String databaseId, String tableId, String tableUUID, String tableCreator) {
return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString();
}

/**
 * Validates a path for Hdfs storage. A path is accepted when the inherited check passes (the path
 * starts with the endpoint (scheme) specified in cluster.yaml) or when it carries no scheme and
 * begins with "/". Accepting both forms future-proofs for when hdfs paths start being prefixed
 * with the endpoint (scheme). See: https://github.com/linkedin/openhouse/issues/121
 *
 * @param path path to a file/object
 * @return true if the inherited check accepts the path or the path starts with "/", else false
 */
@Override
public boolean isPathValid(String path) {
  // Endpoint-prefixed paths are accepted first; scheme-less absolute paths are the fallback.
  if (super.isPathValid(path)) {
    return true;
  }
  return path.startsWith("/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.exceptions.ServiceUnavailableException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -59,11 +60,12 @@ public StorageType.Type getStorageType() {
* @return true if path exists else false
*/
@Override
public boolean fileExists(String path) {
public boolean exists(String path) {
try {
return fs.exists(new Path(path));
} catch (IOException e) {
throw new RuntimeException("Exception checking path existence " + e.getMessage(), e);
throw new ServiceUnavailableException(
"Exception checking path existence " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,18 @@ public String allocateTableLocation(
String databaseId, String tableId, String tableUUID, String tableCreator) {
return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString();
}

/**
 * Validates a path for Local storage. A path is accepted when the inherited check passes (the
 * path starts with the endpoint (scheme) specified in cluster.yaml) or when it carries no scheme
 * and begins with "/". Accepting both forms future-proofs for when local paths start being
 * prefixed with the endpoint (scheme). See: https://github.com/linkedin/openhouse/issues/121
 *
 * @param path path to a file/object
 * @return true if the inherited check accepts the path or the path starts with "/", else false
 */
@Override
public boolean isPathValid(String path) {
  // Endpoint-prefixed paths are accepted first; scheme-less absolute paths are the fallback.
  if (super.isPathValid(path)) {
    return true;
  }
  return path.startsWith("/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.exceptions.ServiceUnavailableException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -141,11 +142,12 @@ public String getRootPrefix() {
* @return true if path exists else false
*/
@Override
public boolean fileExists(String path) {
public boolean exists(String path) {
try {
return fs.exists(new Path(path));
} catch (IOException e) {
throw new RuntimeException("Exception checking path existence " + e.getMessage(), e);
throw new ServiceUnavailableException(
"Exception checking path existence " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.exceptions.ServiceUnavailableException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -70,7 +71,7 @@ public StorageType.Type getStorageType() {
* @return true if path exists else false
*/
@Override
public boolean fileExists(String path) {
public boolean exists(String path) {
Preconditions.checkArgument(
path.startsWith(getEndpoint()), String.format("Invalid S3 URL format %s", path));
try {
Expand All @@ -92,7 +93,8 @@ public boolean fileExists(String path) {
// Object does not exist
return false;
} catch (URISyntaxException | S3Exception e) {
throw new RuntimeException("Error checking S3 object existence: " + e.getMessage(), e);
throw new ServiceUnavailableException(
"Error checking S3 object existence: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static com.linkedin.openhouse.internal.catalog.InternalCatalogMetricsConstant.METRICS_PREFIX;

import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.selector.StorageSelector;
Expand Down Expand Up @@ -138,11 +137,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
* @param tableIdentifier
* @return fileIO
*/
public FileIO resolveFileIO(TableIdentifier tableIdentifier) {
return fileIOManager.getFileIO(resolveStorage(tableIdentifier).getType());
}

public Storage resolveStorage(TableIdentifier tableIdentifier) {
protected FileIO resolveFileIO(TableIdentifier tableIdentifier) {
Optional<HouseTable> houseTable = Optional.empty();
try {
houseTable =
Expand All @@ -157,9 +152,13 @@ public Storage resolveStorage(TableIdentifier tableIdentifier) {
tableIdentifier.namespace().toString(),
tableIdentifier.name());
}
return houseTable.isPresent()
? storageManager.getStorage(storageType.fromString(houseTable.get().getStorageType()))
: storageSelector.selectStorage(
tableIdentifier.namespace().toString(), tableIdentifier.name());
StorageType.Type type =
houseTable.isPresent()
? storageType.fromString(houseTable.get().getStorageType())
: storageSelector
.selectStorage(tableIdentifier.namespace().toString(), tableIdentifier.name())
.getType();

return fileIOManager.getFileIO(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,8 @@ private void validatePathOfProvidedRequest(
String tableLocation = extractFromTblPropsIfExists(tableURI, tableProperties, TBL_LOC_RAW_KEY);
Storage storage = storageManager.getStorageFromPath(tableLocation);

if (TableType.REPLICA_TABLE != tableType && !storage.getClient().fileExists(tableLocation)) {
log.error(
"Previous tableLocation: {} doesn't exist",
storage.allocateTableLocation(databaseId, tableId, tableUUIDProperty, ""));
if (TableType.REPLICA_TABLE != tableType && !storage.getClient().exists(tableLocation)) {
log.error("Previous tableLocation: {} doesn't exist", tableLocation);
throw new RequestValidationFailureException(
String.format("Provided snapshot is invalid for %s.%s", dbIdFromProps, tblIdFromProps));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import com.jayway.jsonpath.JsonPath;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.local.LocalStorage;
import com.linkedin.openhouse.common.test.cluster.PropertyOverrideContextInitializer;
import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.api.spec.v0.request.IcebergSnapshotsRequestBody;
Expand Down Expand Up @@ -63,6 +64,8 @@ public class SnapshotsControllerTest {

@Autowired StorageManager storageManager;

@Autowired LocalStorage localStorage;

/** For now starting with a naive object feeder. */
private static Stream<GetTableResponseBody> responseBodyFeeder() {
return Stream.of(GET_TABLE_RESPONSE_BODY);
Expand Down Expand Up @@ -247,7 +250,10 @@ public void testPutSnapshotsReplicaTableType(GetTableResponseBody getTableRespon
"openhouse.tableLocation",
String.format(
"%s/%s/%s-%s/metadata.json",
"/tmp", getTableResponseBody.getDatabaseId(), getTableResponseBody.getTableId(), uuid));
localStorage.getClient().getRootPrefix(),
getTableResponseBody.getDatabaseId(),
getTableResponseBody.getTableId(),
uuid));

MvcResult createResult =
RequestAndValidateHelper.createTableAndValidateResponse(
Expand Down Expand Up @@ -355,8 +361,11 @@ private Map<String, String> tablePropsHelperForResponseBody(
originalProps.put(
"openhouse.tableLocation",
String.format(
"/tmp/%s/%s-%s/metadata.json",
responseBody.getDatabaseId(), responseBody.getTableId(), uuid));
"%s/%s/%s-%s/metadata.json",
localStorage.getClient().getRootPrefix(),
responseBody.getDatabaseId(),
responseBody.getTableId(),
uuid));
return originalProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void setUp() {
when(storageManager.getStorageFromPath(any())).thenReturn(storage);
when(storage.getClient()).thenReturn(storageClient);
when(storageClient.getRootPrefix()).thenReturn("/tmp");
when(storageClient.fileExists(any())).thenReturn(true);
when(storageClient.exists(any())).thenReturn(true);
}

@Test
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testUUIDFailsForInvalidSnapshot() {

@Test
public void testUUIDFailsForNonExistingOpenhouseDotPropertyPath() {
when(storage.getClient().fileExists(any())).thenReturn(false);
when(storage.getClient().exists(any())).thenReturn(false);

RequestValidationFailureException exception =
Assertions.assertThrows(
Expand Down

0 comments on commit ea7a771

Please sign in to comment.