
Fix CTAS for non-hdfs storages, also fixes multi storage cases #256

Open · wants to merge 8 commits into base: main
Conversation

@vikrambohra vikrambohra commented Nov 19, 2024

Summary

This PR introduces the following changes:

  1. Fix CTAS for non-HDFS storage types
    While extracting the UUID from a snapshot, the code builds the database path without the endpoint (scheme) and then checks whether that path is a prefix of the manifestList recorded in the snapshot, so the prefix check fails for schemed storages.

Example
ManifestList (from snapshot): s3://bucket-name/database/table-uuid/file.avro
Database prefix: bucket-name/database (not a prefix of above)

Fix: Strip the endpoint (scheme) from the manifest list path by resolving the correct storage from the tableLocation.

After fix
ManifestList stripped: bucket-name/database/table-uuid/file.avro
Database prefix: bucket-name/database (is a prefix of above)

  2. Fix the multiple-storage scenario
    The code assumes that storage is always the cluster default. This fails when the default is a scheme-less storage (HDFS) and the db.table is stored in a storage with a scheme (S3, BlobFs).

Fix: Resolve the correct storage by extracting the tableLocation from the table props and checking the scheme (endpoint). A combined sketch of both fixes appears after this list.

  3. Add a method to the Storage interface to check if the tableLocation exists.
  4. Add a method to StorageClient to check if a specified path exists.
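
A minimal sketch of fixes 1 and 2 above, as a plain-JDK stand-in (the real code uses the Storage/StorageClient abstractions reviewed below, so the names here are illustrative only):

import java.util.List;
import java.util.Optional;

public class StorageResolutionSketch {

  // Stand-in for the configured endpoints from cluster.yaml, e.g. "s3://",
  // or "" for the scheme-less HDFS default.
  static Optional<String> resolveEndpoint(String tableLocation, List<String> endpoints) {
    return endpoints.stream()
        // An empty (scheme-less) endpoint would match everything, so only
        // scheme-bearing endpoints participate in the prefix match.
        .filter(e -> !e.isEmpty() && tableLocation.startsWith(e))
        .findFirst();
  }

  public static void main(String[] args) {
    String manifestList = "s3://bucket-name/database/table-uuid/file.avro";
    String databasePrefix = "bucket-name/database";

    // Resolve the storage endpoint from the location; fall back to the default.
    String endpoint = resolveEndpoint(manifestList, List.of("s3://", "")).orElse("");

    // Strip the endpoint: "s3://bucket-name/..." -> "bucket-name/...".
    String stripped = manifestList.substring(endpoint.length());

    System.out.println(stripped.startsWith(databasePrefix)); // true after the fix
  }
}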

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.
  1. Updated TableUUIDGeneratorTest
  2. Added TableUUIDGeneratorMultiStorageTest
  3. Ran the Docker setup for both S3 and HDFS
HDFS

scala> spark.sql("CREATE TABLE openhouse.db.t1 (name string)")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.t1").show(100, false)
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|key                                       |value                                                                                                              |
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|openhouse.tableUri                        |LocalHadoopCluster.db.t1                                                                                           |
|openhouse.creationTime                    |1732661881733                                                                                                      |
|current-snapshot-id                       |none                                                                                                               |
|write.metadata.delete-after-commit.enabled|true                                                                                                               |
|write.metadata.previous-versions-max      |28                                                                                                                 |
|openhouse.tableCreator                    |openhouse                                                                                                          |
|openhouse.lastModifiedTime                |1732661881733                                                                                                      |
|openhouse.tableType                       |PRIMARY_TABLE                                                                                                      |
|policies                                  |                                                                                                                   |
|openhouse.tableId                         |t1                                                                                                                 |
|openhouse.tableUUID                       |8faacae2-3b3c-4b24-8169-251576b31e04                                                                               |
|openhouse.databaseId                      |db                                                                                                                 |
|openhouse.clusterId                       |LocalHadoopCluster                                                                                                 |
|format                                    |iceberg/orc                                                                                                        |
|openhouse.tableVersion                    |INITIAL_VERSION                                                                                                    |
|write.format.default                      |orc                                                                                                                |
|write.parquet.compression-codec           |zstd                                                                                                               |
|openhouse.tableLocation                   |/data/openhouse/db/t1-8faacae2-3b3c-4b24-8169-251576b31e04/00000-13b06eb9-e1db-4857-b152-842fbe52b2eb.metadata.json|
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------+


scala> spark.sql("INSERT INTO openhouse.db.t1 values ('value1')")
res2: org.apache.spark.sql.DataFrame = []                                       

scala> spark.sql("SELECT * FROM openhouse.db.t1").show()
+------+
|  name|
+------+
|value1|
+------+


scala> spark.sql("CREATE TABLE openhouse.db.ctas1 AS SELECT * FROM openhouse.db.t1")
res4: org.apache.spark.sql.DataFrame = []                                       

scala> spark.sql("SELECT * FROM openhouse.db.ctas1").show()
+------+
|  name|
+------+
|value1|
+------+


scala> spark.sql("DROP TABLE openhouse.db.ctas1").show()
++
||
++
++


S3

scala> spark.sql("CREATE TABLE openhouse.db.t1 (name string)")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.t1").show(100, false)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------------+
|key                                       |value                                                                                                                    |
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------------+
|openhouse.tableLocation                   |s3://openhouse-bucket/db/t1-9f9c16ca-49e5-4cc6-a32d-4116bfd28ded/00000-b7feaf18-4a32-4624-b903-0ae92d85d482.metadata.json|
|openhouse.clusterId                       |LocalS3Cluster                                                                                                           |
|current-snapshot-id                       |none                                                                                                                     |
|openhouse.lastModifiedTime                |1732662172134                                                                                                            |
|write.metadata.delete-after-commit.enabled|true                                                                                                                     |
|write.metadata.previous-versions-max      |28                                                                                                                       |
|openhouse.tableCreator                    |openhouse                                                                                                                |
|openhouse.tableUri                        |LocalS3Cluster.db.t1                                                                                                     |
|openhouse.tableType                       |PRIMARY_TABLE                                                                                                            |
|policies                                  |                                                                                                                         |
|openhouse.tableId                         |t1                                                                                                                       |
|openhouse.creationTime                    |1732662172134                                                                                                            |
|openhouse.databaseId                      |db                                                                                                                       |
|format                                    |iceberg/orc                                                                                                              |
|openhouse.tableVersion                    |INITIAL_VERSION                                                                                                          |
|openhouse.tableUUID                       |9f9c16ca-49e5-4cc6-a32d-4116bfd28ded                                                                                     |
|write.format.default                      |orc                                                                                                                      |
|write.parquet.compression-codec           |zstd                                                                                                                     |
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------------+


scala> spark.sql("INSERT INTO openhouse.db.t1 values ('value1')")
res2: org.apache.spark.sql.DataFrame = []                                       

scala> spark.sql("SELECT * FROM openhouse.db.t1").show()
+------+
|  name|
+------+
|value1|
+------+


scala> spark.sql("CREATE TABLE openhouse.db.ctas1 AS SELECT * FROM openhouse.db.t1")
res4: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SELECT * FROM openhouse.db.ctas1").show()
+------+
|  name|
+------+
|value1|
+------+


scala> spark.sql("DROP TABLE openhouse.db.ctas1").show()
++
||
++
++

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

@HotSushi HotSushi left a comment

Agree with the Storage API changes. One piece of feedback on not introducing Iceberg at the REST layer.


vikrambohra commented Nov 22, 2024

@HotSushi @jainlavina Addressed the comments. Some changes in the latest commit:

  1. Removed tableLocationExists() from the Storage API; we don't need to construct the table location since we fetch it from the table properties.
  2. Changed pathExists in the StorageClient API to fileExists to be explicit about the check, since we now check the absolute path of the metadata.json file (the table location) instead of only the path up to the table directory.
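
For illustration, a fileExists check along those lines could look like this (a sketch assuming a Hadoop FileSystem-backed client; the actual StorageClient implementations may differ):

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileExistsSketch {
  private final FileSystem fs;

  public FileExistsSketch(FileSystem fs) {
    this.fs = fs;
  }

  // Explicit file-existence check, per the rename from pathExists to
  // fileExists: the path must exist AND be a file (e.g. the metadata.json
  // table location), not just a directory prefix.
  public boolean fileExists(String path) {
    try {
      Path p = new Path(path);
      return fs.exists(p) && fs.getFileStatus(p).isFile();
    } catch (IOException e) {
      throw new UncheckedIOException("Could not check existence of " + path, e);
    }
  }
}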

public Storage getStorageFromPath(String path) {
  for (Storage storage : storages) {
    if (storage.isConfigured()) {
      if (StorageType.LOCAL.equals(storage.getType())) {

Shouldn't this be a fallback only if the path does not start with any other configured endpoint?
What if the path is an S3 storage path but local storage is also configured for some other tables?


➕ let's have if (StorageType.LOCAL.equals(storage.getType())) { in the fallback
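
A sketch of the suggested ordering (reusing the Storage/StorageType names from the snippet above; an illustration of the review suggestion, not the final implementation):

// Match scheme-bearing storages first; use LOCAL only as a fallback when no
// configured endpoint prefixes the path.
public Storage getStorageFromPath(String path) {
  Storage localStorage = null;
  for (Storage storage : storages) {
    if (!storage.isConfigured()) {
      continue;
    }
    if (StorageType.LOCAL.equals(storage.getType())) {
      localStorage = storage; // remember, but don't match yet
    } else if (path.startsWith(storage.getClient().getEndpoint())) {
      return storage;
    }
  }
  if (localStorage != null) {
    return localStorage; // fallback for scheme-less local paths
  }
  throw new IllegalArgumentException("No configured storage found for path: " + path);
}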


Done. Please check.

    extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, DB_RAW_KEY);
String tblIdFromProps =
    extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, TBL_RAW_KEY);
String tableURI = String.format("%s.%s", databaseId, tableId);

@rohitkum2506 Can you check whether the table reinstatement logic for replication is still intact? Especially at:

String tableLocation = extractFromTblPropsIfExists(tableURI, tableProperties, TBL_LOC_RAW_KEY);



@HotSushi HotSushi left a comment


For extractUUIDFromTableProperties, the logic before would check for directory existence; that check is missing now. We should be able to incorporate it as part of Storage.isPathValid().

For extractUUIDFromExistingManifestListPath we end up calling storage.getClient().getEndpoint(); we should introduce better layering or redefine the logic. But I'm ok pursuing this in a future PR.

* Checks if the provided path is a valid path for this storage type. It defaults to checking if
* the path starts with the endpoint (scheme) specified in cluster.yaml
*
* @param path path to a file/object

nit: can you add an example of what a path string looks like for HDFS + S3, similar to the StorageClient doc? Also add a doc note explicitly saying that a prefix check wouldn't work; only an object-existence check works.
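
For instance, the requested Javadoc might read (illustrative wording only):

/**
 * Checks if the provided path is a valid path for this storage type. It defaults to
 * checking if the path starts with the endpoint (scheme) specified in cluster.yaml.
 *
 * <p>Example paths: HDFS /data/openhouse/db/table-uuid/file.metadata.json;
 * S3 s3://bucket-name/db/table-uuid/file.metadata.json
 *
 * <p>Note: a prefix check alone is not sufficient; only an object-existence check
 * confirms the file is actually there.
 *
 * @param path path to a file/object
 */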

storageManager, dbIdFromProps, tblIdFromProps, tableUUIDProperty);
if (TableType.REPLICA_TABLE != tableType && !doesPathExist(previousPath)) {
log.error("Previous tableLocation: {} doesn't exist", previousPath);
if (TableType.REPLICA_TABLE != tableType && !storage.getClient().exists(tableLocation)) {

storage.getClient().exists(tableLocation) -> storage.isPathValid(tableLocation), after incorporating the existence-check feedback?

 * @return true if endpoint is specified in cluster.yaml else false
 */
default boolean isPathValid(String path) {
  return path.startsWith(getClient().getEndpoint());

Shouldn't we also check for object existence here?


isPathValid should check three things:

  • it's appropriate for the right storage (i.e., the prefix matches s3://)
  • its directory structure is valid
    • the rootPrefix is intact (i.e., /data/openhouse/db/tb/metadata.json has rootPrefix data/openhouse)
    • the db/table directory is intact (i.e., /data/openhouse/db/tb/metadata.json has the directory structure created by storage.allocateTableLocation(), /db/tb-uuid)
  • the object exists (i.e., /data/openhouse/db/tb/metadata.json)

To achieve this, you might need to change the signature to: boolean isPathValid(String path, String dbName, String tbName, String tbUuid)
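
A sketch of what that expanded check could look like (assumes the StorageClient accessors getEndpoint(), getRootPrefix(), and exists() discussed in this thread; an illustration of the suggestion, not the final code):

// Validates (1) storage prefix, (2) directory structure under the root prefix,
// and (3) object existence, per the three checks listed above.
default boolean isPathValid(String path, String dbName, String tbName, String tbUuid) {
  // 1. The path belongs to this storage (e.g. starts with "s3://").
  if (!path.startsWith(getClient().getEndpoint())) {
    return false;
  }
  // 2. The rootPrefix and the /db/tb-uuid directory structure are intact.
  String expectedPrefix =
      getClient().getEndpoint()
          + getClient().getRootPrefix()
          + String.format("/%s/%s-%s", dbName, tbName, tbUuid);
  if (!path.startsWith(expectedPrefix)) {
    return false;
  }
  // 3. The object itself (e.g. the metadata.json file) exists.
  return getClient().exists(path);
}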

 */
@Override
public boolean isPathValid(String path) {
  return (super.isPathValid(path) || path.startsWith("/"));

We'll also need to validate the rootPrefix as well; basically this method needs to be equivalent to:

java.nio.file.Path previousPath =
    InternalRepositoryUtils.constructTablePath(
        storageManager, dbIdFromProps, tblIdFromProps, tableUUIDProperty);

!doesPathExist(previousPath)

in TableUUIDGenerator.

Preconditions.checkArgument(
    path.startsWith(getEndpoint()), String.format("Invalid S3 URL format %s", path));
try {
  URI uri = new URI(path);

Can we use the S3 utilities to get bucket/key information? For example:

import java.net.URI;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Utilities;
import software.amazon.awssdk.services.s3.model.S3Uri;

public class S3UriParser {
    public static void main(String[] args) {
        // Build S3Utilities from a client; parseUri makes no network calls.
        S3Client s3Client = S3Client.create();
        S3Utilities s3Utilities = s3Client.utilities();
        String s3UriString = "s3://my-bucket/path/to/file.txt";
        S3Uri s3Uri = s3Utilities.parseUri(URI.create(s3UriString));

        String bucket = s3Uri.bucket().orElse(null);
        String key = s3Uri.key().orElse(null);

        System.out.println("Bucket: " + bucket);
        System.out.println("Key: " + key);
    }
}

String manifestListPathString =
    new Gson().fromJson(snapshotStr, JsonObject.class).get(manifestListKey).getAsString();
manifestListPathString =
    StringUtils.removeStart(manifestListPathString, storage.getClient().getEndpoint());

Is there a better way to achieve this without calling storage.getClient().getEndpoint()? I'm ok with deferring this part to a future PR.
