Skip to content

Commit

Permalink
SOMA ingestion: Refactor access_credentials_name with acn (#630)
Browse files Browse the repository at this point in the history
Co-authored-by: John Kerl <[email protected]>
  • Loading branch information
JohnMoutafis and johnkerl authored Aug 9, 2024
1 parent 5d4cd82 commit cf9b4d0
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions src/tiledb/cloud/soma/ingest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
import warnings
from typing import ContextManager, Dict, Optional
from unittest import mock

Expand All @@ -20,26 +21,28 @@ def run_ingest_workflow_udf(
output_uri: str,
input_uri: str,
measurement_name: str,
# Some kwargs are eaten by the tiledb.cloud package, and won't reach
# our child. In order to propagate these to a _grandchild_ we need to
# package these up with different names. We use a dict as a single bag.
carry_along: Dict[str, Optional[str]],
pattern: Optional[str] = None,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
acn: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
**kwargs,
) -> Dict[str, str]:
"""
This is the highest-level ingestor component that runs on-node. Only here
can we do VFS with access_credentials_name -- that does not work correctly
on the client.
"""

# Some kwargs are eaten by the tiledb.cloud package, and won't reach
# our child. In order to propagate these to a _grandchild_ we need to
# package these up with different names. We use a dict as a single bag.
carry_along: Dict[str, str] = kwargs.pop("carry_along", {})

# For more information on "that does not work correctly on the client" please see
# https://github.com/TileDB-Inc/TileDB-Cloud-Py/pull/512

Expand All @@ -59,7 +62,7 @@ def run_ingest_workflow_udf(
grf = dag.DAG(
name=name,
mode=dag.Mode.BATCH,
namespace=carry_along["namespace"],
namespace=carry_along.get("namespace", namespace),
)
grf.submit(
_ingest_h5ad_byval,
Expand All @@ -69,8 +72,8 @@ def run_ingest_workflow_udf(
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
platform_config=platform_config,
resources=carry_along["resources"],
access_credentials_name=carry_along["access_credentials_name"],
resources=carry_along.get("resources", resources),
access_credentials_name=carry_along.get("access_credentials_name", acn),
logging_level=logging_level,
dry_run=dry_run,
)
Expand Down Expand Up @@ -117,8 +120,8 @@ def run_ingest_workflow_udf(
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
platform_config=platform_config,
resources=carry_along["resources"],
access_credentials_name=carry_along["access_credentials_name"],
resources=carry_along.get("resources", resources),
access_credentials_name=carry_along.get("access_credentials_name", acn),
logging_level=logging_level,
dry_run=dry_run,
)
Expand Down Expand Up @@ -243,9 +246,10 @@ def run_ingest_workflow(
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
acn: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
**kwargs,
) -> Dict[str, str]:
"""Starts a workflow to ingest H5AD data into SOMA.
Expand Down Expand Up @@ -274,14 +278,30 @@ def run_ingest_workflow(
:param resources: A specification for the amount of resources to provide
to the UDF executing the ingestion process, to override the default.
:param namespace: An alternate namespace to run the ingestion process under.
:param access_credentials_name: If provided, the name of the credentials
to pass to the executing UDF.
:param acn: The name of the credentials to pass to the executing UDF.
:param dry_run: If provided and set to ``True``, does the input-path
traversals without ingesting data.
:return: A dictionary of ``{"status": "started", "graph_id": ...}``,
with the UUID of the graph on the server side, which can be used to
manage execution and monitor progress.
"""
# Require exactly one of 'acn' and the deprecated 'access_credentials_name':
# passing both is ambiguous, and passing neither leaves no credential to use.
access_credentials_name = kwargs.pop("access_credentials_name", None)
if bool(acn) == bool(access_credentials_name):
raise ValueError(
"Ingestion graph requires either 'acn' or 'access_credentials_name'"
" (deprecated), cannot decipher correct credential when both specified."
)
# Backwards compatibility: Assign when only access_credentials_name is set
if not acn:
acn = access_credentials_name
warnings.warn(
DeprecationWarning(
"The 'access_credentials_name' parameter is about to be"
"deprecated and will be removed in future versions."
"Please use the 'acn' parameter instead."
)
)

# Graph init
grf = dag.DAG(
Expand All @@ -291,6 +311,12 @@ def run_ingest_workflow(
)

# Step 1: Ingest workflow UDF
carry_along: Dict[str, str] = {
"resources": _DEFAULT_RESOURCES if resources is None else resources,
"namespace": namespace,
"access_credentials_name": acn,
}

grf.submit(
_run_ingest_workflow_udf_byval,
output_uri=output_uri,
Expand All @@ -302,12 +328,8 @@ def run_ingest_workflow(
ingest_mode=ingest_mode,
resources=resources,
namespace=namespace,
access_credentials_name=access_credentials_name,
carry_along={
"resources": _DEFAULT_RESOURCES if resources is None else resources,
"namespace": namespace,
"access_credentials_name": access_credentials_name,
},
access_credentials_name=acn,
carry_along=carry_along,
logging_level=logging_level,
dry_run=dry_run,
)
Expand Down

0 comments on commit cf9b4d0

Please sign in to comment.