Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload folder implementation. #618

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 174 additions & 1 deletion src/tiledb/cloud/files/utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import base64
import logging
import os
import re
import urllib.parse
import warnings
from fnmatch import fnmatch
from typing import Mapping, Optional, Tuple, Union
from typing import Dict, List, Mapping, Optional, Tuple, Union

import attrs

import tiledb
import tiledb.cloud
import tiledb.cloud.tiledb_cloud_error as tce
from tiledb.cloud import array
from tiledb.cloud import client
from tiledb.cloud import config
from tiledb.cloud import groups
from tiledb.cloud import rest_api
from tiledb.cloud import tiledb_cloud_error
from tiledb.cloud._common import utils
from tiledb.cloud.rest_api import ApiException as GenApiException
from tiledb.cloud.rest_api import configuration
from tiledb.cloud.rest_api import models
from tiledb.cloud.utilities import get_logger_wrapper


def sanitize_filename(fname: str) -> str:
Expand Down Expand Up @@ -262,3 +268,170 @@ def _auth_headers(cfg: configuration.Configuration) -> Mapping[str, object]:
return {"authorization": basic}
return {}
# No authentication has been provided. Do nothing.


@attrs.define
class UploadFoldersResults:
    """Recursive result record produced by ``upload_folder``.

    One instance describes the upload of a single directory; nested
    directories are reported via ``sub_folders``.
    """

    # Base name of the uploaded directory.
    directory: str
    # Human-readable summary, e.g. "Uploaded 2/2 files".
    report: str
    # Maps each failed input file URI to the stringified exception raised.
    errors: dict
    # One result entry per uploaded sub-directory (depth-first recursion).
    sub_folders: list["UploadFoldersResults"]


