Galileo-Galilei / kedro-mlflow

A kedro-plugin for integration of mlflow capabilities inside kedro projects (especially machine learning model versioning and packaging)
https://kedro-mlflow.readthedocs.io/
Apache License 2.0

Understanding PipelineML and pipeline_ml #16

Closed akruszewski closed 3 years ago

akruszewski commented 4 years ago

Hi @Galileo-Galilei. As I mentioned in another issue, I'm currently working on integrating my training and inference pipelines with PipelineML. Unfortunately, I'm confused about how inputs and outputs are handled, and I can't wrap my head around it.

Context

My training pipeline is built from three other pipelines: de_pipeline (data engineering), fe_pipeline (feature engineering) and md_pipeline (training aka. modeling).

My inference pipeline is built from the same pipelines, but with a predict argument which changes their behavior (they use previously saved models for the imputer and for prediction).

In my current implementation it looks like this:

     de_pipeline_predict = pipeline(
         de.create_pipeline(predict=True),  # type: ignore
         inputs={"remote_raw": "remote_new", "imputer": "imputer"},
         namespace="new",
     )
     fe_pipeline_predict = pipeline(
         fe.create_pipeline(predict=True),  # type: ignore
         namespace="new",
     )

     # The `new_preds` output would be renamed to `new.new_preds` because of
     # the namespace, so we map `new_preds` to `new_preds` to retain the
     # name and keep the catalog clean.
     md_pipeline_predict = pipeline(
         md.create_pipeline(predict=True),  # type: ignore
         inputs={"lgbm": "lgbm"},
         outputs={"new_preds": "new_preds"},
         namespace="new",
     )

My pipelines also take parameters as inputs, obtained from the Kedro configuration (by that I mean conf/base/parameters.yaml).
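To make the resulting dataset names easier to follow, here is a minimal pure-Python sketch of the renaming that the snippets above rely on. It is not Kedro's implementation: the `rename` helper is hypothetical, and it assumes the behavior visible in this thread, namely that explicitly mapped names are kept, parameters (`params:...`) are left unprefixed, and every other dataset gets the `namespace.` prefix.

```python
def rename(name, namespace, mapping):
    """Hypothetical helper: name of a dataset after wrapping a pipeline in a namespace."""
    if name in mapping:
        # Explicit inputs/outputs mappings win over the namespace prefix.
        return mapping[name]
    if name == "parameters" or name.startswith("params:"):
        # Assumption: parameters are not prefixed in the Kedro version used here.
        return name
    return f"{namespace}.{name}"


print(rename("new_preds", "new", {}))                          # new.new_preds
print(rename("new_preds", "new", {"new_preds": "new_preds"}))  # new_preds
print(rename("remote_raw", "new", {"remote_raw": "remote_new"}))  # remote_new
print(rename("params:target", "new", {}))                      # params:target
```

Under these assumptions, only the explicitly mapped names and the parameters keep names that another pipeline could produce; everything else becomes `new.<name>`.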

When I'm trying to glue them together with:

     train_pipeline = de_pipeline + fe_pipeline + md_pipeline
     predict_pipeline = de_pipeline_predict + fe_pipeline_predict + md_pipeline_predict

     training = pipeline_ml(
         training=train_pipeline,
         inference=predict_pipeline,
     )

and running my training pipeline I'm getting:

kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLInputsError:
        The following inputs are free for the inference pipeline:
        - lgbm
        - remote_new
        - imputer
        - params:data_engineering
        - params:target.
        Only one free input is allowed.
        Please make sure that 'inference' pipeline inputs are 'training' pipeline outputs,
        except one.

I understand the issue here, but I don't know how to proceed (how to "un-free" the inputs, which should be obtained (automatically?) through Kedro features). I would be glad for any tips.

Galileo-Galilei commented 4 years ago

TL;DR: make sure that training_pipeline.outputs() and predict_pipeline.inputs() are identical (except for one extra value in the second set for the data to predict, which seems to be remote_new in your project). Note: the namespace may have modified the names and caused some interference; that might be a bug.
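A minimal sketch of this rule with plain Python sets (no Kedro needed). The two helper functions are hypothetical and only mimic the check described here; the dataset names come from the error message above, and the parameters are left out of the second call for brevity.

```python
def free_inference_inputs(training_outputs, inference_inputs):
    """Return the inference inputs that the training pipeline does not produce."""
    return set(inference_inputs) - set(training_outputs)


def check_pipeline_ml_inputs(training_outputs, inference_inputs, input_name):
    """Raise ValueError unless `input_name` is the only free inference input."""
    free = free_inference_inputs(training_outputs, inference_inputs)
    if free != {input_name}:
        raise ValueError(
            f"Free inference inputs {sorted(free)}; expected only ['{input_name}']"
        )
    return free


# Situation from the error message: no inference input matches a training output.
reported = {"lgbm", "remote_new", "imputer", "params:data_engineering", "params:target"}
print(sorted(free_inference_inputs(set(), reported)))

# Target situation: training produces the artifacts, only the data to predict is free.
print(check_pipeline_ml_inputs({"lgbm", "imputer"}, {"lgbm", "imputer", "remote_new"}, "remote_new"))
```

Printing both sets for your own pipelines (training_pipeline.outputs() and predict_pipeline.inputs()) and comparing them this way should show which names the namespace has changed.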


A bit of context: what is the pipeline_ml function for?

Unlike all the other functions of the plugin, this function is not intended for production: it packages a whole pipeline to make it easy to serve (as an API or in batch) with a single command line. The goal is to share a pipeline very easily and to facilitate testing and reuse between data scientists.

For production purposes, you need a tool to schedule / monitor the pipelines (Airflow, for instance).

What is its goal?

Under the hood, the PipelineML class (the type of the output of pipeline_ml) is nothing but a kedro Pipeline (the "training" pipeline) which has access to another pipeline (the "predict" pipeline) (you can see it in the code here).

When the KedroPipelineHook detects that the pipeline you are trying to run (with kedro run) is not a standard Pipeline but a PipelineML, it tries to log it in mlflow with the KedroPipelineModel class which is actually the class that contains all the code logic.

This class is inspired by the mlflow example on how to create a custom mlflow model. Basically, it is a class that implements a predict method, which will be called automatically when serving the model, and a load_context method, which manages the inputs your predict method needs. Indeed, any logged model that respects this format can benefit from mlflow capabilities, including model serving.
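To illustrate the shape of such a class, here is a small stand-in that runs without mlflow. It is a sketch, not the KedroPipelineModel code: `SketchPipelineModel`, `impute` and `score` are hypothetical names, and the context is faked with a `SimpleNamespace` whose `artifacts` already hold loaded objects (in mlflow, `context.artifacts` maps names to local file paths that load_context would have to read).

```python
from types import SimpleNamespace


class SketchPipelineModel:
    """Stand-in exposing the two methods described above."""

    def __init__(self, steps, input_name):
        self.steps = steps            # list of (function, input names, output name)
        self.input_name = input_name  # name of the data to predict on
        self.loaded = {}

    def load_context(self, context):
        # Keep the objects the inference steps need (imputer, model, ...).
        self.loaded = dict(context.artifacts)

    def predict(self, context, model_input):
        # Run the steps in order, feeding each one from the data gathered so far.
        data = dict(self.loaded)
        data[self.input_name] = model_input
        result = None
        for func, inputs, output in self.steps:
            result = func(*(data[name] for name in inputs))
            data[output] = result
        return result


def impute(rows, fill_value):
    return [fill_value if x is None else x for x in rows]


def score(rows, weight):
    return [weight * x for x in rows]


model = SketchPipelineModel(
    steps=[
        (impute, ["raw_data", "imputer"], "imputed_data"),
        (score, ["imputed_data", "model"], "predictions"),
    ],
    input_name="raw_data",
)
context = SimpleNamespace(artifacts={"imputer": 0, "model": 2})
model.load_context(context)
predictions = model.predict(context, [1, None, 3])
print(predictions)  # [2, 0, 6]
```

The point of the sketch is the division of labor: everything produced at training time arrives through load_context, and only the data to predict arrives through predict.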

How does it work?

If you have read the KedroPipelineModel code described above, you can see that the KedroPipelineModel needs 2 elements to be logged in mlflow :

You can fill these arguments by hand, but it can be a bit tedious, and this is where pipeline_ml comes into play: your predict pipeline is a higher-order function, i.e. a function generated by the training pipeline. All the arguments your predict pipeline needs are generated by the training pipeline, and pipeline_ml binds them together: it should have all the information needed for logging. When the training pipeline is run, the associated "predict" pipeline is logged.

To do so, it applies the following process:

How to fix your bug

The error comes from the fact that training_pipeline.outputs() and predict_pipeline.inputs() do not have the same entries. Either you do not use the same names for the entries in both pipelines (fix that!), or the namespace introduces a prefix that changes the entry names. In the second case, it may be a bug to investigate (I confess I have not tried pipeline_ml with namespaces, which are quite recent in Kedro). Your pipeline should look like this (I'm coding on the fly, the code is not tested):

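from kedro.pipeline import Pipeline, node
from kedro_mlflow.pipeline import pipeline_ml

# preprocess, fit_imputer, transform_imputer, train_model and predict
# are your own node functions.
full_pipeline = Pipeline(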
        [
            node(
                func=preprocess,
                inputs=dict(data="raw_data"),
                outputs="preprocessed_data",
                tags=["training", "inference"]
            ),
            node(
                func=fit_imputer,
                inputs=dict(data="preprocessed_data"),
                outputs="imputer",
                tags=["training"]
            ),
            node(
                func=transform_imputer,
                inputs=dict(data="preprocessed_data", imputer="imputer"),
                outputs="imputed_data",
                tags=["training", "inference"]
            ),
            node(
                func=train_model,
                inputs=dict(data="imputed_data"),
                outputs="model",
                tags=["training"]
            ),
            node(
                func=predict,
                inputs=dict(data="imputed_data", model="model"),
                outputs="predictions",
                tags=["inference"]
            ),

        ]
    )

# Create the pipelines as you did. Use kedro viz to see what's going on!
# Print the pipeline and check that train_pipeline.outputs={imputer, model}
train_pipeline = full_pipeline.only_nodes_with_tags("training")
# Print the pipeline and check that predict_pipeline.inputs={raw_data, imputer, model}
predict_pipeline = full_pipeline.only_nodes_with_tags("inference")

training = pipeline_ml(
    training=train_pipeline,
    inference=predict_pipeline,
    input_name="raw_data",
)

If you still struggle with the function and need additional help, can you please share:

Additional infos

I will publish a detailed example in the docs this weekend; if you can wait until then, it should make the explanation clearer.

laurids-reichardt commented 4 years ago

Hi @Galileo-Galilei, thanks for your great work!

My understanding of pipeline_ml might be wrong, but I believe the requirement that training_pipeline.outputs() and predict_pipeline.inputs() are identical might be a bit too restrictive. Consider the following pipeline, which matches text strings to predefined labels:

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                name="Split Data",
                func=split_data,
                inputs=["text_samples", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                tags=["training"],
            ),
            node(
                name="Fit MultiLabelBinarizer",
                func=fit_label_binarizer,
                inputs="y_train",
                outputs="mlb",
                tags=["training"],
            ),
            node(
                name="Transform Labels",
                func=transform_labels,
                inputs=["mlb", "y_train", "y_test"],
                outputs=["Y_train", "Y_test"],
                tags=["training"],
            ),
            node(
                name="Train Model",
                func=train_model,
                inputs=["X_train", "Y_train"],
                outputs="classifier",
                tags=["training"],
            ),
            node(
                name="Evaluate Model",
                func=evaluate_model,
                inputs=["classifier", "X_test", "Y_test"],
                outputs=None,
                tags=["evaluation"],
            ),
            node(
                name="Make Prediction",
                func=make_prediction,
                inputs=["classifier", "mlb", "features"],
                outputs=None,
                tags=["inference"],
            ),
        ]
    )

Nodes:

def split_data(text_samples: pd.DataFrame, parameters: Dict) -> List:
    # extract features
    X = text_samples["features"].values

    # extract labels
    y = text_samples["labels"].values

    # split dataset into train and test data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )

    return [X_train, X_test, y_train, y_test]

def fit_label_binarizer(y_train: np.ndarray) -> MultiLabelBinarizer:
    # multi label binarizer to transform data labels
    mlb = MultiLabelBinarizer()

    # fit the mlb on the train label data
    mlb.fit(y_train)

    return mlb

def transform_labels(mlb: MultiLabelBinarizer, y_train: np.ndarray, y_test: np.ndarray) -> List:
    # transform train label data
    Y_train = mlb.transform(y_train)

    # transform test label data
    Y_test = mlb.transform(y_test)

    return [Y_train, Y_test]

def train_model(X_train: np.ndarray, Y_train: np.ndarray) -> Pipeline:
    # scikit-learn classifier pipeline
    classifier = Pipeline(
        [
            ("vectorizer", CountVectorizer()),
            ("tfidf", TfidfTransformer()),
            ("clf", OneVsRestClassifier(LinearSVC())),
        ]
    )

    # fit the classifier on the train data
    classifier.fit(X_train, Y_train)

    return classifier

def evaluate_model(classifier: Pipeline, X_test: np.ndarray, Y_test: np.ndarray):
    # make prediction with test data
    predicted = classifier.predict(X_test)

    # accuracy score of the trained classifier
    accu = accuracy_score(Y_test, predicted)

    # log accuracy
    logger = logging.getLogger(__name__)
    logger.info("Model has an accuracy of %.3f", accu)

def make_prediction(classifier: Pipeline, mlb: MultiLabelBinarizer, features: np.ndarray) -> List:
    # model inference on features
    predicted = classifier.predict(features)

    # inverse transform prediction matrix back to string labels
    all_labels = mlb.inverse_transform(predicted)

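    # map input values to predicted label
    # (the original snippet iterated over an undefined name `values`;
    # `features` is the input that was predicted on)
    predictions = []
    for item, labels in zip(features, all_labels):
        predictions.append({"value": item, "label": labels})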

    # return predictions as list of dicts
    return predictions

The inference part does not only need access to the trained model but to the fitted MultiLabelBinarizer as well. Is this supported by pipeline_ml?

Galileo-Galilei commented 4 years ago

Hello @laurids-reichardt, glad to see that you're playing around with the plugin!

Regarding your question, access to data other than the ml model (encoder, binarizer, ...) is not only "possible" but exactly what pipeline_ml is designed for, because it is a very common pattern in ml applications. It even enables more complex pipelines: your prediction pipeline is composed of a single node, but you could split the train_model scikit-learn pipeline into several kedro nodes (one for the vectorizer, one for tf-idf and one for your classifier), and it should still be able to handle this more complex use case.

A few remarks on pipeline_ml:

The good news is that I think your code should almost work "as is". Can you check the following items:

In case these elements are not enough to solve your problem, what is the error message you get? Can you share a sample of the data you use to make the problem reproducible?

EDIT: It is a bug. pipeline_ml considers only "terminal" outputs (i.e. those which are not inputs of other nodes), and since your mlb is reused in your training pipeline (in the transform_labels node), it crashes. This behaviour is very strange; I was sure that there was a test for it. I'll try to commit a fix ASAP. ~To make your pipeline work in its current state, just return the binarizer as a terminal output (for example, make transform_labels return it unmodified).~
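A small sketch of the difference between "terminal" outputs and all outputs, using only the (inputs, outputs) of the nodes tagged "training" in the pipeline posted above. The helper functions are hypothetical and written in plain Python; they only mimic the set logic described here, not the plugin's code.

```python
# (inputs, outputs) of the nodes tagged "training" in the pipeline above
training_nodes = [
    (["text_samples", "parameters"], ["X_train", "X_test", "y_train", "y_test"]),
    (["y_train"], ["mlb"]),
    (["mlb", "y_train", "y_test"], ["Y_train", "Y_test"]),
    (["X_train", "Y_train"], ["classifier"]),
]
inference_inputs = {"classifier", "mlb", "features"}


def all_outputs(nodes):
    """Every dataset produced by some node."""
    return {name for _, outputs in nodes for name in outputs}


def terminal_outputs(nodes):
    """Datasets produced by a node and consumed by no other node."""
    consumed = {name for inputs, _ in nodes for name in inputs}
    return all_outputs(nodes) - consumed


# With terminal outputs only, mlb looks like a free input next to features.
print(sorted(inference_inputs - terminal_outputs(training_nodes)))  # ['features', 'mlb']

# With all outputs, only the data to predict remains free.
print(sorted(inference_inputs - all_outputs(training_nodes)))  # ['features']
```

Because mlb is consumed by the "Transform Labels" node, it is not a terminal output, which is why two inputs appear free instead of one.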

EDIT2: It should be fixed. Make all the above modifications and install the hotfix-pipeline-ml version of kedro-mlflow with the command below. I've tried it with the example code you gave above, and everything seems to be fine. I'll add some tests in the coming weeks and merge it to develop.

pip install --upgrade git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml

laurids-reichardt commented 4 years ago

Yeah, thanks for the quick answer! The hotfix works. For reference, the url is: git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml

Galileo-Galilei commented 4 years ago

Indeed, wrong copy pasting, sorry.

Does "it works" only mean that it is properly stored in mlflow, or did you set the API up and try it out?

laurids-reichardt commented 4 years ago

Here's my current implementation: https://github.com/laurids-reichardt/kedro-examples/blob/kedro-mlflow-hotfix2/text-classification/src/text_classification/pipelines/pipeline.py

kedro run --pipeline=kedro_mlflow runs without issues and logs the mlb and classifier as artifacts inside the mlruns folder.

However I get the following error while trying to make some predictions:

❯ mlflow models predict -i ./predict_input.csv -m ./mlruns/1/887715ed7000444fae966f99376ea870/artifacts/text_classification --no-conda
2020/07/19 23:46:55 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
Traceback (most recent call last):
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/bin/mlflow", line 8, in <module>
    sys.exit(cli())
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/models/cli.py", line 93, in predict
    json_format=json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/backend.py", line 65, in predict
    json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/scoring_server/__init__.py", line 222, in _predict
    pyfunc_model = load_model(model_uri)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/__init__.py", line 466, in load_model
    model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/model.py", line 209, in _load_pyfunc
    python_model = cloudpickle.load(f)
ModuleNotFoundError: No module named 'text_classification'

My current guess would be that the issue stems from the fact that I use pyenv+venv to resolve my dependencies instead of conda. I'll investigate further and report back.

Galileo-Galilei commented 4 years ago

It sounds like your own package is not installed as a python package. What does pip show text_classification return? In your active environment and at the root of your kedro project, can you try:

cd src
pip install -e .

laurids-reichardt commented 4 years ago

You're right, pip install -e ./src did the trick. I should read up on Python modules. 😃

Now it works without issues. Thanks for your great support!

❯ model_id=5372a2d2198b443382922a67c3dc3a40; mlflow models predict -i ./predict_input.csv -m ./mlruns/1/${model_id}/artifacts/text_classification --no-conda -t 'csv'
2020/07/20 11:49:40 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/fsspec/implementations/local.py:33: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
  FutureWarning,
{"predictions": [{"value": "The weather in london is rainy as always.", "label": ["new york"]}, {"value": "England just lost the football game.", "label": ["london"]}, {"value": "New York is called the Big Apple.", "label": ["new york"]}]}

laurids-reichardt commented 4 years ago

I converted the scikit-learn classifier pipeline to a kedro pipeline as well: https://github.com/laurids-reichardt/kedro-examples/blob/master/text-classification/docs/kedro-pipeline.svg

Galileo-Galilei commented 4 years ago

I have just added unit tests, updated the changelog and merged this fix to develop. It will be released to pypi soon. I am leaving the issue open because, apart from this bugfix, it is the best documentation of the pipeline_ml function so far.

Short digression on module

Regarding the module, I always recommend installing your kedro package as a module. If you don't, you can still perform relative imports between your scripts, but they depend on your working directory. This may lead to annoying issues because:

For all these reasons, I found it much more stable to install your project as a python package with pip.

The -e flag means that you install the package in editable mode: it is not copied into the directory of your virtual env (C:\Users\YOU\Anaconda3\envs\YOUR_ENV) but points towards your local project (try pip freeze and look for your package to see what I mean). This means that any change in your code is immediately taken into account without reinstalling the package.
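A quick way to check, from Python, whether a package is visible to the active environment is a sketch like the following. It uses only the standard library; `is_importable` is a hypothetical helper, and it is meant for top-level package names such as text_classification.

```python
import importlib.util


def is_importable(package_name):
    """Return True if a top-level package can be found by the import system."""
    return importlib.util.find_spec(package_name) is not None


print(is_importable("json"))                 # standard library module, always found
print(is_importable("text_classification"))  # False until the project is pip-installed
```

If the second call returns False in the environment that mlflow uses to load the model, the ModuleNotFoundError shown earlier in the thread is the expected outcome.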

A better way to specify environment in kedro-mlflow

Note that, with all this in mind, you can specify the conda_env arg of MlflowPipelineHook to ensure that your model will be reproducible for anyone who can pip install your module (from github, for instance):

from your_package import __version__ as pkg_version # <-- add this
from kedro_mlflow.framework.hooks import MlflowNodeHook, MlflowPipelineHook
from pk.pipeline import create_pipelines

class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins)
    """

    project_name = "YOUR PROJECT NAME"
    project_version = "0.16.X"
    hooks = (
        MlflowNodeHook(flatten_dict_params=False),
        MlflowPipelineHook(model_name="YOUR_PYTHON_PACKAGE",
                           conda_env={"python": YOUR_PYTHON_VERSION,
                                      "dependencies": [f"your_package=={pkg_version}"]}) # <-- and this
    )

Galileo-Galilei commented 3 years ago

This issue is closed since :

Feel free to reopen if needed.