
kedro-datasets: ibis.FileDataset w/ ibis.TableDataset in pipeline #935

Open
mark-druffel opened this issue Nov 19, 2024 · 11 comments

@mark-druffel

Description

I'm trying to update a pipeline to use the new ibis.FileDataset. My pipeline reads in CSV files but writes them to DuckDB for all data engineering operations. My current catalog is:

```yaml
seed_tracks:
  type: ibis.TableDataset
  filepath: data/01_raw/tracks.csv
  file_format: csv
  connection:
    backend: duckdb
    database: data/db/spotify.db

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection:
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection:
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True
```

[screenshot: pipeline run succeeds]

When I change the first catalog entry to FileDataset, it fails with the message "Catalog Error: Table with name ibis_read_csv_24taho52bbdw5nhlthjptakvyu does not exist!":

```yaml
seed_tracks:
  type: ibis.FileDataset
  filepath: data/01_raw/tracks.csv
  file_format: csv
  connection:
    backend: duckdb
    database: data/db/spotify.db

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection:
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection:
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True
```

[screenshot: pipeline run fails with the catalog error shown below]

The FileDataset entry loads fine in a kedro ipython session:
[screenshot: the seed_tracks FileDataset loading successfully in an ipython session]

Context

For now I can continue using TableDataset with no impact.

Steps to Reproduce

  1. Create a pipeline with a csv file
  2. Create catalog entry 1 - read the csv file into the pipeline using TableDataset
  3. Create catalog entry 2 - read entry 1 using TableDataset
  4. Run the pipeline and see that it succeeds
  5. Change entry 1 to FileDataset, run the pipeline, and see that it fails

Expected Result

FileDataset should be able to read a file, and that catalog entry should be usable as input to a node that saves to a TableDataset.

Actual Result

Full error message:

[11/19/24 00:24:05] INFO     Using 'conf/logging.yml' as logging configuration. You can change this by setting the KEDRO_LOGGING_CONFIG environment variable accordingly.  __init__.py:270
[11/19/24 00:24:06] INFO     Kedro project camper  session.py:329
                    WARNING  /usr/local/lib/python3.10/site-packages/kedro/io/core.py:190: KedroDeprecationWarning: Use 'FileDataset' to load and save files with an Ibis backend; the functionality will be removed from 'TableDataset' in Kedro-Datasets 6.0.0  warnings.py:109
                               dataset = class_obj(**config)
                    INFO     Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously  sequential_runner.py:67
                    INFO     Loading data from seed_tracks (FileDataset)...  data_catalog.py:389
                    INFO     Running node: bronze_tracks: seed([seed_tracks]) -> [bronze_tracks]  node.py:367
                    INFO     Saving data to bronze_tracks (TableDataset)...  data_catalog.py:431
                    WARNING  No nodes ran. Repeat the previous command to attempt a new run.  runner.py:216
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 270, in save
    save_func(self, data)
  File "/usr/local/lib/python3.10/site-packages/kedro_datasets/ibis/table_dataset.py", line 186, in save
    writer(self._table_name, data, **self._save_args)
  File "/usr/local/lib/python3.10/site-packages/ibis/backends/duckdb/__init__.py", line 200, in create_table
    cur.execute(insert_stmt).fetchall()
duckdb.duckdb.CatalogException: Catalog Error: Table with name ibis_read_csv_gwv376qnvnb6pouhrmz65n4q74 does not exist!
Did you mean "ibis_duckdb_table_z7yvo6nudfaxhpr6c7pq2dnrze"?
LINE 1: ...eyshaeze4vamv45u24tkkju" SELECT * FROM "ibis_read_csv_gwv376qnvnb6pouhrmz65n4q...
                                                  ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 263, in main
    cli_collection()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 163, in main
    super().main(
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/project.py", line 228, in run
    return session.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/session/session.py", line 404, in run
    run_result = runner.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 123, in run
    self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/sequential_runner.py", line 78, in _run
    run_node(node, catalog, hook_manager, self._is_async, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 419, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 532, in _run_node_sequential
    catalog.save(name, data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/data_catalog.py", line 438, in save
    dataset.save(data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 275, in save
    raise DatasetError(message) from exc
kedro.io.core.DatasetError: Failed while saving data to dataset TableDataset(backend=duckdb, load_args={}, materialized=table, save_args={'overwrite': True}, table_name=bronze_tracks).
Catalog Error: Table with name ibis_read_csv_gwv376qnvnb6pouhrmz65n4q74 does not exist!
Did you mean "ibis_duckdb_table_z7yvo6nudfaxhpr6c7pq2dnrze"?
LINE 1: ...eyshaeze4vamv45u24tkkju" SELECT * FROM "ibis_read_csv_gwv376qnvnb6pouhrmz65n4q...

Your Environment

I'm using kedro 0.19.9, kedro-datasets 5.1.0, and ibis 9.5.0.

@merelcht merelcht added the Community Issue/PR opened by the open-source community label Nov 19, 2024
@deepyaman
Member

I haven't looked into this at all, but my intuition is that it stems from #842 (comment); because the connections are different, temporary tables are not shared.

I can try to look into creating a shared cache, but no other dataset follows this pattern (they are pretty independent the way they're currently designed); not sure if any of you (@merelcht @astrojuanlu @idanov @noklam) have thoughts on this.
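For illustration, a minimal sketch outside Kedro (untested, using the paths from the catalog above) of why two separate connections break this: read_csv registers a session-local temporary view, which a second Backend object cannot see.

```python
import ibis

# Two separate Backend objects, even though both point at the same file.
con_file = ibis.duckdb.connect("data/db/spotify.db")   # FileDataset's connection
con_table = ibis.duckdb.connect("data/db/spotify.db")  # TableDataset's connection

# read_csv registers a temporary view (ibis_read_csv_...) in con_file's session.
tracks = con_file.read_csv("data/01_raw/tracks.csv")

# Executing the expression through the *other* connection fails, because the
# temporary view only exists in con_file's DuckDB session:
con_table.create_table("bronze_tracks", tracks, overwrite=True)
# duckdb.CatalogException: Table with name ibis_read_csv_... does not exist!
```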

@noklam
Contributor

noklam commented Nov 19, 2024

Makes sense to me. @deepyaman, maybe this can be solved in a similar way to pandas.SQLDataset, with a shared cls._connections? In that case both the file and table datasets would have to share a parent class.

@deepyaman
Member

> In that case both the file and table datasets would have to share a parent class.

I think all of the datasets we provide inherit directly from AbstractDataset (or AbstractVersionedDataset).

I'm going to implement this as a mixin so as not to change that. The other benefit of using a mixin is that it should be reusable (e.g. for both the Ibis datasets and the pandas SQL datasets), rather than defining a separate piece of the inheritance hierarchy for each.

(The other alternative could be to throw this into the base class, but I don't know if that's necessary.)
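A rough sketch of what such a mixin could look like (hypothetical names, not the actual implementation): a class-level cache keyed by the connection config, so any two datasets configured with the same backend/database resolve to the same Backend object.

```python
from typing import Any, ClassVar

import ibis
from ibis.backends import BaseBackend


class ConnectionMixin:
    """Sketch: share Backend objects between datasets with identical config."""

    # One cache for every dataset class that uses the mixin, keyed by the
    # (hashable) connection config.
    _connections: ClassVar[dict[tuple[tuple[str, Any], ...], BaseBackend]] = {}

    def __init__(self, connection: dict[str, Any]):
        self._connection_config = connection

    @property
    def _connection(self) -> BaseBackend:
        # Assumes the config values are hashable (plain strings here).
        key = tuple(sorted(self._connection_config.items()))
        if key not in ConnectionMixin._connections:
            config = dict(self._connection_config)
            backend = getattr(ibis, config.pop("backend"))
            ConnectionMixin._connections[key] = backend.connect(**config)
        return ConnectionMixin._connections[key]
```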

@mark-druffel
Author

> I haven't looked into this at all, but my intuition is that it stems from #842 (comment); because the connections are different, temporary tables are not shared.
>
> I can try to look into creating a shared cache, but no other dataset follows this pattern (they are pretty independent the way they're currently designed); not sure if any of you (@merelcht @astrojuanlu @idanov @noklam) have thoughts on this.

This was my initial thought, but I dismissed it because it failed similarly when I materialized the FileDataset as a table. Shouldn't that work if it's just the connection, or am I overlooking something?

```yaml
seed_tracks:
  type: ibis.FileDataset
  filepath: data/01_raw/tracks.csv
  file_format: csv
  table_name: seed_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True
```

Error:

[11/19/24 18:00:48] INFO     Using 'conf/logging.yml' as logging configuration. You can change this by setting the KEDRO_LOGGING_CONFIG environment variable accordingly.  __init__.py:270
[11/19/24 18:00:49] INFO     Kedro project camper  session.py:329
                    WARNING  /usr/local/lib/python3.10/site-packages/kedro/io/core.py:190: KedroDeprecationWarning: Use 'FileDataset' to load and save files with an Ibis backend; the functionality will be removed from 'TableDataset' in Kedro-Datasets 6.0.0  warnings.py:109
                               dataset = class_obj(**config)
                    INFO     Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously  sequential_runner.py:67
                    INFO     Loading data from seed_tracks (FileDataset)...  data_catalog.py:389
                    INFO     Running node: bronze_tracks: seed([seed_tracks]) -> [bronze_tracks]  node.py:367
                    INFO     Saving data to bronze_tracks (TableDataset)...  data_catalog.py:431
                    WARNING  No nodes ran. Repeat the previous command to attempt a new run.  runner.py:216
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 270, in save
    save_func(self, data)
  File "/usr/local/lib/python3.10/site-packages/kedro_datasets/ibis/table_dataset.py", line 186, in save
    writer(self._table_name, data, **self._save_args)
  File "/usr/local/lib/python3.10/site-packages/ibis/backends/duckdb/__init__.py", line 200, in create_table
    cur.execute(insert_stmt).fetchall()
duckdb.duckdb.CatalogException: Catalog Error: Table with name seed_tracks does not exist!
Did you mean "sqlite_schema"?
LINE 1: ...447tspvhyjmnxqnyxxevb4a" SELECT * FROM "seed_tracks"
                                                  ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 263, in main
    cli_collection()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 163, in main
    super().main(
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/project.py", line 228, in run
    return session.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/session/session.py", line 404, in run
    run_result = runner.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 123, in run
    self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/sequential_runner.py", line 78, in _run
    run_node(node, catalog, hook_manager, self._is_async, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 419, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 532, in _run_node_sequential
    catalog.save(name, data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/data_catalog.py", line 438, in save
    dataset.save(data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 275, in save
    raise DatasetError(message) from exc
kedro.io.core.DatasetError: Failed while saving data to dataset TableDataset(backend=duckdb, load_args={}, materialized=table, save_args={'overwrite': True}, table_name=bronze_tracks).
Catalog Error: Table with name seed_tracks does not exist!
Did you mean "sqlite_schema"?
LINE 1: ...447tspvhyjmnxqnyxxevb4a" SELECT * FROM "seed_tracks"

@deepyaman
Member

seed_tracks is still on the ibis.FileDataset-tied connection, so bronze_tracks's ibis.TableDataset-tied connection cannot write it.

This actually feels like an even simpler ask than ibis-project/ibis#8115 (I think it's a bit different, because DuckDB supports a way to load from other databases, and the ask is to expose it there).

@cpcloud @gforsyth do you know if I'm either:

  1. missing an easier way to get around this
  2. missing an issue/context where this has been discussed

I think the answer I recall from some months ago was that the output of a read_* call is meant to be persisted to a table first if you want to do such operations. https://kedro.org/blog/building-scalable-data-pipelines-with-kedro-and-ibis also follows the pattern of running a "seed" pipeline to load data into the database first, and then being able to do everything else post-initialization (see the sketch below).
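Roughly, the seed pattern from that post (a hypothetical sketch using the names from the catalog above; it works as long as both catalog entries resolve to the same connection, as in the all-TableDataset catalog this issue starts with):

```python
import ibis
from kedro.pipeline import node, pipeline


def seed(tracks: ibis.Table) -> ibis.Table:
    # Identity node: loading from the file-based entry and saving to the
    # table-based entry is what materializes the CSV in the database.
    return tracks


seed_pipeline = pipeline(
    [node(seed, inputs="seed_tracks", outputs="bronze_tracks", name="bronze_tracks")]
)
```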

@gforsyth

I'm missing some context, I think. If the FileDataset has materialized the table into a known file spotify.db, I would expect the following tasks to be able to read that table.
But also, if these nodes are each opening the same file, there will be contention for the lock.

I guess I'm not clear on what the ask is from the Ibis side?

@deepyaman
Member

> I'm missing some context, I think. If the FileDataset has materialized the table into a known file spotify.db, I would expect the following tasks to be able to read that table. But also, if these nodes are each opening the same file, there will be contention for the lock.
>
> I guess I'm not clear on what the ask is from the Ibis side?

You can think of FileDataset.load() as a wrapper around Backend.read_{file_format}() and FileDataset.save() as a wrapper around Backend.to_{file_format}(). Similarly, TableDataset.load() is a wrapper around Backend.table() and TableDataset.save() is a wrapper around Backend.create_{table_or_view}().

If FileDataset.load() refers to a different Backend object than TableDataset.save(), is there any way for this to work with Ibis? I'm guessing no; you need to make sure the Backend objects are identical. Ibis will not (or does not plan to?) automatically recognize that the expression comes from a different Backend object reference and do some intermediate .to_pyarrow() or something to facilitate accessing it from the second object?
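In simplified code, the wrapper relationships described above might look like this (a sketch only; the real kedro-datasets classes handle connection setup, versioning, and more options):

```python
import ibis


class FileDatasetSketch:
    def __init__(self, connection, filepath: str, file_format: str, **save_args):
        self.connection, self.filepath = connection, filepath
        self.file_format, self.save_args = file_format, save_args

    def load(self) -> ibis.Table:
        # e.g. Backend.read_csv(filepath) for file_format: csv
        reader = getattr(self.connection, f"read_{self.file_format}")
        return reader(self.filepath)

    def save(self, table: ibis.Table) -> None:
        # e.g. Backend.to_csv(table, filepath)
        writer = getattr(self.connection, f"to_{self.file_format}")
        writer(table, self.filepath, **self.save_args)


class TableDatasetSketch:
    def __init__(self, connection, table_name: str, materialized: str = "table", **save_args):
        self.connection, self.table_name = connection, table_name
        self.materialized, self.save_args = materialized, save_args

    def load(self) -> ibis.Table:
        return self.connection.table(self.table_name)

    def save(self, table: ibis.Table) -> None:
        # Backend.create_table(...) or Backend.create_view(...)
        writer = getattr(self.connection, f"create_{self.materialized}")
        writer(self.table_name, table, **self.save_args)
```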

@gforsyth

Ibis currently doesn't support things across multiple connections -- that may happen in the future, but there's no work happening on that front at the moment.

Either the Backend objects should be the same, or, if the FileDataset does a CTAS of some sort and TableDataset knows to look for the correct table name, that should work(?). This would require the two separate Backend connections to share a DuckDB session (which you can't do and also write data) or to point at the same underlying DuckDB on-disk file.
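A sketch of that second option (untested, and subject to the locking caveats above): if loading persisted the file to a durable table via CTAS instead of a temporary view, a second connection pointing at the same on-disk file could resolve it by name.

```python
import ibis

con = ibis.duckdb.connect("data/db/spotify.db")
raw = con.read_csv("data/01_raw/tracks.csv")  # temporary view, session-local
con.create_table("seed_tracks", raw, overwrite=True)  # CTAS: durable table on disk

# A second connection to the same DuckDB file can now see the table.
con2 = ibis.duckdb.connect("data/db/spotify.db")
tracks = con2.table("seed_tracks")
```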

@mark-druffel
Author

@deepyaman sorry this might be a stupid idea, but could we just do something similar to the SparkDataset?

It uses a get_spark() function that gets or creates the active session in the load method. Pipelines using SparkDataset use a hook to create the Spark session when the pipeline loads.
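For reference, the Spark pattern being described boils down to SparkSession being a process-wide singleton, so any dataset can "get or create" the same session:

```python
from pyspark.sql import SparkSession


def get_spark() -> SparkSession:
    # Returns the active SparkSession if one exists, otherwise creates it;
    # Ibis has no equivalent notion of a global "active connection".
    return SparkSession.builder.getOrCreate()
```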

@deepyaman
Member

> @deepyaman sorry this might be a stupid idea, but could we just do something similar to the SparkDataset?
>
> It uses a get_spark() function that gets or creates the active session in the load method. Pipelines using SparkDataset use a hook to create the Spark session when the pipeline loads.

Unfortunately, not really; getting the active session is a Spark thing.

@deepyaman
Member

> Ibis currently doesn't support things across multiple connections -- that may happen in the future, but there's no work happening on that front at the moment.

Thanks! That's all I wanted to check.

@deepyaman deepyaman self-assigned this Nov 21, 2024