
fix(datasets): add metadata parameter to datasets (#708)
Signed-off-by: michal-mmm <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
michal-mmm and ankatiyar authored Jun 12, 2024
1 parent 8c15f03 commit 105fcf2
Showing 6 changed files with 19 additions and 1 deletion.
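
All six diffs below apply the same pattern: the dataset constructor gains an optional ``metadata`` mapping, which is stored on the instance untouched. A minimal sketch of that pattern on a custom dataset (the class name and filepath are invented for illustration; assumes kedro>=0.19, where ``AbstractDataset`` is importable from ``kedro.io``):

    from __future__ import annotations

    from typing import Any

    from kedro.io import AbstractDataset


    class MyCustomDataset(AbstractDataset):
        """Hypothetical dataset following the pattern added in this commit."""

        def __init__(self, *, filepath: str, metadata: dict[str, Any] | None = None):
            self._filepath = filepath
            # Stored verbatim; Kedro never reads it, but users or plugins may.
            self.metadata = metadata

        def _load(self) -> Any:
            raise NotImplementedError

        def _save(self, data: Any) -> None:
            raise NotImplementedError

        def _describe(self) -> dict[str, Any]:
            return {"filepath": self._filepath, "metadata": self.metadata}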
@@ -230,6 +230,7 @@ def __init__(  # noqa: PLR0913
         schema: dict[str, Any] | None = None,
         partition_columns: list[str] | None = None,
         owner_group: str | None = None,
+        metadata: dict[str, Any] | None = None,
     ) -> None:
         """Creates a new instance of ``ManagedTableDataset``.
@@ -259,6 +260,8 @@ def __init__(  # noqa: PLR0913
             owner_group: if table access control is enabled in your workspace,
                 specifying owner_group will transfer ownership of the table and database to
                 this owner. All databases should have the same owner_group. Defaults to None.
+            metadata: Any arbitrary metadata.
+                This is ignored by Kedro, but may be consumed by users or external plugins.
         Raises:
             DatasetError: Invalid configuration supplied (through ManagedTable validation)
         """
@@ -276,6 +279,7 @@ def __init__(  # noqa: PLR0913
         )

         self._version = version
+        self.metadata = metadata

         super().__init__(
             filepath=None,  # type: ignore[arg-type]
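
With this change, ``ManagedTableDataset`` can carry user metadata through the catalog. A hedged usage sketch (the ``table`` and ``database`` values are illustrative; assumes the class is importable from ``kedro_datasets.databricks`` and that a Databricks/Spark session is only needed at load/save time):

    from kedro_datasets.databricks import ManagedTableDataset

    dataset = ManagedTableDataset(
        table="features",        # illustrative table name
        database="analytics",    # illustrative database name
        metadata={"owner": "data-platform", "contains_pii": False},
    )

    # Kedro ignores the attribute entirely; it is only surfaced for consumers.
    assert dataset.metadata == {"owner": "data-platform", "contains_pii": False}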
@@ -41,9 +41,11 @@ def __init__(
         *,
         dataset_name: str,
         dataset_kwargs: dict[str, Any] | None = None,
+        metadata: dict[str, Any] | None = None,
     ):
         self.dataset_name = dataset_name
         self._dataset_kwargs = dataset_kwargs or {}
+        self.metadata = metadata

     def _load(self):
         return load_dataset(self.dataset_name, **self._dataset_kwargs)
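
Usage sketch for the Hugging Face dataset above (assuming it is ``HFDataset`` from ``kedro_datasets.huggingface``; the Hub dataset name is illustrative, and ``_load`` will download from the Hub):

    from kedro_datasets.huggingface import HFDataset

    reviews = HFDataset(
        dataset_name="imdb",    # illustrative Hub dataset
        metadata={"license": "other", "task": "sentiment"},
    )
    print(reviews.metadata)  # passed through untouched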
@@ -43,12 +43,14 @@ def __init__(
         task: str | None = None,
         model_name: str | None = None,
         pipeline_kwargs: dict[str, t.Any] | None = None,
+        metadata: dict[str, t.Any] | None = None,
     ):
         if task is None and model_name is None:
             raise ValueError("At least 'task' or 'model_name' are needed")
         self._task = task if task else None
         self._model_name = model_name
         self._pipeline_kwargs = pipeline_kwargs or {}
+        self.metadata = metadata

         if self._pipeline_kwargs and (
             "task" in self._pipeline_kwargs or "model" in self._pipeline_kwargs
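
The new parameter slots in after the existing validation: at least one of ``task`` or ``model_name`` must still be supplied. A sketch (assuming the class is ``HFTransformerPipelineDataset`` from ``kedro_datasets.huggingface``):

    from kedro_datasets.huggingface import HFTransformerPipelineDataset

    # Valid: `task` is given, metadata rides along.
    ds = HFTransformerPipelineDataset(
        task="sentiment-analysis",
        metadata={"team": "nlp"},
    )

    # Still invalid: metadata alone does not satisfy the task/model_name check.
    try:
        HFTransformerPipelineDataset(metadata={"team": "nlp"})
    except ValueError as err:
        print(err)  # At least 'task' or 'model_name' are needed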
4 changes: 4 additions & 0 deletions kedro-datasets/kedro_datasets/ibis/table_dataset.py
@@ -80,6 +80,7 @@ def __init__(  # noqa: PLR0913
         connection: dict[str, Any] | None = None,
         load_args: dict[str, Any] | None = None,
         save_args: dict[str, Any] | None = None,
+        metadata: dict[str, Any] | None = None,
     ) -> None:
         """Creates a new ``TableDataset`` pointing to a table (or file).
@@ -117,6 +118,8 @@ def __init__(  # noqa: PLR0913
             objects are materialized as views. To save a table using
             a different materialization strategy, supply a value for
             `materialized` in `save_args`.
+            metadata: Any arbitrary metadata. This is ignored by Kedro,
+                but may be consumed by users or external plugins.
         """
         if filepath is None and table_name is None:
             raise DatasetError(
@@ -127,6 +130,7 @@ def __init__(  # noqa: PLR0913
         self._file_format = file_format
         self._table_name = table_name
         self._connection_config = connection
+        self.metadata = metadata

         # Set load and save arguments, overwriting defaults if provided.
         self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
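
Usage sketch for ``TableDataset``; one of ``filepath`` or ``table_name`` is still required, as the ``DatasetError`` branch above shows. The DuckDB connection dict follows the ``backend``-plus-backend-kwargs convention, but treat the exact keys as an assumption:

    from kedro_datasets.ibis import TableDataset

    orders = TableDataset(
        table_name="orders",
        connection={"backend": "duckdb", "database": "demo.duckdb"},  # assumed keys
        metadata={"layer": "raw", "steward": "[email protected]"},
    )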
2 changes: 2 additions & 0 deletions kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py
@@ -68,6 +68,7 @@ def __init__(  # noqa: PLR0913
         version: Version | None = None,
         credentials: dict[str, Any] | None = None,
         fs_args: dict[str, Any] | None = None,
+        metadata: dict[str, Any] | None = None,
     ):
         """Creates a new instance of ``EagerPolarsDataset`` pointing to a concrete data file
         on a specific filesystem. The appropriate polars load/save methods are dynamically
@@ -124,6 +125,7 @@ def __init__(  # noqa: PLR0913

         self._protocol = protocol
         self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)
+        self.metadata = metadata

         super().__init__(
             filepath=PurePosixPath(path),
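
Usage sketch for ``EagerPolarsDataset`` (the filepath is illustrative; per the docstring above, ``file_format`` determines which polars load/save methods are dynamically selected):

    from kedro_datasets.polars import EagerPolarsDataset

    reviews = EagerPolarsDataset(
        filepath="data/01_raw/reviews.csv",   # illustrative local path
        file_format="csv",
        metadata={"source": "vendor-x", "refresh": "daily"},
    )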
@@ -42,13 +42,14 @@ class SparkStreamingDataset(AbstractDataset):
     DEFAULT_LOAD_ARGS = {}  # type: dict[str, Any]
     DEFAULT_SAVE_ARGS = {}  # type: dict[str, Any]

-    def __init__(
+    def __init__(  # noqa: PLR0913
         self,
         *,
         filepath: str = "",
         file_format: str = "",
         save_args: dict[str, Any] | None = None,
         load_args: dict[str, Any] | None = None,
+        metadata: dict[str, Any] | None = None,
     ) -> None:
         """Creates a new instance of SparkStreamingDataset.
@@ -73,10 +74,13 @@ def __init__(  # noqa: PLR0913
             respectively. You can find a list of options for each selected format in
             Spark DataFrame write documentation, see
             https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
+            metadata: Any arbitrary metadata.
+                This is ignored by Kedro, but may be consumed by users or external plugins.
         """
         self._file_format = file_format
         self._save_args = save_args
         self._load_args = load_args
+        self.metadata = metadata

         fs_prefix, filepath = _split_filepath(filepath)

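
Since every diff in this commit only stores ``metadata``, the consuming side lives outside Kedro, in user code or plugins. A rough plugin-side sketch (assumes kedro>=0.19 hook specs; ``catalog._get_dataset`` is private API, and the ``contains_pii`` key is invented for illustration, matching the ManagedTableDataset example above):

    from kedro.framework.hooks import hook_impl


    class MetadataAuditHook:
        """Hypothetical plugin hook that consumes the metadata Kedro ignores."""

        @hook_impl
        def after_catalog_created(self, catalog) -> None:
            for name in catalog.list():
                dataset = catalog._get_dataset(name)  # private API; fine for a sketch
                meta = getattr(dataset, "metadata", None) or {}
                if meta.get("contains_pii"):
                    print(f"Dataset {name!r} is flagged as containing PII")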
