kubeflow-kale / kale

Kubeflow’s superfood for Data Scientists
Apache License 2.0

Kale SDK: Graph contains a cycle or graph changed during iteration #399

Open Techn0logic opened 2 years ago

Techn0logic commented 2 years ago
python: 3.8.8
pip: 21.3.1
kubeflow-kale: 0.7.0

I am trying to use the Kale SDK to compile (and run) a pipeline in an on-prem Kubeflow environment, following the documentation: https://docs.arrikto.com/release-1.4/user/kale/sdk/pipelines.html#procedure

Example kale_sdk.py:

from kale.sdk import pipeline, step
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

@step(name="data_loading")
def load(random_state):
    """Create a random dataset for binary classification."""
    rs = int(random_state)
    x, y = make_classification(random_state=rs)
    return x, y

@step(name="data_split")
def split(x, y):
    """Split the data into train and test sets."""
    x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    return x, x_test, y, y_test

@step(name="model_training")
def train(x, x_test, y, training_iterations):
    """Train a Logistic Regression model."""
    iters = int(training_iterations)
    model = LogisticRegression(max_iter=iters)
    model.fit(x, y)

@pipeline(name="binary-classification", experiment="kale-tutorial")
def ml_pipeline(rs=42, iters=100):
    """Run the ML pipeline."""
    x, y = load(rs)
    x, x_test, y, y_test = split(x, y)
    train(x, x_test, y, iters)

if __name__ == "__main__":
    ml_pipeline(rs=42, iters=100)

Issue 1. The entry point doesn't work at the module level:

python -m kale --help
/opt/conda/bin/python: No module named kale.__main__; 'kale' is a package and cannot be directly executed

The kale binary, meanwhile, seems to work only with a notebook (the --nb argument is required):

usage: kale [-h] --nb NB [--upload_pipeline] [--run_pipeline] [--debug] [--experiment_name EXPERIMENT_NAME] [--pipeline_name PIPELINE_NAME]
            [--pipeline_description PIPELINE_DESCRIPTION] [--docker_image DOCKER_IMAGE] [--kfp_host KFP_HOST] [--storage-class-name STORAGE_CLASS_NAME]
            [--volume-access-mode VOLUME_ACCESS_MODE]
kale: error: the following arguments are required: --nb

Issue 2. Docstrings in the @step and @pipeline functions cause an error. (Removing the docstrings leads to another issue, see Issue 3.)

python kale_sdk.py

Traceback (most recent call last):
  File "kale_sdk.py", line 47, in <module>
    ml_pipeline(rs=42, iters=100)
  File "/opt/conda/lib/python3.8/site-packages/kale/sdk/api.py", line 85, in _do
    processor = PythonProcessor(func, config)
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 57, in __init__
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 62, in validate
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 133, in _validate_function_body
    raise RuntimeError("ast.Expr value is not a ast.Call node")
RuntimeError: ast.Expr value is not a ast.Call node
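
For context on the error message above: in Python's AST a docstring is an ast.Expr statement whose value is a string constant, while a step invocation such as load(rs) is an ast.Expr whose value is an ast.Call. The following is a minimal sketch using only the standard ast module; it is not Kale's validation code, and the sample function is made up for illustration.

```python
import ast
import textwrap

# Hypothetical pipeline body: one docstring followed by one bare call.
source = textwrap.dedent('''
    def ml_pipeline(rs=42):
        """Run the ML pipeline."""
        load(rs)
''')

func = ast.parse(source).body[0]

# Collect every expression statement in the function body together with
# the node type of its value.
kinds = [
    (type(node).__name__, type(node.value).__name__)
    for node in func.body
    if isinstance(node, ast.Expr)
]

for stmt_type, value_type in kinds:
    print(stmt_type, value_type)

# The docstring is an Expr whose value is not a Call, which is the kind of
# node a "value must be ast.Call" check would reject.
docstring_is_call = isinstance(func.body[0].value, ast.Call)
print("docstring is a call:", docstring_is_call)
```

A validator that accepts only ast.Call values for expression statements would therefore reject any function that starts with a docstring, which matches the traceback.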

Issue 3. With the docstrings removed and more than one step, DAG creation fails.

2022-01-19 17:22:18 Kale podutils:255         [INFO]     Getting the base image of container...
2022-01-19 17:22:18 Kale podutils:84          [INFO]     Getting the current container name...
2022-01-19 17:22:18 Kale podutils:89          [INFO]     <CONTAINER NAME>
2022-01-19 17:22:18 Kale podutils:268         [INFO]     Retrieved image: <MT IMAGE>
2022-01-19 17:22:18 Kale kale                 [INFO]     Registering Step 'data_loading'
2022-01-19 17:22:18 Kale kale                 [INFO]     Registering Step 'data_split'
2022-01-19 17:22:18 Kale kale                 [INFO]     Registering Step 'model_training'
Traceback (most recent call last):
  File "kale_sdk.py", line 43, in <module>
    ml_pipeline(rs=42, iters=100)
  File "/opt/conda/lib/python3.8/site-packages/kale/sdk/api.py", line 86, in _do
    pipeline_obj = processor.run()
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/baseprocessor.py", line 44, in run
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 83, in to_pipeline
  File "kale_sdk.py", line 39, in ml_pipeline
    train(x, x_test, y, iters)
  File "/opt/conda/lib/python3.8/site-packages/kale/step.py", line 70, in __call__
    return execution_handler(self, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 90, in _register_step_handler
  File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 222, in add_step
    if step.name in self.steps_names:
  File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 243, in steps_names
    return [step.name for step in self._topological_sort()]
  File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 243, in <listcomp>
    return [step.name for step in self._topological_sort()]
  File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 290, in _steps_iterable
    for name in step_names:
  File "/opt/conda/lib/python3.8/site-packages/networkx/algorithms/dag.py", line 246, in topological_sort
    for generation in nx.topological_generations(G):
  File "/opt/conda/lib/python3.8/site-packages/networkx/algorithms/dag.py", line 177, in topological_generations
    raise nx.NetworkXUnfeasible(
networkx.exception.NetworkXUnfeasible: Graph contains a cycle or graph changed during iteration

My expectation is to compile and run the pipeline with the SDK, defined in a Python file as outlined in the documentation.

I would appreciate some pointers on how to use the SDK correctly and on why I am seeing these errors.

Techn0logic commented 2 years ago

I figured out that it breaks when the names of the output variables overlap across steps.

Renaming the outputs of split from x to x1 and from y to y1 allows DAG creation to succeed.

@pipeline(name="binary-classification", experiment="kale-tutorial")
def ml_pipeline(rs=42, iters=100):
    x, y = load(rs)
    x1, x_test, y1, y_test = split(x, y)
    train(x1, x_test, y1, iters)

Is this the expected behavior of the SDK? Perhaps the example in the documentation needs to be updated?

tc360950 commented 2 years ago

This is caused by the way pipeline steps are added to the DAG in pyprocessor.py:

def _link_step(self, step: Step):
    ins_left = set(step.ins.copy())
    for anc_step in reversed(list(self.pipeline.steps)):
        if ins_left.intersection(set(anc_step.outs)):
            self.pipeline.add_dependency(anc_step, step)

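When _link_step is called on a step, that step has already been placed in the DAG, but with no edges. So when the loop reaches anc_step == step, the step is linked to itself whenever its inputs and outputs share a name, because ins_left.intersection(set(anc_step.outs)) is then non-empty. That self-edge is the cycle that networkx reports.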
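
The self-link can be reproduced without Kale. The sketch below is only an illustration: FakeStep and link_step are hypothetical stand-ins that mimic the quoted loop with plain sets, and the ins/outs values are assumed from the variable names in the example pipeline.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class FakeStep:
    """Hypothetical stand-in for a Kale step: a name plus in/out variable names."""
    name: str
    ins: Set[str] = field(default_factory=set)
    outs: Set[str] = field(default_factory=set)


def link_step(steps: List[FakeStep], step: FakeStep) -> List[Tuple[str, str]]:
    """Mimic the quoted loop: add an edge from every registered step whose
    outputs overlap the inputs of `step`. `step` is already in `steps`."""
    edges = []
    for anc_step in reversed(steps):
        if step.ins & anc_step.outs:
            edges.append((anc_step.name, step.name))
    return edges


load = FakeStep("data_loading", ins={"rs"}, outs={"x", "y"})
# Original example: split returns x and y again, so ins and outs overlap.
split = FakeStep("data_split", ins={"x", "y"},
                 outs={"x", "x_test", "y", "y_test"})
# Workaround from this thread: outputs renamed to x1 and y1.
renamed = FakeStep("data_split", ins={"x", "y"},
                   outs={"x1", "x_test", "y1", "y_test"})

overlapping_edges = link_step([load, split], split)
renamed_edges = link_step([load, renamed], renamed)

print(overlapping_edges)  # includes the self-edge data_split -> data_split
print(renamed_edges)      # only data_loading -> data_split
```

With the overlapping names the step gains an edge to itself, so the next topological sort cannot complete; with the renamed outputs only the expected dependency remains.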

I believe that, at a bare minimum, a RuntimeError should be raised in PythonProcessor._register_step_handler if the outputs and inputs of a step have a non-empty intersection.
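
A rough sketch of what such a check could look like follows. The helper name assert_no_in_out_overlap and its signature are hypothetical and not part of Kale; where exactly it would be called inside _register_step_handler is left open.

```python
from typing import Iterable


def assert_no_in_out_overlap(step_name: str,
                             ins: Iterable[str],
                             outs: Iterable[str]) -> None:
    """Raise early, with a readable message, if a step reuses an input
    name as an output name (hypothetical helper, not Kale API)."""
    shared = set(ins) & set(outs)
    if shared:
        raise RuntimeError(
            f"Step '{step_name}' uses {sorted(shared)} as both input and "
            "output; rename the outputs"
        )


# A step with disjoint inputs and outputs passes silently.
assert_no_in_out_overlap("data_loading", ["rs"], ["x", "y"])

# The split step from the example would be rejected before the DAG is touched.
try:
    assert_no_in_out_overlap("data_split", ["x", "y"],
                             ["x", "x_test", "y", "y_test"])
except RuntimeError as err:
    message = str(err)
    print(message)
```

Failing at registration time would point the user at the offending step and variable names instead of surfacing later as a networkx cycle error.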

I can try to add a modification that would allow this situation, i.e. a step whose outputs reuse its input names.

wemoveon2 commented 2 years ago

I'm experiencing the first issue as well. Did you manage to resolve it?