LineaLabs / lineapy

Move fast from data science prototype to pipeline. Capture, analyze, and transform messy notebooks into data pipelines with just two lines of code.
https://lineapy.org
Apache License 2.0

Support intermediate artifacts #683

Open PertuyF opened 2 years ago

PertuyF commented 2 years ago

Hi all, thank you so much for developing LineaPy, looks great and I'm really excited about it!

Is your feature request related to a problem? Please describe.

When I develop a pipeline, I may want to integrate semantic steps to build my refined dataset table. As an illustration, master_data would be data loaded and assembled from a relational DB, whereas dataset would be the same table refined with some feature engineering.

Currently, if I try to do this I would save both master_data and dataset as artifacts, then create a pipeline like:

lineapy.to_pipeline(artifacts=[master_data.name, dataset.name], 
                    dependencies={dataset.name: {master_data.name}},
                    framework='AIRFLOW', pipeline_name='my_great_airflow_pipeline', output_dir='airflow')

My issue is that LineaPy would then create steps to build master_data from scratch, and also to create dataset from scratch, instead of loading master_data as a starting point. For example:

import pickle

def master_data():

    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    artifact = pickle.dump(
        iris_agg, open("/home/oneai/.lineapy/linea_pickles/10dzyzx", "wb")
    )

def dataset():

    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    iris_clean = iris_agg.dropna().assign(test="test")
    dataset = pickle.dump(
        iris_clean, open("/home/oneai/.lineapy/linea_pickles/5Tk63gO", "wb")
    )

Describe the solution you'd like

Ideally LineaPy would capture the dependency and build:


import pickle

def master_data():

    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    pickle.dump(
        iris_agg, open("/home/oneai/.lineapy/linea_pickles/10dzyzx", "wb")
    )

def dataset():

    import pandas as pd

    iris_agg = pickle.load(
        open("/home/oneai/.lineapy/linea_pickles/10dzyzx", "rb")
    )
    iris_clean = iris_agg.dropna().assign(test="test")
    dataset = pickle.dump(
        iris_clean, open("/home/oneai/.lineapy/linea_pickles/5Tk63gO", "wb")
    )

Is it planned to support this behavior? Am I missing something?
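The behavior asked for above, where a downstream step starts from the stored intermediate artifact instead of rebuilding it, can be sketched with the standard library alone. This is only an illustration under assumptions: `build_master_data`, `build_dataset`, the `calls` counter and the temporary `STORE` directory are hypothetical stand-ins, and plain lists of dicts replace the pandas DataFrame.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical storage location; LineaPy manages its own pickle folder.
STORE = Path(tempfile.mkdtemp())
calls = {"master_data": 0}  # counts how often the expensive step runs


def build_master_data():
    """Stand-in for the expensive load-and-assemble step."""
    calls["master_data"] += 1
    rows = [
        {"target": "setosa", "sepal_length": 5.1},
        {"target": "virginica", "sepal_length": None},
    ]
    with open(STORE / "master_data.pkl", "wb") as f:
        pickle.dump(rows, f)
    return rows


def build_dataset():
    """Start from the stored master_data instead of rebuilding it."""
    with open(STORE / "master_data.pkl", "rb") as f:
        rows = pickle.load(f)
    # Toy equivalent of dropna().assign(test="test").
    clean = [dict(r, test="test") for r in rows if None not in r.values()]
    with open(STORE / "dataset.pkl", "wb") as f:
        pickle.dump(clean, f)
    return clean


build_master_data()
dataset = build_dataset()
```

The point of the sketch is that `build_dataset()` never calls `build_master_data()`: the expensive step runs once, and the refinement step only reads its stored result.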

dorx commented 2 years ago

Hi @PertuyF thanks so much for creating this issue. We are indeed working on this and really appreciate your input! We will get back with more detailed design proposals soon.

mingjerli commented 2 years ago

Hi @PertuyF, thanks again for bringing this issue up. It looks like you were running the following code to generate your two artifacts, master_data and dataset:

import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
    target=[iris.target_names[i] for i in iris.target]
)
iris_agg = df.set_index("target")
master_data = lineapy.save(iris_agg, 'master_data')

iris_clean = iris_agg.dropna().assign(test="test")
dataset = lineapy.save(iris_clean, 'dataset')

and you were using the following code to generate your pipeline:

lineapy.to_pipeline(artifacts=[master_data.name, dataset.name], 
                    dependencies={dataset.name: {master_data.name}},
                    framework='AIRFLOW', pipeline_name='my_great_airflow_pipeline', output_dir='airflow')

We totally agree that the output you are currently getting is not ideal. The reason is that LineaPy does not yet track the dependencies between artifacts generated within the same session; we are working on this.

There is probably more than one reasonable pipeline that can be generated from the artifact-creating code, depending on your end goal. There are at least three scenarios that I can think of:

  1. You need to output both the master_data and the dataset at the same time.
  2. You only need to output the dataset, and you don't need to recompute the master_data since it is already up to date (or never gets updated).
  3. You only need to output the dataset, but you also need to recompute the master_data since it has been modified.

More scenarios could probably be added.

In order to generate pipelines for each scenario, LineaPy will first detect the dependencies between artifacts within the session in the background and the user will be able to generate pipelines for each scenario based on how they call the to_pipeline function.
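As a rough illustration of what knowing the dependencies between artifacts buys you, the sketch below orders artifacts so that each one is built after the artifacts it needs. It uses `graphlib` from the Python standard library (3.9+), not LineaPy's internal mechanism; the `execution_order` helper and the extra `model` artifact are assumptions made for the example. The mapping has the same shape as the `dependencies` argument shown earlier.

```python
from graphlib import TopologicalSorter

# Same shape as the `dependencies` argument: artifact -> artifacts it needs.
# The "model" entry is a hypothetical third artifact added for illustration.
dependencies = {
    "dataset": {"master_data"},
    "model": {"dataset"},
}


def execution_order(dependencies):
    """Return artifact names so that each one follows its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(dependencies)
print(order)
```

With such an order available, a generated pipeline could run each step once and pass its result downstream rather than recomputing it.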

The following are some proposed solutions (not finalized yet, and your input would be very welcome!):

For scenario 1 (outputting both the master_data and the dataset at the same time), we want to create a pipeline by running:

lineapy.to_pipeline(artifacts=['master_data', 'dataset'])

and we expect a pipeline.py that looks like

def get_master_data():
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    return iris_agg

def get_dataset(iris_agg):
    dataset = iris_agg.dropna().assign(test="test")
    return dataset

def pipeline():
    iris_agg = get_master_data()
    dataset = get_dataset(iris_agg)
    return iris_agg, dataset

if __name__=='__main__':
    pipeline()

For scenario 2 (outputting only the dataset and reusing the master_data), we want to create a pipeline by running

lineapy.to_pipeline(artifacts=['dataset'], recompute_dependencies=False)

and we expect a pipeline.py that looks like

import lineapy

def get_dataset(iris_agg):
    dataset = iris_agg.dropna().assign(test="test")
    return dataset

def pipeline():
    iris_agg = lineapy.get('master_data').get_value()
    dataset = get_dataset(iris_agg)
    return dataset

if __name__=='__main__':
    pipeline()

For scenario 3 (outputting only the dataset but recomputing the master_data), we want to create a pipeline by running

lineapy.to_pipeline(artifacts=['dataset'], recompute_dependencies=True)

and we expect a pipeline.py that looks like

def get_master_data():
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    return iris_agg

def get_dataset(iris_agg):
    dataset = iris_agg.dropna().assign(test="test")
    return dataset

def pipeline():
    iris_agg = get_master_data()
    dataset = get_dataset(iris_agg)
    return dataset

if __name__=='__main__':
    pipeline()

I hope all these proposed solutions make sense to you.
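To make the difference between the three scenarios concrete, here is a minimal sketch of how the requested artifacts and a `recompute_dependencies` flag could translate into "compute" versus "load from storage" decisions. It is not LineaPy's implementation: `plan_steps` is a hypothetical helper, and `recompute_dependencies` is only the proposed (not finalized) parameter from above.

```python
def plan_steps(targets, dependencies, recompute_dependencies=True):
    """Split artifacts into those to compute and those to load from storage.

    `dependencies` maps an artifact name to the set of artifacts it needs.
    """
    compute, load = [], []

    def visit(name):
        if name in compute or name in load:
            return
        for dep in sorted(dependencies.get(name, ())):
            if dep in targets or recompute_dependencies:
                visit(dep)  # rebuild the upstream artifact first
            elif dep not in load:
                load.append(dep)  # reuse the stored value
        compute.append(name)

    for target in targets:
        visit(target)
    return compute, load


deps = {"dataset": {"master_data"}}
scenario_1 = plan_steps(["master_data", "dataset"], deps)
scenario_2 = plan_steps(["dataset"], deps, recompute_dependencies=False)
scenario_3 = plan_steps(["dataset"], deps, recompute_dependencies=True)
```

In this sketch, scenarios 1 and 3 compute the same steps and differ only in which artifacts the pipeline returns, while scenario 2 computes `dataset` alone and loads `master_data`.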

There are definitely other things that can be done here, for instance

  1. Do we want to lift load_iris() out of the get_master_data() function and put it in a separate function?
  2. Do we want to save the artifacts again while running the pipeline? If yes, do you want to save them within the same LineaPy database or somewhere else?

and we would love your input!

PertuyF commented 2 years ago

Thank you so much @mingjerli for sharing this reflection and giving me an opportunity to provide feedback!

The strategies you mention totally make sense to me, and scenarios 1 and 3 would probably fit my use cases best. The reason is that, for now, I foresee LineaPy as an assistant for productionizing prototypes. To me this would typically cover three main stages:

  0. Prototype development, probably in a creative and messy way.
  1. Transform the messy prototype into clean, reusable code. This is where LineaPy plays its role (artifact versioning and pipeline creation).
  2. Integrate the reusable code into production systems, if the prototype brings value. Here some steps of the pipeline could be reimplemented on a dedicated framework.

A typical example would be to explore a hypothesis, starting by creating master data from an existing relational DB, then engineering features into dataset, then training a model. This is stage 0, and LineaPy would help ensure the reusability of the code (e.g. re-executing on an updated data source, revisiting after a while, handing over to another data scientist, ...).

If the models eventually prove worthwhile, I will want to integrate the pipeline with my DataOps stack and my MLOps stack. This would be stage 2, and this is where my semantics come in:

In my current vision, LineaPy would facilitate the transition from prototype to production. Hence, if we consider the three artifacts master data, dataset and model from above: I see scenario 1 as useful to push dataset to DataOps and model to MLOps; we need both outputs. And I see scenario 3 as useful to reimplement master data in the proper data framework; we need to be able to recalculate the artifact. Scenario 2 would rely heavily on using LineaPy in a centralised way, and while I do not exclude this case in the future, I believe I am not there yet. It does, however, also relate to the final stage where master data is simply loaded, with the difference that recalculation and versioning could be handled dynamically as needed (e.g. upon a change in the data source).

Your additional points are very relevant. Extracting load_iris() as a standalone step could make sense as semantically data loading and data transformation could need to be separate. Saving artifacts again upon running the pipeline would directly apply to the case of dataset above. And depending on the stage of the process it could be interesting to do so in LineaPy (stage 1, clean reusable code) or somewhere else (stage 2, integration with DataOps stack).

I hope this makes sense to you! I probably still have to give it some more thought, but I prefer to throw ideas out here so we can start a discussion 🙂

Also, this is the use case of just one user. I would completely understand if other ways of working required different behaviours.

Happy to continue this discussion as needed! Best, Fabien

yifanwu commented 2 years ago

Hi Fabien, thanks again for your detailed feedback!

We've been working on supporting scenario 1/3 for a couple weeks and will keep you posted once it's ready for use. In the meantime, would love to explore more how you are using lineapy and connect on slack!

yoonspark commented 2 years ago

Hi Fabien (@PertuyF), hope all is well with you. We are happy to share that we finally have support for scenario 1! (We are working on the other two scenarios and will keep you posted as they become ready too.) This means that, with the latest version of lineapy (v0.1.5; released yesterday!), running the to_pipeline() API modularizes artifact code into "non-overlapping" functions, where duplicate code blocks are factored out to reduce redundant computations (which might be expensive to run).

Hence, with the same example discussed earlier, i.e.,

import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
    target=[iris.target_names[i] for i in iris.target]
)
iris_agg = df.set_index("target")
master_data = lineapy.save(iris_agg, "master_data")

iris_clean = iris_agg.dropna().assign(test="test")
dataset = lineapy.save(iris_clean, "dataset")

running

lineapy.to_pipeline(
    artifacts=[master_data.name, dataset.name],
    dependencies={dataset.name: {master_data.name}},
    framework="AIRFLOW",
    pipeline_name="iris_pipeline",
    output_dir="./airflow",
)

would generate a module file that looks like:

### ./airflow/iris_pipeline_module.py

import pandas as pd
from sklearn.datasets import load_iris

def get_master_data():
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.set_index("target")
    return iris_agg

def get_dataset(iris_agg):
    iris_clean = iris_agg.dropna().assign(test="test")
    return iris_clean

def run_session_including_master_data():
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    iris_agg = get_master_data()
    artifacts["master_data"] = copy.deepcopy(iris_agg)
    iris_clean = get_dataset(iris_agg)
    artifacts["dataset"] = copy.deepcopy(iris_clean)
    return artifacts

def run_all_sessions():
    artifacts = dict()
    artifacts.update(run_session_including_master_data())
    return artifacts

if __name__ == "__main__":
    run_all_sessions()

As shown, the modularized code now contains "non-overlapping" functions, e.g., get_dataset() no longer repeats the same computation as that in get_master_data() (the former instead takes the output from the latter to carry on its own processing).
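The copy.deepcopy() calls in run_session_including_master_data() deserve a short illustration. The sketch below uses hypothetical toy functions (plain dicts instead of DataFrames, and a `snapshot` switch that does not exist in the generated module) to show what could happen when a downstream step mutates its input in place and the upstream artifact was not copied first.

```python
import copy


def get_master_data():
    return {"rows": [1, 2, 3]}


def get_dataset(master):
    # Mutates its input in place, as notebook code sometimes does.
    master["rows"].append(4)
    return master


def run_session(snapshot):
    """Collect both artifacts, optionally snapshotting the upstream one."""
    artifacts = {}
    master = get_master_data()
    artifacts["master_data"] = copy.deepcopy(master) if snapshot else master
    artifacts["dataset"] = get_dataset(master)
    return artifacts


safe = run_session(snapshot=True)
leaky = run_session(snapshot=False)
print(safe["master_data"])   # {'rows': [1, 2, 3]}
print(leaky["master_data"])  # {'rows': [1, 2, 3, 4]}
```

Without the snapshot, the stored `master_data` silently picks up the change made while computing `dataset`; the deep copy trades some memory and time for protection against that.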

Moreover, LineaPy can now smartly identify any "common" computation among different artifacts and factor it out into its own function — even if that common computation has not been stored as an artifact (check our recent GH discussion post for a concrete example). This of course further reduces any redundant computation in the pipeline.
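One simplified way to picture this factoring, which is an assumption for illustration and not a description of LineaPy's actual algorithm, is to treat each artifact's code as the set of statement indices it depends on; the statements shared by all artifacts then form the common block. `factor_common` and the sample `slices` are hypothetical.

```python
def factor_common(slices):
    """Split per-artifact statement sets into a shared block and remainders.

    `slices` maps an artifact name to the set of statement indices it needs.
    """
    common = set.intersection(*slices.values())
    own = {name: sorted(stmts - common) for name, stmts in slices.items()}
    return sorted(common), own


# Statements 0 and 1 are needed by both artifacts, so they are shared.
slices = {"artifact_a": {0, 1, 2}, "artifact_b": {0, 1, 3}}
common, own = factor_common(slices)
print(common)  # [0, 1]
print(own)     # {'artifact_a': [2], 'artifact_b': [3]}
```

In a generated module, the shared statements would become one function that runs once, with each artifact's function consuming its output.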

Given your valuable inputs earlier, we would love to get your feedback on this new style of pipeline generation. Please give it a try and let us know what you think (or any questions)!