kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0
91 stars 83 forks source link

kedro airflow plugins: ValueError Pipeline input(s) not found in the DataCatalog #75

Closed allenhaozi closed 1 year ago

allenhaozi commented 1 year ago

When I run the Airflow job, I get this error:

ValueError: Pipeline input(s) {'X_test', 'y_train', 'X_train'} not found in the DataCatalog
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession

sys.path.append("/Users/mahao/airflow/dags/pandas_iris_01/src")

class KedroOperator(BaseOperator):
    """Airflow operator that executes a single Kedro node in its own session.

    Each Airflow task instance spins up a fresh ``KedroSession`` and runs
    exactly one node of the named pipeline, so the DAG mirrors the Kedro
    pipeline topology one task per node.
    """

    @apply_defaults
    def __init__(self, package_name: str, pipeline_name: str, node_name: str,
                 project_path: str, env: str, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Which Kedro project / pipeline / node this task is responsible for.
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        """Airflow entry point: run the configured node and return nothing."""
        configure_project(self.package_name)
        session = KedroSession.create(self.package_name,
                                      self.project_path,
                                      env=self.env)
        # KedroSession is a context manager; entering it ensures teardown
        # (hooks, logging) even if the node raises.
        with session:
            session.run(self.pipeline_name, node_names=[self.node_name])

# Kedro settings required to run your pipeline.
env = "local"
pipeline_name = "__default__"
# Absolute path to the Kedro project root. Hard-coded because the DAG file
# is parsed from the Airflow dags folder, not from inside the project, so
# Path.cwd() would point at the wrong directory.
project_path = "/Users/mahao/airflow/dags/pandas_iris_01"
package_name = "pandas_iris_01"

# Default arguments inherited by every task in this DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,   # each run is independent of earlier runs
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,               # retry a failed task once
    "retry_delay": timedelta(minutes=5),
}

# Inside a DAG context manager every operator is attached to this DAG
# automatically, so no explicit dag= argument is needed per task.
with DAG(
    "pandas-iris-01",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    # One run every 30 minutes; see
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
    catchup=False,  # enable if you don't want historical dag runs to run
) as dag:

    # (Airflow task_id, Kedro node name) — one task per Kedro node.
    node_specs = [
        ("split", "split"),
        ("make-predictions", "make_predictions"),
        ("report-accuracy", "report_accuracy"),
    ]

    tasks = {
        task_id: KedroOperator(
            task_id=task_id,
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name=node_name,
            project_path=project_path,
            env=env,
        )
        for task_id, node_name in node_specs
    }

    # Wire the task graph to mirror the Kedro pipeline's data dependencies.
    tasks["split"] >> tasks["make-predictions"]
    tasks["split"] >> tasks["report-accuracy"]
    tasks["make-predictions"] >> tasks["report-accuracy"]
noklam commented 1 year ago

I think you are missing the dataset definitions in your catalog. Because each node runs as a separate Airflow task (a separate process), intermediate datasets such as `X_train` cannot live in memory — they must be persisted in the DataCatalog so the next task can load them. For example:

example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv
example_train_x:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_train_x.pkl
example_train_y:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_train_y.pkl
example_test_x:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_test_x.pkl
example_test_y:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_test_y.pkl
example_model:
  type: pickle.PickleDataSet
  filepath: data/06_models/example_model.pkl
example_predictions:
  type: pickle.PickleDataSet
  filepath: data/07_model_output/example_predictions.pkl

See https://kedro.readthedocs.io/en/stable/deployment/airflow_astronomer.html?highlight=astro-airflow-iris

Can you provide the steps to reproduce the issue? What versions of kedro, kedro-airflow are you using and what commands did you run?