kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Passing Extra Parameters to Custom Dataset #1723

Open brendalf opened 2 years ago

brendalf commented 2 years ago

Description

Hello there. I created a custom dataset to handle our Spark Delta Tables. The problem is that the custom dataset needs a replace-where string defining what partition should be overwritten after the data is generated inside the node. Catalog definition:

revenue:
  type: path.DeltaTableDataSet
  namespace: test
  table: revenue
  save_args:
    -

I can't use the parameters inside the save_args key for the custom dataset because the replace values are also calculated during execution depending on other pipeline parameters, like DATE_START and LOOKBACK.

I tried to create a class to act as the interface between the nodes and the custom dataset. This class holds the Spark DataFrame and the extra values, but Kedro fails when trying to pickle it. Node return:

return SparkPlan(
    df=revenue,
    replace_where=[ 
        f"date >= '{from_date}' and date <= '{date_end}'"
    ]
)
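
Roughly, SparkPlan is just a small wrapper holding the Spark DataFrame and the extra values (a simplified sketch, not the full class):

from dataclasses import dataclass, field
from typing import List

from pyspark.sql import DataFrame


@dataclass
class SparkPlan:
    """Wrapper pairing a Spark DataFrame with runtime save options."""

    df: DataFrame
    replace_where: List[str] = field(default_factory=list)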

Custom Dataset save method:

def _save(self, plan: SparkPlan) -> None:
    """Saves data to the specified filepath"""
    logger = logging.getLogger(self._table_name)
    logger.info(plan.replace_where)

Error received:

kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
cannot pickle '_thread.RLock' object

Questions:

  1. Is there a way to provide runtime values to the dataset together with the data?
  2. Could I put these values in the context and retrieve them inside the custom dataset?
    • I saw a method to load the current kedro context, but that method was removed.

Edit 1 - 2022-07-25:

The error above was happening because I typed the wrong dataset name in the node outputs, so Kedro tried to save the result as a MemoryDataSet. I solved the problem of sending extra parameters by using this SparkPlan wrapper around every save and load in my custom dataset.
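
The save side ends up looking roughly like this (simplified; attribute names other than self._table_name are illustrative, and it assumes Delta Lake's replaceWhere write option):

import logging


def _save(self, plan: SparkPlan) -> None:
    """Saves data, overwriting only the rows matched by replace_where."""
    logger = logging.getLogger(self._table_name)
    logger.info(plan.replace_where)

    (
        plan.df.write.format("delta")
        .mode("overwrite")
        # replaceWhere restricts the overwrite to rows matching the condition
        .option("replaceWhere", " and ".join(plan.replace_where))
        .saveAsTable(f"{self._namespace}.{self._table_name}")  # illustrative names
    )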

deepyaman commented 2 years ago

@brendalf On your specific error, can you try returning a dictionary from your function and constructing the SparkPlan object inside _save from that dictionary?
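
Something like this (a rough sketch using the names from your snippets):

# inside the node: return a plain dict instead of the custom class
return {
    "df": revenue,
    "replace_where": [f"date >= '{from_date}' and date <= '{date_end}'"],
}

# inside the custom dataset: rebuild the wrapper just before writing
def _save(self, data: dict) -> None:
    plan = SparkPlan(df=data["df"], replace_where=data["replace_where"])
    ...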

Questions:

  1. Is there a way to provide runtime values to the dataset together with the data?
  2. Could I put these values in the context and retrieve them inside the custom dataset?

    • I saw a method to load the current kedro context, but that method was removed.

Since Kedro tries to abstract data saving/loading away from the logic, I don't think this is directly supported. Off the top of my head, what you could do is return these runtime values from nodes, either explicitly or using hooks to pass that extra output.
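
For the "explicitly" variant, the runtime values can simply be an extra node output with its own catalog entry, e.g. (hypothetical names):

from kedro.pipeline import node


def compute_revenue(raw_revenue, parameters):
    revenue = ...  # Spark transformations on raw_revenue
    replace_where = (
        f"date >= '{parameters['date_start']}' and date <= '{parameters['date_end']}'"
    )
    return revenue, replace_where


# two outputs: the data itself and the runtime value computed alongside it
revenue_node = node(
    compute_revenue,
    inputs=["raw_revenue", "parameters"],
    outputs=["revenue", "revenue_replace_where"],
)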

brendalf commented 2 years ago

Hi @deepyaman. It's working now, both when returning a SparkPlan and when returning a dict. I realized that I had a typo in the catalog dataset name. Thanks.

Since Kedro tries to abstract data saving/loading away from the logic, I don't think this is directly supported. Off the top of my head, what you could do is return these runtime values from nodes, either explicitly or using hooks to pass that extra output.

Can you provide a short example?

datajoely commented 2 years ago

Are you running this with ParallelRunner? That's a common issue here.

brendalf commented 2 years ago

No, I'm not.

brendalf commented 2 years ago

Although I solved the issue by wrapping the data and the parameters inside a class, I think it would be good to have this feature handled by Kedro in the future. Thanks for the support. Should I close this?

datajoely commented 2 years ago

Hi @brendalf I've just realised this is possibly resolved by tweaking the copy_mode of the memory dataset when passed into the next node:

https://kedro.readthedocs.io/en/latest/_modules/kedro/io/memory_dataset.html
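
For example, declaring the intermediate dataset explicitly with copy_mode: assign skips the deepcopy/pickle path (revenue_plan is just an illustrative entry name):

revenue_plan:
  type: MemoryDataSet
  copy_mode: assign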

noklam commented 2 years ago

kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
cannot pickle '_thread.RLock' object

These errors almost always come from serialization. I think we had a similar issue with TensorFlow objects; a quick solution is the copy_mode option that Joel mentioned above.

datajoely commented 2 years ago

@noklam do you think we could catch this pickling error and recommend the solution? It's a hard one to debug for users in this situation.

brendalf commented 2 years ago

Hi @datajoely. In my case I didn't want to save to a MemoryDataSet; that was happening because I had a typo between the data catalog entry and the name I actually wrote as the node output. I think the problem happened because the memory dataset tried to serialize a Spark DataFrame object.

datajoely commented 2 years ago

Sorry - MemoryDataSet is what Kedro uses to pass data between nodes automatically. If you look at the implementation, we already handle native Spark DataFrames specially:

[Screenshot of the MemoryDataSet implementation showing the copy-mode handling for native Spark DataFrames.]

So you can do this by explicitly declaring MemoryDataSets in the catalog.

I also think if you were to subclass our spark.SparkDataSet or spark.DeltaTableDataSet you would benefit from this too.
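
For reference, the copy-mode inference in that implementation is roughly the following (paraphrased; the exact code may differ between Kedro versions):

import numpy as np
import pandas as pd


def _infer_copy_mode(data) -> str:
    # pandas and numpy objects get a shallow copy, Spark DataFrames are
    # passed by reference ("assign"), everything else is deep-copied
    if isinstance(data, (pd.DataFrame, np.ndarray)):
        return "copy"
    if type(data).__name__ == "DataFrame":  # e.g. pyspark.sql.DataFrame
        return "assign"
    return "deepcopy"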

brendalf commented 2 years ago

Do you think it would be nice to have, in the future, a way to send runtime-calculated values as extra parameters to the dataset? For now, I solved it by wrapping the values and the data inside a class that my custom dataset accepts when saving. If not, we can close this issue.

noklam commented 2 years ago

@brendalf Could you provide an example of that?

datajoely commented 2 years ago

@brendalf or perhaps - why can't you just return the runtime data as inputs to another node? Does it need to be in the DataSet implementation?

brendalf commented 2 years ago

My custom dataset needs to receive two things:

  1. Spark data to be saved.
  2. Replace where query using values calculated inside the node.

Example: I need to replace the data inside the Delta Table for a specific set of dates. I have date_start, date_end and lookback parameters defined inside parameters.yml, and inside the node I actually load data from date_start - lookback to date_end, so I need to replace the same date range in the output table.
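
For instance (hypothetical values), with date_start = 2022-07-20, lookback = 5 and date_end = 2022-07-25, the node loads data from 2022-07-15 to 2022-07-25 and therefore has to overwrite exactly that date range in the Delta Table.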

I thought about three solutions to solve this:

  1. Create a class that accepts the data and the replace-where query, so the node can send everything I need to the custom dataset, which accepts this class instead of just the Spark DataFrame.
  2. I could push the replace-where values into the parameters from inside the node and retrieve them in the custom dataset.
  3. I could instantiate my Custom Dataset inside the node and call the save method, without using the catalog.

I actually solved the problem with the first approach, but it's problematic: now when I chain nodes together, downstream nodes no longer receive the lazily evaluated Spark DataFrame, but an instance of this class.

I couldn't find how to implement the second approach. Maybe Kedro could automatically send the context to the dataset as kwargs?

The problem with the third one is that I want to keep using the data catalog.

brendalf commented 1 year ago

Hello folks, any news here?

datajoely commented 1 year ago

I think this option is the most common amongst the community:

Create a class that accepts the data and the replace where query, so the node can send everything I need to the custom dataset that accepts this class instead of just the spark dataset.

In Kedro, nodes should be pure Python functions with no knowledge of I/O, so you should never have a context available there.

astrojuanlu commented 1 week ago

I can't use the parameters inside the save_args key for the custom dataset because the replace values are also calculated during execution depending on other pipeline parameters, like DATE_START and LOOKBACK.

The question of dynamic datasets like these has come up recently in some user conversations. We haven't started thinking about how to do it yet.