Skip to content

Commit

Permalink
Read VCF URIs from a metadata array (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
gspowley authored Aug 21, 2023
1 parent 9738cf1 commit e377de1
Showing 1 changed file with 69 additions and 9 deletions.
78 changes: 69 additions & 9 deletions src/tiledb/cloud/vcf/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,44 @@ def read_uris_udf(
return result


def read_metadata_uris_udf(
dataset_uri: str,
*,
config: Optional[Mapping[str, Any]] = None,
metadata_uri: str,
metadata_attr: str = "uri",
max_files: Optional[int] = None,
verbose: bool = False,
) -> Sequence[str]:
"""
Read a list of URIs from a TileDB array. The URIs will be read from the
attribute specified in the `metadata_attr` argument.
:param dataset_uri: dataset URI
:param config: TileDB config, defaults to None
:param metadata_uri: metadata array URI
:param metadata_attr: name of metadata attribute containing URIs, defaults to "uri"
:param max_files: maximum number of URIs returned, defaults to None
:param verbose: verbose logging, defaults to False
:return: list of URIs
"""
logger = get_logger_wrapper(verbose)

with tiledb.scope_ctx(config):
with Profiler(group_uri=dataset_uri, group_member=LOG_ARRAY) as prof:
with tiledb.open(metadata_uri) as A:
df = A.query(dims=[], attrs=[metadata_attr]).df[:]
results = df[metadata_attr].to_list()

if max_files:
results = results[:max_files]

logger.info("Read %d VCF URIs from the metadata array.", len(results))
prof.write("count", len(results))

return results


def find_uris_udf(
dataset_uri: str,
search_uri: str,
Expand Down Expand Up @@ -918,6 +956,8 @@ def ingest_manifest_dag(
pattern: Optional[str] = None,
ignore: Optional[str] = None,
sample_list_uri: Optional[str] = None,
metadata_uri: Optional[str] = None,
metadata_attr: str = "uri",
max_files: Optional[int] = None,
batch_size: int = MANIFEST_BATCH_SIZE,
workers: int = MANIFEST_WORKERS,
Expand All @@ -939,6 +979,8 @@ def ingest_manifest_dag(
:param pattern: pattern to match when searching for VCF files, defaults to None
:param ignore: pattern to ignore when searching for VCF files, defaults to None
:param sample_list_uri: URI with a list of VCF URIs, defaults to None
:param metadata_uri: URI of metadata array holding VCF URIs, defaults to None
:param metadata_attr: name of metadata attribute containing URIs, defaults to "uri"
:param max_files: maximum number of URIs to ingest, defaults to None
:param batch_size: manifest batch size, defaults to MANIFEST_BATCH_SIZE
:param workers: maximum number of parallel workers, defaults to MANIFEST_WORKERS
Expand Down Expand Up @@ -1004,7 +1046,19 @@ def ingest_manifest_dag(
**kwargs,
)

sample_uris = submit(
if metadata_uri:
sample_uris = submit(
read_metadata_uris_udf,
dataset_uri_result,
config=config,
metadata_uri=metadata_uri,
metadata_attr=metadata_attr,
verbose=verbose,
name="Read VCF URIs from metadata ",
**kwargs,
)

filtered_sample_uris = submit(
filter_uris_udf,
dataset_uri_result,
sample_uris,
Expand All @@ -1016,7 +1070,7 @@ def ingest_manifest_dag(

run_dag(graph)

sample_uris = sample_uris.result()
sample_uris = filtered_sample_uris.result()

if not sample_uris:
logger.info("All samples found are already in the manifest.")
Expand Down Expand Up @@ -1330,6 +1384,8 @@ def ingest(
pattern: Optional[str] = None,
ignore: Optional[str] = None,
sample_list_uri: Optional[str] = None,
metadata_uri: Optional[str] = None,
metadata_attr: str = "uri",
max_files: Optional[int] = None,
max_samples: Optional[int] = None,
contigs: Optional[Union[Sequence[str], Contigs]] = Contigs.ALL,
Expand Down Expand Up @@ -1364,6 +1420,8 @@ def ingest(
:param ignore: Unix shell style pattern to ignore when searching for VCF files,
defaults to None
:param sample_list_uri: URI with a list of VCF URIs, defaults to None
:param metadata_uri: URI of metadata array holding VCF URIs, defaults to None
:param metadata_attr: name of metadata attribute containing URIs, defaults to "uri"
:param max_files: maximum number of VCF URIs to read/find,
defaults to None (no limit)
:param max_samples: maximum number of samples to ingest, defaults to None (no limit)
Expand Down Expand Up @@ -1398,14 +1456,14 @@ def ingest(
"""

# Validate user input
if not search_uri and not sample_list_uri:
raise ValueError("Either `search_uri` or `sample_list_uri` must be provided.")

if search_uri and sample_list_uri:
raise ValueError("Cannot specify both `search_uri` and `sample_list_uri`.")
if sum([bool(search_uri), bool(sample_list_uri), bool(metadata_uri)]) != 1:
raise ValueError(
"Exactly one of `search_uri`, `sample_list_uri`, or `metadata_uri`"
" must be provided."
)

if sample_list_uri and (pattern or ignore):
raise ValueError("Cannot specify `pattern` or `ignore` with `sample_list_uri`.")
if not search_uri and (pattern or ignore):
raise ValueError("Only specify `pattern` or `ignore` with `search_uri`.")

if not batch_mode and access_credentials_name:
raise ValueError(
Expand All @@ -1427,6 +1485,8 @@ def ingest(
pattern=pattern,
ignore=ignore,
sample_list_uri=sample_list_uri,
metadata_uri=metadata_uri,
metadata_attr=metadata_attr,
max_files=max_files,
batch_size=manifest_batch_size,
workers=manifest_workers,
Expand Down

0 comments on commit e377de1

Please sign in to comment.