kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Enhancing Pipeline Context Preservation in Runners for Deployment of Kedro (on AWS Batch) #3468

Open hugocool opened 10 months ago

hugocool commented 10 months ago

Description

When deploying Kedro on AWS Batch, we run into a fundamental limitation of the current Kedro Session implementation: the pipeline's name is not retained through the entire execution process, especially in remote or delegated environments. Although the kedro run --pipeline=name_of_pipeline_you_want_to_run command builds a Pipeline object with the correct nodes, the pipeline name is lost once a runner issues the individual per-node run commands in environments like AWS Batch. Because the Pipeline object is unaware of its own name (name_of_pipeline_you_want_to_run), this leads to problems with namespaces, dataset resolution, and executing nodes outside the __default__ pipeline.
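
The core of the problem is that the pipeline registry maps names to Pipeline objects, but the Pipeline object itself does not carry its registry key. A minimal sketch of what this means in practice, assuming a project with a registered "evaluation" pipeline (the name is illustrative):

from kedro.framework.project import pipelines

pipeline = pipelines["evaluation"]  # looked up by name in cli.py / KedroSession
print(pipeline.nodes)               # the nodes are known, but no attribute on
                                    # this object holds the string "evaluation"
# A remote job that only receives "kedro run --node=<node_name>" therefore
# cannot recover the original pipeline name or the namespace filtering it implies.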

Context

Our pipelines are structured to support conditional operations and multiple environments, necessitating a dynamic approach to pipeline execution. Because the Pipeline object has no awareness of its own name during remote node execution, all operations fall back to the __default__ namespace, which makes it very hard to manage complex pipeline structures and dataset resolution. For example, consider a setup where the pipeline registry contains both an evaluation pipeline and a training pipeline. These pipelines cannot both be registered under the default pipeline due to overlapping node names. Without the ability to pass the pipeline name to the runner, these pipelines fall back to the __default__ pipeline when run on AWS Batch: the remote job only receives a command of the form kedro run --env=... --node=..., and name_of_pipeline_you_want_to_run is only accessible in cli.py and the Kedro Session, so the namespace filtering is never passed on. This fallback results in improper namespace filtering and subsequent issues with dataset resolution. Storing name_of_pipeline_you_want_to_run in the Pipeline helps solve two issues with deployment (for example on AWS Batch):

  1. Running nodes that are not contained in the __default__ pipeline.
  2. Dataset resolution through namespaces. Consider the following anonymized code snippets, which demonstrate how dynamic pipelines such as training or evaluation pipelines are set up:
# Example of dynamic pipeline creation
pipelines["client_1_train"] = create_client_1_pipeline(only_train=True)
pipelines["client_2_train"] = create_client_2_pipeline(only_train=True)
# Data catalog configuration for dynamic dataset resolution
"{namespace}.X":
  type: pandas.ParquetDataSet
  filepath: s3://example-bucket/data/04_intermediate/{namespace}_X.parquet

In such configurations, nodes run in the __default__ namespace by default, but for specific pipelines such as a train-only pipeline there is no distinction between train and test datasets. Consequently, when nodes assume they are in the __default__ pipeline, they may look for a dataset such as client.train.X, which does not exist because a previous node in a different pipeline saved it as X. This leads to dataset resolution failures and an inability to run nodes outside of the __default__ pipeline effectively. If the Pipeline object were aware of its own name, these issues could be mitigated, allowing nodes to execute, and datasets to resolve, in the intended namespaces.
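
For concreteness, here is a minimal sketch of the kind of namespaced modular pipeline this refers to; create_client_pipeline, train_model and the client_1 namespace are illustrative stand-ins, not code from our project:

from kedro.pipeline import Pipeline, node, pipeline

def train_model(X):
    ...

def create_client_pipeline(namespace: str) -> Pipeline:
    base = pipeline([node(train_model, inputs="X", outputs="model", name="train")])
    # Wrapping with a namespace prefixes free dataset names, so "X" becomes
    # e.g. "client_1.X" and is matched by the "{namespace}.X" catalog entry above.
    return pipeline(base, namespace=namespace)

pipelines["client_1_train"] = create_client_pipeline("client_1")

When the per-node command on AWS Batch falls back to the __default__ pipeline, this prefixing (and any namespace filtering) is lost, which is exactly the dataset resolution failure described above.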

Possible Implementation

To address this limitation, we propose enhancing the Pipeline object with an attribute for the pipeline name (pipeline_name) that is retained throughout the execution process. This would involve modifying the KedroSession.run method to accept and propagate the pipeline name, so that runners and the execution commands they delegate are aware of the correct pipeline context and preserve the intended namespaces and dataset resolutions. The modified implementation would look something like this:

# In the KedroSession.run method

# Existing pipeline retrieval based on the name
try:
    pipeline = pipelines[pipeline_name or "__default__"]
except KeyError as exc:
    raise ValueError(...) from exc

# Proposed enhancement: attaching the pipeline name to the filtered pipeline
filtered_pipeline = pipeline.filter(...)
filtered_pipeline.pipeline_name = pipeline_name  # Proposed addition

# Rest of the execution code follows
...

Incorporating the pipeline_name attribute in the Pipeline object and ensuring its propagation through runners would allow pipelines to be executed accurately, especially in complex and distributed environments like AWS Batch. This addition would mitigate namespace collisions and dataset resolution issues, providing a more robust and flexible deployment process.
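
To illustrate how a runner could consume the proposed attribute, here is a hypothetical sketch of a _submit_job method on a custom AWS Batch runner; the method name, the aws_batch environment and the pipeline_name attribute itself are assumptions tied to this proposal, not existing Kedro API:

# in a custom AWS Batch runner (hypothetical)
def _submit_job(self, node, pipeline):
    # Read the name attached by KedroSession.run under this proposal,
    # falling back to __default__ if it was never set.
    pipeline_name = getattr(pipeline, "pipeline_name", None) or "__default__"
    command = [
        "kedro",
        "run",
        "--pipeline",
        pipeline_name,  # pipeline context preserved without environment variables
        "--env",
        "aws_batch",
        "--node",
        node.name,
    ]
    ...

This would remove the need for the environment-variable workaround described under Possible Alternatives below.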

Possible Alternatives

We currently use environment variables to pass the pipeline name at runtime. This method works, but it is cumbersome and error-prone, lacking the robustness and integration of a more streamlined solution. The pipeline name is passed to the runner via an environment variable as follows:

# in cli.py, inside def run(...):
    PIPELINE_NAME = pipeline or "__default__"
    os.environ["PIPELINE_NAME"] = PIPELINE_NAME

# in batch_runner.py, at module level:
PIPELINE_NAME = os.environ.get("PIPELINE_NAME", "__default__")

# in batch_runner.py, inside _submit_job:
        command = [
            "kedro",
            "run",
            "--pipeline",
            PIPELINE_NAME,
            "--env",
            "aws_batch",
            "--node",
            node.name,
        ]
datajoely commented 10 months ago

Thanks for the detailed issue @hugocool. I think this is related to our existing session_id concepts (https://github.com/kedro-org/kedro/issues/2879) and to the wider productionisation research conducted in #3094.