def upload_folder(
    input_uri: str,
    output_uri: str,
    *,
    parent_group_uri: Optional[str] = None,
    access_credentials_name: Optional[str] = None,
    config: Optional[dict] = None,
    flatten: bool = False,
    serializable: bool = True,
    logger: Optional[logging.Logger] = None,
    verbose: bool = False,
) -> Union[dict, UploadFoldersResults]:
    """
    Uploads a folder to TileDB Cloud.
    By default respects the initial folder structure in the destination.

    :param input_uri: The URI or path of the input folder. May be an ordinary path
        or any URI accessible via TileDB VFS.
    :param output_uri: The TileDB URI to write the folder into.
    :param parent_group_uri: A TileDB Group URI to add folder under,
        defaults to None
    :param access_credentials_name: If present, the name of the credentials
        to use when writing the uploaded file to backend storage instead of
        the defaults.
    :param config: Config dictionary, defaults to None
    :param flatten: Flag. If set to True, the upload will flatten the folder
        structure instead of recreating it. (Not Implemented yet)
    :param serializable: Flag. If set to True the function returns a dictionary
        report. Otherwise it returns an UploadFoldersResults attrs class.
        Defaults to True.
    :param logger: A logging.Logger instance, defaults to None.
    :param verbose: Verbose logging, defaults to False.
    :return: A dictionary (or UploadFoldersResults) containing a report message
        and an upload errors dictionary (if any)
    """
    logger = logger or get_logger_wrapper(verbose)
    logger.info("=====")

    if flatten:
        raise NotImplementedError(
            "The option to flatten a folder structure is not yet implemented"
        )

    # Prepare and sanitize arguments
    output_uri = output_uri.strip("/")

    # Ensure a trailing separator so dirname/basename resolve the folder name.
    input_uri = input_uri if input_uri.endswith(os.sep) else input_uri + os.sep
    base_dir = os.path.basename(os.path.dirname(input_uri))

    namespace, name = utils.split_uri(output_uri)
    _, sp, acn = groups._default_ns_path_cred(namespace=namespace)

    # If `name` is a URL, assume it points to a cloud storage
    storage_path = name if "://" in name else sp
    storage_path = f"{storage_path.strip('/')}/{base_dir}"
    tb_storage_uri = f"tiledb://{namespace}/{storage_path}"
    logger.debug("Output storage path: %s", storage_path)
    logger.debug("TileDB Storage URI: %r", tb_storage_uri)

    access_credentials_name = access_credentials_name or acn
    logger.debug("Storage path: %r", storage_path)
    logger.debug("ACN: %s", access_credentials_name)

    # Group check and/or creation
    if "://" not in name:
        group_uri = output_uri
    else:
        group_uri = f"tiledb://{namespace}/{base_dir}"
    logger.debug("Group URI: %r", group_uri)

    group_created = False
    # Create the destination group only if it does not already exist.
    if tiledb.object_type(group_uri, ctx=tiledb.cloud.Ctx()) != "group":
        group_namespace, group_name = utils.split_uri(group_uri)
        groups.create(
            name=group_name,
            namespace=group_namespace,
            storage_uri=storage_path,
            parent_uri=parent_group_uri,
            credentials_name=access_credentials_name,
        )
        group_created = True
        logger.debug("Group URI: %r created", group_uri)

    # Upload Stats
    logger.info("-----")
    if parent_group_uri:
        logger.info("Sub-Folder %r Upload Stats", base_dir)
    else:
        logger.info("Folder %r Upload Stats", base_dir)
    logger.info("-----")
    logger.info("- Input URI: %r", input_uri)
    logger.info("- Output URI: %r", output_uri)
    logger.info("-- Storage Path: %r", storage_path)
    logger.info("- Group URI: %r", group_uri)
    logger.info("-- Group Created: %s", group_created)

    # Report results
    uploaded = 0
    dir_count = 0
    upload_errors: Dict[str, str] = {}
    results = UploadFoldersResults(
        directory=base_dir, report="", errors={}, sub_folders=[]
    )

    vfs = tiledb.VFS(config=config)
    # List the folder non-recursively; sub-folders are handled by the
    # recursive call below, which creates one sub-group per directory.
    input_ls: List[str] = vfs.ls(input_uri)
    with tiledb.Group(
        group_uri, mode="w", config=config, ctx=tiledb.cloud.Ctx()
    ) as grp:
        for fname in input_ls:
            if vfs.is_dir(fname):
                dir_count += 1

                results.sub_folders.append(
                    upload_folder(
                        input_uri=fname,
                        output_uri=tb_storage_uri,
                        parent_group_uri=group_uri,
                        access_credentials_name=access_credentials_name,
                        config=config,
                        # Serialization concerns the final results.
                        serializable=False,
                        logger=logger,
                        verbose=verbose,
                    )
                )
            else:
                logger.info("Uploading %r to Group %r", fname, group_uri)
                # Path of the file relative to the uploaded base directory.
                filename = fname.split(base_dir + os.sep)[1]
                tb_output_uri = f"{tb_storage_uri}/{filename}"
                try:
                    uploaded_uri = upload_file(
                        input_uri=fname,
                        output_uri=tb_output_uri,
                        filename=filename,
                        access_credentials_name=access_credentials_name,
                    )
                    grp.add(uploaded_uri)
                    uploaded += 1
                except Exception as exc:
                    # Best-effort: record the failure and continue uploading
                    # the remaining files instead of aborting the whole folder.
                    logger.exception(
                        "File %r while uploading to %r raised an exception",
                        filename,
                        tb_output_uri,
                    )
                    upload_errors[fname] = str(exc)
                    continue

    results.report = f"Uploaded {uploaded}/{len(input_ls) - dir_count} files"
    results.errors = upload_errors
    logger.info(results.report)
    logger.info("=====")

    if serializable:
        return attrs.asdict(results)
    return results
Empty file.
Empty file.
88 changes: 86 additions & 2 deletions tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import pathlib
import tempfile
import time
import unittest
from typing import List

Expand Down Expand Up @@ -191,8 +192,91 @@ def test_round_trip(self):
array.delete_array(uri)


