Closed deepyaman closed 4 years ago
Hi @deepyaman, thank you for sharing your hooks! Regarding the contribution, Kedro only provides the hooks spec and doesn't ship any concrete hook implementations. Hook implementations are something users write in their own projects, and we would encourage people to publish their hooks outside of Kedro.
We are more than happy to list your hook plugins in our docs (see the examples for kedro-wing and steel-toe).
Hi @deepyaman, have you tried `CachedDataSet`?
`CachedDataSet` is a dataset wrapper that caches the saved data in memory, so that the user avoids I/O operations with slow storage media.
https://kedro.readthedocs.io/en/stable/kedro.io.CachedDataSet.html
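As a rough illustration of the wrapper idea described above, here is a minimal sketch that does not use Kedro at all. `SlowStore` and `CachingWrapper` are hypothetical names invented for this example, not Kedro classes; the sketch only shows that a cached copy removes repeat reads while the save itself still goes through to the underlying storage.

```python
from typing import Any


class SlowStore:
    """Hypothetical stand-in for a dataset backed by slow storage."""

    def __init__(self) -> None:
        self._data: Any = None
        self.load_calls = 0

    def save(self, data: Any) -> None:
        self._data = data

    def load(self) -> Any:
        self.load_calls += 1  # count trips to the slow backend
        return self._data


class CachingWrapper:
    """Toy wrapper (not Kedro's class): keeps the saved data in memory."""

    _EMPTY = object()  # sentinel so that None can be cached too

    def __init__(self, dataset: SlowStore) -> None:
        self._dataset = dataset
        self._cache: Any = self._EMPTY

    def save(self, data: Any) -> None:
        self._dataset.save(data)  # the write still hits the wrapped dataset
        self._cache = data

    def load(self) -> Any:
        if self._cache is self._EMPTY:
            self._cache = self._dataset.load()
        return self._cache

    def release(self) -> None:
        self._cache = self._EMPTY  # drop the in-memory copy


store = SlowStore()
cached = CachingWrapper(store)
cached.save([1, 2, 3])
first = cached.load()   # served from memory
second = cached.load()  # served from memory again
```

In this sketch only reads are sped up; `save` still waits for the wrapped dataset to finish writing.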
Hi @Minyus!
I think `CachedDataSet` is loosely comparable:
- `CachedDataSet` saves time on read, whereas `TeePlugin` (my implementation) saves on read and write.
- `CachedDataSet` offers finer control as to what datasets to cache, but is therefore also clunky to apply to all intermediate datasets (you could write a hook to modify the catalog, but then you're headed in the direction of `TeePlugin`).
- `CachedDataSet` cache is managed/released by the user, whereas `TeePlugin` converts the main line to `MemoryDataSet`s and thereby gets managed by the framework.
- `SharedMemoryDataSet` for `ParallelRunner`.

I'd say `CachedDataSet` is a sniper rifle (you find your slow target and deal with it by adding caching) whereas `TeePlugin` is a shotgun (you take care of I/O slowdowns out of the box in one shot, but you have to be careful you don't hit something you didn't mean to), but I also don't know enough about guns to be making these analogies.
related to #346
Also:
> `CachedDataSet` cache is managed/released by the user

If I remember correctly, I was letting the `runner` manage releasing the cache.
@deepyaman
Interesting. If you want to skip writing in addition to reading, don't you want to skip running the nodes that have already written the files and automatically resume execution from any stage of the pipeline where all inputs were persisted?
If so, you might be interested in the `run_only_missing` option discussed in:
> Also:
> `CachedDataSet` cache is managed/released by the user
> If I remember correctly, I was letting the `runner` manage releasing the cache.

You're right, sorry. I didn't read the code properly.
Edit: @tsanikgr @Minyus @921kiyo I feel like you all have more familiarity with `CachedDataSet` than I do, so please correct me wherever I'm wrong. I feel like this functionality is useful, but I don't want to create a third-party plugin if there's a simple way to achieve the same behavior using existing functionality that I'm just not aware of. :)
> If you want to skip writing in addition to reading, don't you want to skip running the nodes that have already written the files and automatically resume execution from any stage of the pipeline where all inputs were persisted?

I want to write, but I don't want to be blocked on write when I already have the data in memory. More specifically, I want to write so that I have my versioned dataset associated with this run. I don't care when it gets written out, since I have an in-memory copy that I can pass along.
(This does assume writing doesn't hit a snag, as it's possible your pipeline will run ahead based on in-memory data while your write fails for whatever reason, so future datasets also don't get written out unless some handling is added. I'm honestly not too worried about this myself.)
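To make the "write, but don't block on write" idea concrete, here is a minimal sketch using only the standard library. It is not the hookshot implementation; `TeeCatalog`, `slow_save`, and `SAVE_DELAY` are hypothetical names made up for this example. Each save puts the data in memory immediately and hands the slow write to a background thread, and failed writes only surface when the pending futures are collected at the end.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List

SAVE_DELAY = 0.2  # seconds; stands in for a slow write (assumption)


def slow_save(storage: Dict[str, Any], name: str, data: Any) -> None:
    """Pretend to write to slow storage."""
    time.sleep(SAVE_DELAY)
    storage[name] = data


class TeeCatalog:
    """Toy catalog: data goes to memory now and to storage in the background."""

    def __init__(self) -> None:
        self.memory: Dict[str, Any] = {}
        self.storage: Dict[str, Any] = {}
        self._executor = ThreadPoolExecutor()
        self._pending: List[Future] = []

    def save(self, name: str, data: Any) -> None:
        self.memory[name] = data  # available to downstream steps right away
        future = self._executor.submit(slow_save, self.storage, name, data)
        self._pending.append(future)

    def load(self, name: str) -> Any:
        return self.memory[name]  # never touches slow storage

    def wait(self) -> None:
        for future in self._pending:
            future.result()  # re-raises any exception from a failed write
        self._executor.shutdown()


catalog = TeeCatalog()
start = time.perf_counter()
catalog.save("a", 1)
catalog.save("b", catalog.load("a") + 1)
catalog.save("c", catalog.load("b") + 1)
compute_elapsed = time.perf_counter() - start  # the steps never waited on a write
catalog.wait()
total_elapsed = time.perf_counter() - start  # roughly one SAVE_DELAY overall
```

Note that the caveat above still applies to this sketch: later steps run ahead on in-memory data, and a write error is only seen once `wait()` is called.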
@deepyaman
I see. If I understand correctly, you might want to try a combination of `CachedDataSet` and asynchronous saving (a new feature of Kedro 0.16.0) explained at:
If you want to apply `CachedDataSet` to all the datasets rather than one by one, you can try @tsanikgr's PR at https://github.com/quantumblacklabs/kedro/pull/346
> I see. If I understand correctly, you might want to try combination of `CachedDataSet` and asynchronous saving (new feature of Kedro 0.16.0) explained at:

Yes, to some extent. My implementation (https://github.com/deepyaman/hookshot/blob/develop/src/hookshot/hooks.py) is based on the code that handles the async functionality, but extended across the pipeline rather than on a per-node basis (hence the "unrolled" `ThreadPoolExecutor` instead of a nice little context manager).
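The difference between a per-node context manager and an "unrolled" executor can be sketched with the standard library alone. This is an illustration under assumptions, not the Kedro or hookshot code: `slow_save`, `per_node`, `across_pipeline`, `DELAY`, and `NODES` are hypothetical names. Leaving a `with ThreadPoolExecutor()` block waits for its submitted work, so each node pays for its own save; a single long-lived executor lets the saves overlap and is only waited on once.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

DELAY = 0.1  # seconds per save (assumption for the sketch)
NODES = 3    # number of nodes that each save one output


def slow_save(node_index: int) -> int:
    time.sleep(DELAY)
    return node_index


def per_node() -> float:
    """One executor per node: exiting the with-block waits for that node's save."""
    start = time.perf_counter()
    for i in range(NODES):
        with ThreadPoolExecutor() as pool:
            pool.submit(slow_save, i)
        # the next node cannot start until the save above has finished
    return time.perf_counter() - start


def across_pipeline() -> float:
    """One executor for the whole run: saves overlap, wait once at the end."""
    start = time.perf_counter()
    pool = ThreadPoolExecutor()
    futures = []
    for i in range(NODES):
        futures.append(pool.submit(slow_save, i))  # the node moves on immediately
    wait(futures)
    pool.shutdown()
    return time.perf_counter() - start


per_node_time = per_node()
pipeline_time = across_pipeline()
```

With these settings the per-node variant takes about `NODES * DELAY`, while the pipeline-wide variant takes about one `DELAY`.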
The feeling I'm getting is that existing methods go in the direction of what I want, but they don't push it far enough. I will try to find some time to benchmark these different approaches under parametrizable conditions (`READ_LATENCY`, `WRITE_LATENCY`, `READ_TIME`, `WRITE_TIME`, etc.), in addition to creating this as a plugin.
Great, I look forward to your benchmark results.
Sorry for the delay! I've put together something in my spare time, not feature complete but figured I'd share.
Let's assume a slow filesystem with a load and save delay of 10 seconds for intermediate datasets. I haven't added delays in nodes (to simulate nontrivial data processing) yet; an example of where this makes a better case for `TeePlugin` is that the last node would be executing while we wait 10 seconds at the end of the run for everything to save.
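One simple way to simulate such a filesystem is an in-memory dataset that sleeps before each load and save. The sketch below is only an assumption about how that could look; `DelayedDataSet` and its `load_delay`/`save_delay` parameters are hypothetical names for this example and are not taken from the hookshot repository.

```python
import time
from typing import Any


class DelayedDataSet:
    """Toy in-memory dataset with configurable load and save latency."""

    def __init__(self, load_delay: float = 0.0, save_delay: float = 0.0) -> None:
        self.load_delay = load_delay
        self.save_delay = save_delay
        self._data: Any = None

    def save(self, data: Any) -> None:
        time.sleep(self.save_delay)  # simulate a slow write
        self._data = data

    def load(self) -> Any:
        time.sleep(self.load_delay)  # simulate a slow read
        return self._data


# Short delays keep the example quick; the runs in this thread used 10 seconds.
dataset = DelayedDataSet(load_delay=0.05, save_delay=0.05)
start = time.perf_counter()
dataset.save({"rows": 3})
loaded = dataset.load()
elapsed = time.perf_counter() - start
```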
Here are executions under each strategy:
Strategy | Total time | Log |
---|---|---|
Baseline (i.e. no caching/plugins) | 2 minutes | Log |
`TeePlugin` | 10 seconds (saving all outputs) | Log |
`CachePlugin` (i.e. `CachedDataSet`) with `is_async=True` | 30 seconds (saving `split_data`, `train_model`, and `predict` node outputs) | Log |
(Note that these times exclude the initial one-minute delay before the pipeline begins, which comes from the way I added delays somehow triggering on initialization.)
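Ignoring start-up and node compute time, the totals in the table can be reproduced with a rough back-of-the-envelope model. The counts below are read off the example pipeline (six intermediate datasets, three nodes that produce outputs); the variable names are just for this sketch.

```python
DELAY_SECONDS = 10  # load and save delay per intermediate dataset

# Baseline: every save and every load blocks the run in turn.
SAVES = 6  # train_x, train_y, test_x, test_y, model, predictions
LOADS = 6  # each of those datasets is loaded once by a downstream node
baseline_seconds = (SAVES + LOADS) * DELAY_SECONDS

# TeePlugin: loads come from memory and all saves overlap in the background,
# so the run only waits about one save delay at the end.
tee_seconds = DELAY_SECONDS

# CachedDataSet with async saving: loads come from the cache, and a node's
# outputs are saved in parallel, but each node still waits for its own saves.
NODES_WITH_OUTPUTS = 3  # split_data, train_model, predict
cache_async_seconds = NODES_WITH_OUTPUTS * DELAY_SECONDS

print(baseline_seconds, tee_seconds, cache_async_seconds)
```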
The code to run these examples is in https://github.com/deepyaman/hookshot/. You can also change the load/save delays in `conf/base/catalog.yml` to simulate different latencies. Next steps:
- `CachePlugin` as an alternative way to implement what you proposed)
- `--hooks` and `--async` CLI options?

Baseline (i.e. no caching/plugins)
(hookshot) BOS-178551-C02X31K9JHD4:hookshot deepyaman$ kedro run
2020-07-14 13:40:10,444 - root - INFO - ** Kedro project hookshot
/anaconda3/envs/hookshot/lib/python3.8/site-packages/fsspec/implementations/local.py:29: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
warnings.warn(
2020-07-14 13:40:11,067 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x1167b39d0
2020-07-14 13:40:21,071 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:40:21,072 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816c310
2020-07-14 13:40:31,076 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:40:31,077 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816c8b0
2020-07-14 13:40:41,080 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:40:41,080 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816ca00
2020-07-14 13:40:51,082 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:40:51,083 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816cb50
2020-07-14 13:41:01,083 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:41:01,084 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816cca0
2020-07-14 13:41:11,086 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:41:11,096 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVDataSet)...
2020-07-14 13:41:11,096 - kedro.io.core - DEBUG - Loading CSVDataSet(filepath=/Users/deepyaman/hookshot/data/01_raw/iris.csv, protocol=file, save_args={'index': False})
2020-07-14 13:41:11,113 - kedro.io.data_catalog - INFO - Loading data from `params:example_test_data_ratio` (MemoryDataSet)...
2020-07-14 13:41:11,113 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<float>)
2020-07-14 13:41:11,114 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2020-07-14 13:41:11,143 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (SlowDataSet)...
2020-07-14 13:41:11,144 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:41:11,144 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:41:21,144 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:41:21,144 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (SlowDataSet)...
2020-07-14 13:41:21,145 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:41:21,145 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:41:31,153 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (SlowDataSet)...
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:41:41,162 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (SlowDataSet)...
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:41:51,168 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:41:51,169 - kedro.runner.sequential_runner - INFO - Completed 1 out of 4 tasks
2020-07-14 13:41:51,169 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (SlowDataSet)...
2020-07-14 13:41:51,170 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:41:51,170 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:01,176 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:01,177 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (SlowDataSet)...
2020-07-14 13:42:01,177 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:01,177 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:11,181 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:11,181 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2020-07-14 13:42:11,181 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<dict>)
2020-07-14 13:42:11,181 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2020-07-14 13:42:11,516 - kedro.io.data_catalog - INFO - Saving data to `example_model` (SlowDataSet)...
2020-07-14 13:42:11,516 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:42:11,517 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:21,519 - kedro.runner.sequential_runner - INFO - Completed 2 out of 4 tasks
2020-07-14 13:42:21,519 - kedro.io.data_catalog - INFO - Loading data from `example_model` (SlowDataSet)...
2020-07-14 13:42:21,519 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:21,519 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:31,521 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (SlowDataSet)...
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:41,525 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:41,525 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2020-07-14 13:42:41,527 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (SlowDataSet)...
2020-07-14 13:42:41,527 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:42:41,527 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:51,532 - kedro.runner.sequential_runner - INFO - Completed 3 out of 4 tasks
2020-07-14 13:42:51,532 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (SlowDataSet)...
2020-07-14 13:42:51,533 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:42:51,533 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:01,537 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:01,537 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (SlowDataSet)...
2020-07-14 13:43:01,537 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:01,538 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,539 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,539 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2020-07-14 13:43:11,540 - hookshot.pipelines.data_science.nodes - INFO - Model accuracy on test set: 100.00%
2020-07-14 13:43:11,540 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:11,540 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,540 - kedro.runner.sequential_runner - INFO - Completed 4 out of 4 tasks
2020-07-14 13:43:11,540 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
TeePlugin
(hookshot) BOS-178551-C02X31K9JHD4:hookshot deepyaman$ kedro run --hooks src.hookshot.hooks.TeePlugin
2020-07-14 13:49:30,667 - root - INFO - ** Kedro project hookshot
/anaconda3/envs/hookshot/lib/python3.8/site-packages/fsspec/implementations/local.py:29: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
warnings.warn(
2020-07-14 13:49:30,980 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x10d6bc1f0
2020-07-14 13:49:40,985 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x10d6bc1f0
2020-07-14 13:49:40,985 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11b96a880
2020-07-14 13:49:50,989 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11b96a880
2020-07-14 13:49:50,989 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11b96ae20
2020-07-14 13:50:00,994 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11b96ae20
2020-07-14 13:50:00,994 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11b96af70
2020-07-14 13:50:10,996 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11b96af70
2020-07-14 13:50:10,997 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11b979100
2020-07-14 13:50:21,001 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11b979100
2020-07-14 13:50:21,002 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11b979250
2020-07-14 13:50:31,006 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11b979250
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_test_x'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'params:example_num_train_iter'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_train_y'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_predictions'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_test_y'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_model'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_train_x'
2020-07-14 13:50:31,010 - kedro.io.data_catalog - WARNING - Replacing DataSet 'params:example_learning_rate'
2020-07-14 13:50:31,011 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVDataSet)...
2020-07-14 13:50:31,011 - kedro.io.core - DEBUG - Loading CSVDataSet(filepath=/Users/deepyaman/hookshot/data/01_raw/iris.csv, protocol=file, save_args={'index': False})
2020-07-14 13:50:31,030 - kedro.io.data_catalog - INFO - Loading data from `params:example_test_data_ratio` (MemoryDataSet)...
2020-07-14 13:50:31,031 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<float>)
2020-07-14 13:50:31,031 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2020-07-14 13:50:31,044 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (SlowDataSet)...
2020-07-14 13:50:31,044 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (SlowDataSet)...
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x10d6bc1f0
2020-07-14 13:50:31,045 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (SlowDataSet)...
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x10d6bc1f0
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96a880
2020-07-14 13:50:31,045 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (SlowDataSet)...
2020-07-14 13:50:31,045 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (MemoryDataSet)...
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96ae20
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96a880
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96af70
2020-07-14 13:50:31,045 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96ae20
2020-07-14 13:50:31,046 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,072 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b96af70
2020-07-14 13:50:31,072 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (MemoryDataSet)...
2020-07-14 13:50:31,072 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,073 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (MemoryDataSet)...
2020-07-14 13:50:31,073 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,073 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (MemoryDataSet)...
2020-07-14 13:50:31,073 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,073 - kedro.runner.sequential_runner - INFO - Completed 1 out of 4 tasks
2020-07-14 13:50:31,073 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (MemoryDataSet)...
2020-07-14 13:50:31,073 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,073 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (MemoryDataSet)...
2020-07-14 13:50:31,074 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,074 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2020-07-14 13:50:31,074 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<dict>)
2020-07-14 13:50:31,074 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2020-07-14 13:50:31,382 - kedro.io.data_catalog - INFO - Saving data to `example_model` (SlowDataSet)...
2020-07-14 13:50:31,382 - kedro.io.data_catalog - INFO - Saving data to `example_model` (MemoryDataSet)...
2020-07-14 13:50:31,382 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b979100
2020-07-14 13:50:31,382 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b979100
2020-07-14 13:50:31,382 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,383 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,383 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,383 - kedro.runner.sequential_runner - INFO - Completed 2 out of 4 tasks
2020-07-14 13:50:31,383 - kedro.io.data_catalog - INFO - Loading data from `example_model` (MemoryDataSet)...
2020-07-14 13:50:31,384 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<ndarray>)
2020-07-14 13:50:31,384 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (MemoryDataSet)...
2020-07-14 13:50:31,384 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,384 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2020-07-14 13:50:31,386 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (SlowDataSet)...
2020-07-14 13:50:31,386 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (MemoryDataSet)...
2020-07-14 13:50:31,387 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b979250
2020-07-14 13:50:31,387 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11b979250
2020-07-14 13:50:31,387 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:50:31,387 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<ndarray>)
2020-07-14 13:50:31,388 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,388 - kedro.runner.sequential_runner - INFO - Completed 3 out of 4 tasks
2020-07-14 13:50:31,388 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (MemoryDataSet)...
2020-07-14 13:50:31,388 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<ndarray>)
2020-07-14 13:50:31,388 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (MemoryDataSet)...
2020-07-14 13:50:31,388 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,389 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2020-07-14 13:50:31,389 - hookshot.pipelines.data_science.nodes - INFO - Model accuracy on test set: 93.33%
2020-07-14 13:50:31,390 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<ndarray>)
2020-07-14 13:50:31,390 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:50:31,390 - kedro.runner.sequential_runner - INFO - Completed 4 out of 4 tasks
2020-07-14 13:50:31,390 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
2020-07-14 13:50:41,050 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x10d6bc1f0
2020-07-14 13:50:41,077 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11b96ae20
2020-07-14 13:50:41,077 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11b96a880
2020-07-14 13:50:41,077 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11b96af70
2020-07-14 13:50:41,384 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11b979100
2020-07-14 13:50:41,390 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11b979250
`CachePlugin` (i.e. `CachedDataSet`) with `is_async=True`
(hookshot) BOS-178551-C02X31K9JHD4:hookshot deepyaman$ kedro run --async --hooks src.hookshot.hooks.CachePlugin
2020-07-14 13:56:39,636 - root - INFO - ** Kedro project hookshot
/anaconda3/envs/hookshot/lib/python3.8/site-packages/fsspec/implementations/local.py:29: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
warnings.warn(
2020-07-14 13:56:39,782 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x107cbf130
2020-07-14 13:56:49,783 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x107cbf130
2020-07-14 13:56:49,785 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x115f6d850
2020-07-14 13:56:59,786 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x115f6d850
2020-07-14 13:56:59,786 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x115f6ddf0
2020-07-14 13:57:09,790 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x115f6ddf0
2020-07-14 13:57:09,791 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x115f6df40
2020-07-14 13:57:19,794 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x115f6df40
2020-07-14 13:57:19,795 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x115f7c0d0
2020-07-14 13:57:29,799 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x115f7c0d0
2020-07-14 13:57:29,799 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x115f7c220
2020-07-14 13:57:39,801 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x115f7c220
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_model'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_test_x'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_test_y'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'params:example_learning_rate'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_train_x'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'params:example_num_train_iter'
2020-07-14 13:57:39,805 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_train_y'
2020-07-14 13:57:39,806 - kedro.io.data_catalog - WARNING - Replacing DataSet 'example_predictions'
2020-07-14 13:57:39,806 - kedro.runner.sequential_runner - INFO - Asynchronous mode is enabled for loading and saving data
2020-07-14 13:57:39,806 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVDataSet)...
2020-07-14 13:57:39,806 - kedro.io.data_catalog - INFO - Loading data from `params:example_test_data_ratio` (MemoryDataSet)...
2020-07-14 13:57:39,807 - kedro.io.core - DEBUG - Loading CSVDataSet(filepath=/Users/deepyaman/hookshot/data/01_raw/iris.csv, protocol=file, save_args={'index': False})
2020-07-14 13:57:39,807 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<float>)
2020-07-14 13:57:39,814 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2020-07-14 13:57:39,826 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (CachedDataSet)...
2020-07-14 13:57:39,826 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (CachedDataSet)...
2020-07-14 13:57:39,826 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (CachedDataSet)...
2020-07-14 13:57:39,827 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (CachedDataSet)...
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6ddf0
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6d850
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x107cbf130
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6ddf0
2020-07-14 13:57:39,827 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6d850
2020-07-14 13:57:39,828 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6df40
2020-07-14 13:57:39,828 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x107cbf130
2020-07-14 13:57:39,828 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f6df40
2020-07-14 13:57:49,831 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x115f6d850
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x107cbf130
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x115f6ddf0
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x115f6df40
2020-07-14 13:57:49,832 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:57:49,833 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:57:49,833 - kedro.runner.sequential_runner - INFO - Completed 1 out of 4 tasks
2020-07-14 13:57:49,834 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (CachedDataSet)...
2020-07-14 13:57:49,834 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (CachedDataSet)...
2020-07-14 13:57:49,834 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:57:49,834 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:57:49,834 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2020-07-14 13:57:49,834 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:57:49,834 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:57:49,835 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<dict>)
2020-07-14 13:57:49,835 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:57:49,835 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:57:49,835 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:57:49,835 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:57:49,835 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2020-07-14 13:57:50,142 - kedro.io.data_catalog - INFO - Saving data to `example_model` (CachedDataSet)...
2020-07-14 13:57:50,142 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:57:50,143 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c0d0
2020-07-14 13:57:50,143 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c0d0
2020-07-14 13:58:00,145 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x115f7c0d0
2020-07-14 13:58:00,145 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:58:00,145 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:00,146 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:00,146 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x107cbf130
2020-07-14 13:58:00,146 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:00,146 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:00,146 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x115f6d850
2020-07-14 13:58:00,146 - kedro.runner.sequential_runner - INFO - Completed 2 out of 4 tasks
2020-07-14 13:58:00,147 - kedro.io.data_catalog - INFO - Loading data from `example_model` (CachedDataSet)...
2020-07-14 13:58:00,147 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <ndarray>}, dataset={'data': <ndarray>})
2020-07-14 13:58:00,147 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (CachedDataSet)...
2020-07-14 13:58:00,147 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<ndarray>) exists
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<ndarray>)
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<ndarray>) exists
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:00,148 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:58:00,149 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2020-07-14 13:58:00,151 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (CachedDataSet)...
2020-07-14 13:58:00,151 - kedro.io.core - DEBUG - Saving CachedDataSet(cache={}, dataset={'data': <object>})
2020-07-14 13:58:00,151 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c220
2020-07-14 13:58:00,151 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c220
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x115f7c220
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Saving MemoryDataSet()
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <ndarray>}, dataset={'data': <ndarray>})
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<ndarray>)
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x115f7c0d0
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x115f6ddf0
2020-07-14 13:58:10,154 - kedro.runner.sequential_runner - INFO - Completed 3 out of 4 tasks
2020-07-14 13:58:10,154 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (CachedDataSet)...
2020-07-14 13:58:10,154 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (CachedDataSet)...
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <ndarray>}, dataset={'data': <ndarray>})
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<ndarray>) exists
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Loading CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:10,154 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<ndarray>)
2020-07-14 13:58:10,155 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:58:10,155 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<ndarray>) exists
2020-07-14 13:58:10,155 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:10,155 - kedro.io.core - DEBUG - Checking whether target of MemoryDataSet(data=<DataFrame>) exists
2020-07-14 13:58:10,155 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2020-07-14 13:58:10,155 - hookshot.pipelines.data_science.nodes - INFO - Model accuracy on test set: 96.67%
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <ndarray>}, dataset={'data': <ndarray>})
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<ndarray>)
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x115f7c220
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing CachedDataSet(cache={'data': <DataFrame>}, dataset={'data': <DataFrame>})
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing MemoryDataSet(data=<DataFrame>)
2020-07-14 13:58:10,156 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x115f6df40
2020-07-14 13:58:10,156 - kedro.runner.sequential_runner - INFO - Completed 4 out of 4 tasks
2020-07-14 13:58:10,156 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
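To put a number on the blocking writes in the log above, here is a small sketch (not part of the original run) that pairs each `Saving SlowDataSet` line with its matching `Saved SlowDataSet` line and reports the elapsed time. The regular expression and the helper name `slow_save_durations` are assumptions made for illustration; the sample lines are copied from the log.

```python
import re
from datetime import datetime

# Lines copied from the log above (the duplicated "Saving" lines are kept as-is).
LOG = """\
2020-07-14 13:57:50,143 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c0d0
2020-07-14 13:57:50,143 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c0d0
2020-07-14 13:58:00,145 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x115f7c0d0
2020-07-14 13:58:00,151 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c220
2020-07-14 13:58:00,151 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x115f7c220
2020-07-14 13:58:10,153 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x115f7c220
"""

# Hypothetical pattern: timestamp, logger, level, action, and object address.
LINE = re.compile(
    r"^(\S+ \S+) - \S+ - \w+ - (Saving|Saved) SlowDataSet\(.*\) at (0x[0-9a-f]+)$"
)


def slow_save_durations(log_text):
    """Return {object address: seconds between first 'Saving' and 'Saved'}."""
    started = {}
    durations = {}
    for line in log_text.splitlines():
        match = LINE.match(line)
        if match is None:
            continue
        stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
        action, address = match.group(2), match.group(3)
        if action == "Saving":
            # Keep the first "Saving" timestamp when the line is repeated.
            started.setdefault(address, stamp)
        elif address in started:
            durations[address] = (stamp - started[address]).total_seconds()
    return durations


if __name__ == "__main__":
    for address, seconds in slow_save_durations(LOG).items():
        print(f"{address}: {seconds:.3f} s")
```

Each of the two saves takes roughly ten seconds, and the runner only reports the task as completed after the corresponding `Saved` line.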
Description
I'm always frustrated when I/O dominates compute.
For example, my pipeline takes 10 minutes to run, of which 9 minutes are spent writing to and reading back from S3.
Context
At QuantumBlack, it's most common to write intermediate datasets to disk. In fact, the Kedro data catalog very much facilitates this workflow. This also presents numerous advantages:
However, it's also extremely inefficient, especially when writing large datasets using slow mechanisms. On top of that, we most often expect reloaded data to be exactly equal to what was saved, except in the case of transcoding and some terminal output formats (e.g. Excel, CSV).
Possible Implementation
https://github.com/deepyaman/hookshot/blob/develop/src/hookshot/hooks.py
Feel free to clone the repo and run the example. :)
At a high level, the plugin aims to provide Unix `tee`-like behavior to runners.
Goals of this implementation:
Limitations:
`SharedMemoryDataSet` for `ParallelRunner`. I would be happy to get some input from the experts here. :)
I'm most interested in understanding what's the best way to contribute this. I think it makes sense as part of a new `kedro.extras.hooks` subpackage. As part of Kedro, this functionality would continue to be supported through backend redesigns.
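For readers who want a feel for the `tee`-like idea without opening the repo, here is a minimal, framework-free sketch. It is not the code from the linked `hooks.py` and does not use any Kedro API: `SlowStore` and `TeeCatalog` are hypothetical names, and writing the persistent copy on a background thread is an assumption about one way to get the behavior. The point is only that downstream nodes read the in-memory copy while the slow write happens on the side.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class SlowStore:
    """Stand-in for a slow backend such as S3 (hypothetical, not a Kedro class)."""

    def __init__(self, delay=0.2):
        self.delay = delay
        self.saved = {}

    def save(self, name, data):
        time.sleep(self.delay)  # simulate slow I/O
        self.saved[name] = data


class TeeCatalog:
    """Keep data in memory for the main line and persist a copy on the side."""

    def __init__(self, store, max_workers=2):
        self._store = store
        self._memory = {}
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._pending = []

    def save(self, name, data):
        # The in-memory copy is available to downstream nodes immediately.
        self._memory[name] = data
        # The persistent copy is written in the background.
        self._pending.append(self._pool.submit(self._store.save, name, data))

    def load(self, name):
        # Reads never touch the slow store.
        return self._memory[name]

    def flush(self):
        # Wait for outstanding writes and surface any exception they raised.
        for future in self._pending:
            future.result()
        self._pending.clear()


if __name__ == "__main__":
    store = SlowStore()
    catalog = TeeCatalog(store)
    catalog.save("example_model", [1, 2, 3])
    print(catalog.load("example_model"))  # served from memory
    catalog.flush()  # block until the slow copy is written
    print(sorted(store.saved))
```

A sketch like this has to call something like `flush()` before the run ends, otherwise a failed or unfinished background write would go unnoticed. It also shares one object between the main line and the writer, so a node that mutates its inputs in place could change what gets persisted.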