kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

kedro-datasets: ibis.FileDataset w/ ibis.TableDataset in pipeline #935

Open mark-druffel opened 3 days ago

mark-druffel commented 3 days ago

Description

I'm trying to update a pipeline to use the new ibis.FileDataset. My pipeline reads in CSV files but writes them to DuckDB for all data engineering operations. My current catalog is:

seed_tracks:
  type: ibis.TableDataset
  filepath:  data/01_raw/tracks.csv
  file_format: csv
  connection: 
    backend: duckdb
    database: data/db/spotify.db

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection: 
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection: 
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True


When I change the first catalog entry to FileDataset, it fails with the message Catalog Error: Table with name ibis_read_csv_24taho52bbdw5nhlthjptakvyu does not exist!:

seed_tracks:
  type: ibis.FileDataset
  filepath:  data/01_raw/tracks.csv
  file_format: csv
  connection: 
    backend: duckdb
    database: data/db/spotify.db

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection: 
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection: 
    backend: duckdb
    database: data/db/spotify.db
  save_args:
    materialized: table
    overwrite: True


The FileDataset entry loads fine in a kedro ipython session.

Context

For now I can continue using TableDataset with no impact.

Steps to Reproduce

  1. Create a pipeline with a CSV file
  2. Create catalog entry 1, which reads the CSV file using TableDataset
  3. Create catalog entry 2, which reads entry 1 using TableDataset
  4. Run the pipeline and see that it succeeds
  5. Change entry 1 to FileDataset, run the pipeline, and see that it fails

Expected Result

FileDataset should be able to read in a file, and that catalog entry should be usable as input to a node that outputs a TableDataset.

Actual Result

Full error message:

[11/19/24 00:24:05] INFO     Using 'conf/logging.yml' as logging configuration. You can change this by setting the KEDRO_LOGGING_CONFIG environment variable accordingly.  __init__.py:270
[11/19/24 00:24:06] INFO     Kedro project camper  session.py:329
                    WARNING  /usr/local/lib/python3.10/site-packages/kedro/io/core.py:190: KedroDeprecationWarning: Use 'FileDataset' to load and save files with an Ibis backend; the functionality will be removed from 'TableDataset' in Kedro-Datasets 6.0.0  warnings.py:109
                               dataset = class_obj(**config)
                    INFO     Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously  sequential_runner.py:67
                    INFO     Loading data from seed_tracks (FileDataset)...  data_catalog.py:389
                    INFO     Running node: bronze_tracks: seed([seed_tracks]) -> [bronze_tracks]  node.py:367
                    INFO     Saving data to bronze_tracks (TableDataset)...  data_catalog.py:431
                    WARNING  No nodes ran. Repeat the previous command to attempt a new run.  runner.py:216
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 270, in save
    save_func(self, data)
  File "/usr/local/lib/python3.10/site-packages/kedro_datasets/ibis/table_dataset.py", line 186, in save
    writer(self._table_name, data, **self._save_args)
  File "/usr/local/lib/python3.10/site-packages/ibis/backends/duckdb/__init__.py", line 200, in create_table
    cur.execute(insert_stmt).fetchall()
duckdb.duckdb.CatalogException: Catalog Error: Table with name ibis_read_csv_gwv376qnvnb6pouhrmz65n4q74 does not exist!
Did you mean "ibis_duckdb_table_z7yvo6nudfaxhpr6c7pq2dnrze"?
LINE 1: ...eyshaeze4vamv45u24tkkju" SELECT * FROM "ibis_read_csv_gwv376qnvnb6pouhrmz65n4q...
                                                  ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 263, in main
    cli_collection()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 163, in main
    super().main(
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/project.py", line 228, in run
    return session.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/session/session.py", line 404, in run
    run_result = runner.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 123, in run
    self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/sequential_runner.py", line 78, in _run
    run_node(node, catalog, hook_manager, self._is_async, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 419, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 532, in _run_node_sequential
    catalog.save(name, data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/data_catalog.py", line 438, in save
    dataset.save(data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 275, in save
    raise DatasetError(message) from exc
kedro.io.core.DatasetError: Failed while saving data to dataset TableDataset(backend=duckdb, load_args={}, materialized=table, save_args={'overwrite': True}, table_name=bronze_tracks).
Catalog Error: Table with name ibis_read_csv_gwv376qnvnb6pouhrmz65n4q74 does not exist!
Did you mean "ibis_duckdb_table_z7yvo6nudfaxhpr6c7pq2dnrze"?
LINE 1: ...eyshaeze4vamv45u24tkkju" SELECT * FROM "ibis_read_csv_gwv376qnvnb6pouhrmz65n4q...

Your Environment

I'm using kedro 0.19.9, kedro-datasets 5.1.0, and ibis 9.5.0.

deepyaman commented 3 days ago

I haven't looked into this at all, but my intuition is that it stems from https://github.com/kedro-org/kedro-plugins/pull/842#issuecomment-2394011470; because the connections are different, temporary tables are not shared.

I can try to look into creating a shared cache, but no other dataset follows this pattern (they are pretty independent the way they're currently designed); not sure whether any of you (@merelcht, @astrojuanlu, @idanov, @noklam) have thoughts on this.

noklam commented 3 days ago

Makes sense to me. @deepyaman Maybe this can be solved in a similar way to pandas.SQLDataset, with a shared cls._connections? In that case both the file and table datasets would have to share a parent class.
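
For reference, the pandas SQL datasets keep a class-level cache keyed by connection string, roughly like this (a sketch of the idea, not the actual kedro-datasets code):

from typing import Any, ClassVar

from sqlalchemy import create_engine


class SQLTableDatasetSketch:
    # Shared by all instances of the class, keyed by connection string, so
    # every dataset with the same credentials reuses one engine.
    engines: ClassVar[dict[str, Any]] = {}

    def __init__(self, credentials: dict[str, Any]):
        self._connection_str = credentials["con"]

    @property
    def engine(self) -> Any:
        cache = type(self).engines
        if self._connection_str not in cache:
            cache[self._connection_str] = create_engine(self._connection_str)
        return cache[self._connection_str]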

deepyaman commented 2 days ago

In that case both the file and table datasets would have to share a parent class.

I think all of the datasets we provide inherit directly from AbstractDataset (or AbstractVersionedDataset).

I'm going to implement this as a mixin so as not to change that. The other benefit of using a mixin is that it should be reusable (e.g. across the Ibis datasets and the pandas SQL datasets), rather than defining a separate piece of the inheritance hierarchy for each.

(The other alternative could be to throw this into the base class, but I don't know if that's necessary.)
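
A minimal sketch of what such a mixin might look like (names and details are illustrative, not a committed design):

from typing import Any, ClassVar


class ConnectionMixin:
    # One process-wide cache shared by every dataset class that mixes this
    # in, keyed by (flat) connection config.
    _CONNECTIONS: ClassVar[dict[tuple, Any]] = {}

    _connection_config: dict[str, Any]  # set by the dataset's __init__

    def _connect(self) -> Any:
        # Each dataset family (Ibis, pandas SQL, ...) implements this to
        # actually open a connection from self._connection_config.
        raise NotImplementedError

    @property
    def _connection(self) -> Any:
        # Identical catalog entries hash to the same key and therefore
        # share one live connection object.
        key = tuple(sorted(self._connection_config.items()))
        if key not in ConnectionMixin._CONNECTIONS:
            ConnectionMixin._CONNECTIONS[key] = self._connect()
        return ConnectionMixin._CONNECTIONS[key]

With something like this, ibis.FileDataset and ibis.TableDataset entries with identical connection blocks would resolve to the same live Backend object, which is exactly what this issue needs.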

mark-druffel commented 2 days ago

I haven't looked into this at all, but my intuition is that it stems from #842 (comment); because the connections are different, temporary tables are not shared.

I can try to look into creating a shared cache, but no other dataset follows this pattern (they are pretty independent the way they're currently designed); not sure whether any of you (@merelcht, @astrojuanlu, @idanov, @noklam) have thoughts on this.

This was my initial thought, but I dismissed it because it failed similarly when I materialized the FileDataset as a table. Shouldn't that work if it's just the connection, or am I overlooking something?

seed_tracks:
  type: ibis.FileDataset
  filepath:  data/01_raw/tracks.csv
  file_format: csv
  table_name: seed_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True

bronze_tracks:
  type: ibis.TableDataset
  table_name: bronze_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True

silver_tracks:
  type: ibis.TableDataset
  table_name: silver_tracks
  connection: ${globals:spotify}
  save_args:
    materialized: table
    overwrite: True

Error:

[11/19/24 18:00:48] INFO     Using 'conf/logging.yml' as logging configuration. You can change this by setting the KEDRO_LOGGING_CONFIG environment variable accordingly.  __init__.py:270
[11/19/24 18:00:49] INFO     Kedro project camper  session.py:329
                    WARNING  /usr/local/lib/python3.10/site-packages/kedro/io/core.py:190: KedroDeprecationWarning: Use 'FileDataset' to load and save files with an Ibis backend; the functionality will be removed from 'TableDataset' in Kedro-Datasets 6.0.0  warnings.py:109
                               dataset = class_obj(**config)
                    INFO     Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously  sequential_runner.py:67
                    INFO     Loading data from seed_tracks (FileDataset)...  data_catalog.py:389
                    INFO     Running node: bronze_tracks: seed([seed_tracks]) -> [bronze_tracks]  node.py:367
                    INFO     Saving data to bronze_tracks (TableDataset)...  data_catalog.py:431
                    WARNING  No nodes ran. Repeat the previous command to attempt a new run.  runner.py:216
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 270, in save
    save_func(self, data)
  File "/usr/local/lib/python3.10/site-packages/kedro_datasets/ibis/table_dataset.py", line 186, in save
    writer(self._table_name, data, **self._save_args)
  File "/usr/local/lib/python3.10/site-packages/ibis/backends/duckdb/__init__.py", line 200, in create_table
    cur.execute(insert_stmt).fetchall()
duckdb.duckdb.CatalogException: Catalog Error: Table with name seed_tracks does not exist!
Did you mean "sqlite_schema"?
LINE 1: ...447tspvhyjmnxqnyxxevb4a" SELECT * FROM "seed_tracks"
                                                  ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 263, in main
    cli_collection()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/cli.py", line 163, in main
    super().main(
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/cli/project.py", line 228, in run
    return session.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/framework/session/session.py", line 404, in run
    run_result = runner.run(
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 123, in run
    self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/sequential_runner.py", line 78, in _run
    run_node(node, catalog, hook_manager, self._is_async, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 419, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/usr/local/lib/python3.10/site-packages/kedro/runner/runner.py", line 532, in _run_node_sequential
    catalog.save(name, data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/data_catalog.py", line 438, in save
    dataset.save(data)
  File "/usr/local/lib/python3.10/site-packages/kedro/io/core.py", line 275, in save
    raise DatasetError(message) from exc
kedro.io.core.DatasetError: Failed while saving data to dataset TableDataset(backend=duckdb, load_args={}, materialized=table, save_args={'overwrite': True}, table_name=bronze_tracks).
Catalog Error: Table with name seed_tracks does not exist!
Did you mean "sqlite_schema"?
LINE 1: ...447tspvhyjmnxqnyxxevb4a" SELECT * FROM "seed_tracks"

deepyaman commented 2 days ago

seed_tracks is still on the ibis.FileDataset-tied connection, so bronze_tracks's ibis.TableDataset-tied connection cannot write it.

This actually feels like an even simpler ask than https://github.com/ibis-project/ibis/issues/8115 (I think it's a bit different, because DuckDB supports a way to load from other databases, and the ask is to expose it there).

@cpcloud @gforsyth do you know if I'm either:

  1. missing an easier way to get around this
  2. missing an issue/context where this has been discussed

I think the answer I recall from some months ago was that the output of a read_* call is meant to be persisted to a table first if you want to do such operations. https://kedro.org/blog/building-scalable-data-pipelines-with-kedro-and-ibis also follows the pattern of running a "seed" pipeline to load data into the database first, and then being able to do everything else post-initialization.
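
Concretely, that seed pattern in plain Ibis looks something like this (a sketch; paths mirror the catalog in this issue):

import ibis

con = ibis.duckdb.connect("data/db/spotify.db")

# read_csv only registers a lazy view bound to this connection...
raw = con.read_csv("data/01_raw/tracks.csv")

# ...so persist it as a real table before anything else needs it.
con.create_table("seed_tracks", raw, overwrite=True)
con.disconnect()

# A fresh connection to the same .db file now sees a real table.
con2 = ibis.duckdb.connect("data/db/spotify.db")
tracks = con2.table("seed_tracks")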

gforsyth commented 2 days ago

I'm missing some context, I think. If the FileDataset has materialized the table into a known file spotify.db, I would expect the following tasks to be able to read that table. But also, if these nodes are each opening the same file, there will be contention for the lock.

I guess I'm not clear on what the ask is from the Ibis side?

deepyaman commented 2 days ago

I'm missing some context, I think. If the FileDataset has materialized the table into a known file spotify.db, I would expect the following tasks to be able to read that table. But also, if these nodes are each opening the same file, there will be contention for the lock.

I guess I'm not clear on what the ask is from the Ibis side?

You can think of FileDataset.load() as a wrapper around Backend.read_{file_format}() and FileDataset.save() as a wrapper around Backend.to_{file_format}(). Similarly, TableDataset.load() is a wrapper around Backend.table() and TableDataset.save() is a wrapper around Backend.create_{table_or_view}().

If FileDataset.load() refers to a different Backend object than TableDataset.save(), is there any way for this to work with Ibis? I'm guessing no; you need to make sure the Backend objects are identical. Ibis will not (or does not plan to?) automatically recognize that the expression comes from a different Backend object reference and do some intermediate .to_pyarrow() or something to facilitate accessing it from the second object?
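
For concreteness, a stripped-down sketch of what the two datasets end up doing (illustrative, mirroring the catalog above):

import ibis

load_con = ibis.duckdb.connect("data/db/spotify.db")  # FileDataset's Backend
save_con = ibis.duckdb.connect("data/db/spotify.db")  # TableDataset's Backend

# read_csv registers a temporary view whose generated name
# (ibis_read_csv_...) exists only in load_con's session.
expr = load_con.read_csv("data/01_raw/tracks.csv")

# create_table compiles expr to SQL referencing that view by name, which
# save_con's session has never seen -> CatalogException, as in the logs above.
save_con.create_table("bronze_tracks", expr, overwrite=True)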

gforsyth commented 1 day ago

Ibis currently doesn't support things across multiple connections -- that may happen in the future, but there's no work happening on that front at the moment.

Either the Backend objects should be the same, or, if the FileDataset does a CTAS of some sort and TableDataset knows to look for the correct table name, that should work(?). This would require the two separate Backend connections to share a DuckDB session (which you can't do and also write data) or to point at the same underlying DuckDB on-disk file.

mark-druffel commented 18 hours ago

@deepyaman sorry this might be a stupid idea, but could we just do something similar to the SparkDataset?

It uses a get_spark() function that gets or creates the active session in the load method. Pipelines using SparkDataset use a hook to create the Spark session when the pipeline loads.
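
For reference, the Spark pattern being referred to boils down to something like this (rough sketch):

from pyspark.sql import SparkSession


def get_spark() -> SparkSession:
    # getOrCreate() returns the process's already-active session if one
    # exists and only builds a new one otherwise, so every dataset calling
    # this shares the same session for free.
    return SparkSession.builder.getOrCreate()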

deepyaman commented 18 hours ago

@deepyaman sorry this might be a stupid idea, but could we just do something similar to the SparkDataset?

It uses a get_spark() function that gets or creates the active session in the load method. Pipelines using SparkDataset use a hook to create the Spark session when the pipeline loads.

Unfortunately, not really; getting the active session is a Spark thing.

deepyaman commented 18 hours ago

Ibis currently doesn't support things across multiple connections -- that may happen in the future, but there's no work happening on that front at the moment.

Thanks! That's all I wanted to check.