# FIXME: Will be fixed with #595 implementation
# Disable until then.
class UploadFolderTest(unittest.TestCase):
    """Integration tests for ``file_utils.upload_folder`` against TileDB Cloud.

    Uploads the fixture folder ``tests/data/upload_folder_test`` (containing
    two nested folders, one of which has a further nested folder) and checks
    both the server-side group structure and the returned report dictionary.
    """

    @classmethod
    def setUpClass(cls):
        # Number of polls before declaring a group missing; groups may appear
        # with a delay after creation, so lookups are retried (see group_info).
        cls.ASSERT_EXIST_TRIES = 5
        cls.folder_basename = "upload_folder_test"
        cls.test_folder = os.path.join(CURRENT_DIR, "data", cls.folder_basename)
        # Resolve the caller's default namespace, storage path, and credential.
        cls.namespace, cls.storage_path, cls.acn = groups._default_ns_path_cred()
        return super().setUpClass()

    def setUp(self):
        # Fresh randomized group URI per test to avoid cross-test collisions.
        self.group_uri = (
            f"tiledb://{self.namespace}/{testonly.random_name('upload_folder')}"
        )
        return super().setUp()

    def tearDown(self):
        # Clean up the uploaded group and everything beneath it.
        tiledb.cloud.groups.delete(self.group_uri, recursive=True)
        return super().tearDown()

    def group_info(self, uri: str):
        """Fetch group info, retrying while the group becomes visible.

        Retries up to ASSERT_EXIST_TRIES times with a 1-second pause;
        fails the test if the group never appears.
        """
        for _ in range(self.ASSERT_EXIST_TRIES):
            try:
                return groups.info(uri)
            except Exception:
                # Group not visible yet — retry after a short pause.
                pass
            time.sleep(1)
        self.fail(f"Group '{uri}' does not exist")

    def test_upload_folder(self):
        # Upload the fixture folder and request the serializable (dict) report.
        report = file_utils.upload_folder(
            input_uri=self.test_folder,
            output_uri=self.group_uri,
            access_credentials_name=self.acn,
            serializable=True,
        )

        info = self.group_info(self.group_uri)
        self.assertEqual(info.size, 4)  # 2 sub-folders, 2 files

        # Each sub-folder should have become a group stored under the
        # corresponding storage path of the parent folder.
        base_storage_uri = f"{self.storage_path}/{self.folder_basename}"
        info = self.group_info(f"tiledb://{self.namespace}/nested_folder_1")
        self.assertEqual(info.size, 2)  # 2 files
        self.assertEqual(info.uri, f"{base_storage_uri}/nested_folder_1")

        info = self.group_info(f"tiledb://{self.namespace}/nested_folder_2")
        self.assertEqual(info.size, 3)  # 1 sub-folder, 2 files
        self.assertEqual(info.uri, f"{base_storage_uri}/nested_folder_2")

        info = self.group_info(f"tiledb://{self.namespace}/double_nested_folder")
        self.assertEqual(info.size, 2)  # 2 files
        self.assertEqual(
            info.uri, f"{base_storage_uri}/nested_folder_2/double_nested_folder"
        )

        # The report mirrors the folder hierarchy recursively.
        self.assertDictEqual(
            report,
            {
                "directory": self.folder_basename,
                "report": "Uploaded 2/2 files",
                "errors": {},
                "sub_folders": [
                    {
                        "directory": "nested_folder_1",
                        "report": "Uploaded 2/2 files",
                        "errors": {},
                        "sub_folders": [],
                    },
                    {
                        "directory": "nested_folder_2",
                        "report": "Uploaded 2/2 files",
                        "errors": {},
                        "sub_folders": [
                            {
                                "directory": "double_nested_folder",
                                "report": "Uploaded 2/2 files",
                                "errors": {},
                                "sub_folders": [],
                            }
                        ],
                    },
                ],
            },
        )


@unittest.skip("Skip until fixed VFS access")
class TestFileIngestion(unittest.TestCase):
@classmethod
Expand Down
Loading