Unable to save and load from versioned ManagedTableDatasets #920

Open

jstammers opened this issue Oct 30, 2024 · 6 comments

Labels: bug (Something isn't working), datasets

Comments

jstammers commented Oct 30, 2024

Description

I am trying to make use of a versioned ManagedTableDataset so that I can correctly load and save different versions of a Delta table. I'm encountering an error when trying to load from a catalog, because the version is incorrectly configured for a Delta table.


Steps to Reproduce

from kedro.io import DataCatalog

conf = {
    "ds": {
        "type": "kedro_datasets.databricks.ManagedTableDataset",
        "versioned": True,
        "database": "dev",
        "table": "table",
        "write_mode": "overwrite",
    },
    "ds_2": {
        "type": "kedro_datasets.databricks.ManagedTableDataset",
        "versioned": True,
        "database": "dev",
        "table": "table",
        "write_mode": "overwrite",
    },
}
catalog = DataCatalog.from_config(conf)
catalog.load("ds")

Expected Result

When creating these datasets, I expect the load version to be resolved from the current table version, from a specified version number, or to default to 0 if the table doesn't exist.

When calling ManagedTableDataset.save, the load and save version numbers should be incremented accordingly:

ds = catalog.datasets.ds
ds_2 = catalog.datasets.ds_2

# check initial versions
assert ds.resolve_load_version() == 0
assert ds.resolve_save_version() == 1
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 1

ds.save(spark.createDataFrame(...))

# check ds has updated versions
assert ds.resolve_load_version() == 1
assert ds.resolve_save_version() == 2

# check load version of ds_2 is fixed and save version is updated
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 2

Actual Result

DatasetError: Failed while loading data from dataset ManagedTableDataset(database=dev, dataframe_type=spark, table=table, version=Version(load=None, save='2024-10-30T10.17.55.613Z'), write_mode=overwrite).
'>=' not supported between instances of 'NoneType' and 'int'

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-74f91739-2f89-4042-8eb7-77d385bb6dde/lib/python3.10/site-packages/kedro_datasets/databricks/_base_table_dataset.py:359, in BaseTableDataset._load(self)
    348 """Loads the version of data in the format defined in the init
    349 (spark|pandas dataframe).
    350 
   (...)
    357         in the format defined in the init.
    358 """
--> 359 if self._version and self._version.load >= 0:
    360     try:

TypeError: '>=' not supported between instances of 'NoneType' and 'int'

Your Environment

Include as many relevant details about the environment in which you experienced the bug:

  • Kedro version used (pip show kedro or kedro -V): 0.19.9
  • Kedro plugin and plugin version used (pip show kedro-datasets): kedro-datasets 5.1.0
  • Python version used (python -V): 3.10.12
  • Operating system and version: Linux 5.15.0-1073-azure #82~20.04.1-Ubuntu SMP Tue Sep 3 12:27:43 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
lrcouto added the bug and datasets labels on Oct 30, 2024
lrcouto (Contributor) commented Oct 30, 2024

Hey @jstammers, thanks for raising this issue! The _load method used by ManagedTableDataset (inherited from BaseTableDataset) is comparing None with 0, causing the invalid type comparison you're seeing. Until this is addressed, you can try explicitly setting the initial version of your datasets to prevent it from defaulting to None.
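
For illustration, a minimal sketch of that workaround, instantiating the dataset directly with an explicit kedro.io.Version. Whether the constructor accepts this keyword, and whether an integer Delta version is valid for load, are assumptions based on the version=Version(...) shown in the dataset repr and the self._version.load >= 0 check in the traceback above:

from kedro.io import Version
from kedro_datasets.databricks import ManagedTableDataset

# Hypothetical workaround: pin the load version explicitly instead of
# letting it default to None. The integer 0 is an assumption about what
# the Delta-backed dataset expects.
ds = ManagedTableDataset(
    database="dev",
    table="table",
    write_mode="overwrite",
    version=Version(load=0, save=None),
)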

noklam (Contributor) commented Oct 30, 2024

@MinuraPunchihewa Do you have an idea?

I am actually confused that ManagedTableDataset is an AbstractVersionedDataset, which has resolve_load_version() etc. That is designed for file-based data, not for tables. Table versioning is possible with Delta, but that is a separate versioning scheme that comes with Delta and is not compatible with Kedro's native one (a specially formatted timestamp).

We can probably implement this for a specific dataset, but before that, can I understand the use case a bit more? Do you plan to use this as in the example you stated, or are you just checking whether it's versioned?
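
For context, Delta's own scheme is queried with integer versions (or timestamps); a minimal time-travel sketch, with the table name purely illustrative:

# Delta time travel uses integer versions, a different scheme from
# Kedro's timestamp-string versioning.
previous = spark.sql("SELECT * FROM dev.table VERSION AS OF 0")
# Equivalent DataFrame reader form:
previous = spark.read.option("versionAsOf", 0).table("dev.table")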

MinuraPunchihewa (Contributor) commented
> @MinuraPunchihewa Do you have an idea?
>
> I am actually confused that ManagedTableDataset is an AbstractVersionedDataset, which has resolve_load_version() etc. That is designed for file-based data, not for tables. Table versioning is possible with Delta, but that is a separate versioning scheme that comes with Delta and is not compatible with Kedro's native one (a specially formatted timestamp).
>
> We can probably implement this for a specific dataset, but before that, can I understand the use case a bit more? Do you plan to use this as in the example you stated, or are you just checking whether it's versioned?

@noklam I am happy to take a look at this.

About your comment on AbstractVersionedDataset being used for file-based data: is that how it is meant to be used? I was under the impression that it can be used for any dataset (or data source, rather) that implements versioning. I would love an explanation of how exactly it is designed to be used.

jstammers (Author) commented
@noklam Yes, the use case I have in mind is being able to load the previous version of a Delta table, so that I can perform some validation of the changes to the table after updating it.

As a pipeline, it would look something like

from kedro.pipeline import Pipeline, node

pipeline = Pipeline([
    node(update_table, inputs=["table", "staging_table"], outputs="updated_table"),
    node(validate_changes, inputs=["table", "updated_table"], outputs="changes"),
])

where "table" and "updated_table" reference the same underlying delta table. When calling validate_changes, I expect "updated_table" to load from version n and "table" to load from version n-1

As for inferring the version number, I think the simplest way to do that is with the following Spark SQL statement:

current_version = spark.sql("DESCRIBE HISTORY <catalog>.<database>.<table>").select("version").first()[0]
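
A minimal sketch of how that statement could back a resolver, assuming DESCRIBE HISTORY returns rows newest-first and that a missing table should resolve to 0 as described above; the helper name is hypothetical:

from pyspark.sql.utils import AnalysisException

def resolve_delta_version(spark, catalog: str, database: str, table: str) -> int:
    """Hypothetical helper: current Delta version of a table, or 0 if the
    table does not exist yet."""
    try:
        # DESCRIBE HISTORY lists the newest entry first, so the first row
        # carries the latest version number.
        history = spark.sql(f"DESCRIBE HISTORY {catalog}.{database}.{table}")
        return history.select("version").first()[0]
    except AnalysisException:
        return 0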

noklam (Contributor) commented Oct 31, 2024

I'll be looking at a PoC to play with Iceberg and versioning, and may come back to this in a little while.

@jstammers The other option is to do this validation with a hook instead of a node (nothing wrong with the current approach either); see the sketch below. How does the node generate the delta of changes? I see that the node has two inputs and emits the "changes" as output. Is this some kind of incremental pipeline?
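
For reference, a minimal sketch of that hook-based alternative. The class name is hypothetical, the hook would need registering via HOOKS in settings.py, and validate_changes is assumed here to be a single-argument variant of the function from the pipeline example above:

from kedro.framework.hooks import hook_impl

class ValidateTableHooks:
    """Hypothetical hook: validate the table right after it is saved,
    instead of dedicating a pipeline node to it."""

    @hook_impl
    def after_dataset_saved(self, dataset_name: str, data, node):
        if dataset_name == "updated_table":
            validate_changes(data)  # user-defined validation, assumed single-argument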

noklam (Contributor) commented Oct 31, 2024

> About your comment on AbstractVersionedDataset being used for file-based data: is that how it is meant to be used? I was under the impression that it can be used for any dataset (or data source, rather) that implements versioning.

From my understanding, it was designed for file-based data. Version is a class that takes a load version and a save version.
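
For concreteness, a small illustration of that class; the values are the ones from the traceback above:

from kedro.io import Version

# Version is a (load, save) pair; for file-based datasets both are
# timestamp strings, which is why an integer Delta version does not fit
# naturally.
v = Version(load=None, save="2024-10-30T10.17.55.613Z")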

There are a couple of requirements here:

  1. Version numbers need to be monotonic, as Kedro requires this to ensure the pipeline runs with the correct data; i.e. resolve_load_version should pick the latest file, and right now it assumes the latest timestamp.
  2. Optionally, this load version can be specified per dataset with a --load-versions argument to kedro run (see the example after this list).
    • This is a less important requirement, because in the worst case we can choose not to support arbitrary versions via the CLI for this specific dataset.
  3. save_version comes from the session_id, which is the timestamp Kedro generates.
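
For example, pinning a load version for one dataset on the CLI (the dataset name and timestamp value are illustrative):

kedro run --load-versions=my_data:2024-10-30T10.17.55.613Z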

Take this example:

my_data:
  type: some.FileDataset
  path: my_folder/abc.file
  versioned: true

This is expected to save files as:

my_folder/abc.file/timestamp1/abc.file
my_folder/abc.file/timestamp2/abc.file
my_folder/abc.file/timestamp3/abc.file

Note that abc.file is used both as a folder name (slightly awkward) and as the file name. So the assumption here is that a file needs to have a path (not necessarily true for a database table), and most table formats have their own versioning scheme that was designed with a much stronger feature set (ACID for Delta/Iceberg).

Cc @merelcht
