Skip to content

Commit

Permalink
Updates for VCF ingestion with tiledb URIs (#623)
Browse files Browse the repository at this point in the history
  • Loading branch information
gspowley authored Jul 30, 2024
1 parent f79efa9 commit 43ce88b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/tiledb/cloud/utilities/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def consolidate_and_vacuum(
logger = get_logger()

with tiledb.scope_ctx(config):
if vacuum_fragments:
is_remote = array_uri.startswith("tiledb://")
if not is_remote and vacuum_fragments:
logger.info("Vacuuming fragments")
tiledb.vacuum(
array_uri,
Expand Down
30 changes: 18 additions & 12 deletions src/tiledb/cloud/vcf/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ def get_logger_wrapper(
return logger


def create_manifest(dataset_uri: str) -> None:
def create_manifest(dataset_uri: str, group: tiledb.Group) -> None:
"""
Create a manifest array in the dataset.
:param dataset_uri: dataset URI
:param group: dataset group
"""

manifest_uri = f"{dataset_uri}/{MANIFEST_ARRAY}"
Expand Down Expand Up @@ -140,9 +141,10 @@ def create_manifest(dataset_uri: str) -> None:

tiledb.Array.create(manifest_uri, schema)

group = tiledb.Group(dataset_uri, "w")
group.add(MANIFEST_ARRAY, name=MANIFEST_ARRAY, relative=True)
group.close()
if dataset_uri.startswith("tiledb://"):
group.add(manifest_uri, name=MANIFEST_ARRAY, relative=False)
else:
group.add(MANIFEST_ARRAY, name=MANIFEST_ARRAY, relative=True)


# --------------------------------------------------------------------
Expand Down Expand Up @@ -207,14 +209,17 @@ def create_dataset_udf(
log_uri = f"{dataset_uri}/{LOG_ARRAY}"
create_log_array(log_uri)
with tiledb.Group(dataset_uri, "w") as group:
group.add(LOG_ARRAY, name=LOG_ARRAY, relative=True)
if dataset_uri.startswith("tiledb://"):
group.add(log_uri, name=LOG_ARRAY, relative=False)
else:
group.add(LOG_ARRAY, name=LOG_ARRAY, relative=True)

write_log_event(log_uri, "create_dataset_udf", "create", data=dataset_uri)
# Create manifest array and add it to the dataset group if
# not creating an annotation dataset.
if not annotation_dataset:
create_manifest(dataset_uri, group)

# Create manifest array and add it to the dataset group if
# not creating an annotation dataset.
if not annotation_dataset:
create_manifest(dataset_uri)
write_log_event(log_uri, "create_dataset_udf", "create", data=dataset_uri)
else:
logger.info("Using existing dataset: %r", dataset_uri)

Expand Down Expand Up @@ -897,16 +902,17 @@ def consolidate_dataset_udf(
for member in group:
uri = member.uri
name = member.name
is_remote = uri.startswith("tiledb://")

# Skip excluded and non-included arrays
if (exclude and name in exclude) or (include and name not in include):
continue

# NOTE: REST currently only supports fragment_meta, commits, metadata
modes = ["commits", "fragment_meta", "array_meta"]
modes = ["commits", "fragment_meta"]

# Consolidate fragments for selected arrays
if name in [LOG_ARRAY, MANIFEST_ARRAY, "vcf_headers"]:
if not is_remote and name in [LOG_ARRAY, MANIFEST_ARRAY, "vcf_headers"]:
modes += ["fragments"]

for mode in modes:
Expand Down

0 comments on commit 43ce88b

Please sign in to comment.