kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

Add an additional Jinja2-Template #27

Closed fdroessler closed 1 year ago

fdroessler commented 2 years ago

Description

The current kedro-airflow plugin requires the kedro project to be installed on the Airflow worker. This becomes a challenge whenever changes to the kedro project need to be rolled out to the Airflow worker. An alternative to the current extension of the BaseOperator, which generates a KedroOperator, would be to use the PythonVirtualenvOperator. In that setup, the worker would install kedro project updates from a local/official PyPI server upon release of an update. The downside is the additional overhead of creating the virtualenv.

Context

I have run into issues with the current setup: I am getting OOM errors when running kedro DAGs that use the KedroOperator (the investigation into why is still ongoing). Further to this, and probably due to my inexperience, I struggled to roll out updates from the kedro project to the Airflow workers without updating each worker manually, which seemed like a lot of hassle.

Possible Implementation

An alternative Jinja2 template can be provided that largely follows the logic of the existing one but replaces the KedroOperator with a PythonVirtualenvOperator setup.

Basic elements of the setup can be found below.

Kedro function to be executed in the virtual environment:

def kedro_func(package_name, pipeline_name, node_name, project_path, env):
    """
    Example function that will be executed in a virtual environment.

    Importing at function level ensures that the libraries are not imported
    before they are installed in the virtualenv.
    """
    from kedro.framework.project import configure_project
    from kedro.framework.session import KedroSession

    configure_project(package_name)
    with KedroSession.create(package_name, project_path, env=env) as session:
        context = session.load_context()
        print(context.catalog.list())  # sanity check: datasets visible to the run
        session.run(pipeline_name=pipeline_name, node_names=[node_name])
    print("Finished")

Changes to the task generating loop:

    tasks = {}
    {% for node in pipeline.nodes %}
    tasks["{{ node.name | safe | slugify }}"] = PythonVirtualenvOperator(
        python_callable=kedro_func,
        requirements=['kedro', package_name],
        system_site_packages=False,
        task_id="{{ node.name | safe | slugify }}",
        op_kwargs={
            "package_name": package_name,
            "pipeline_name": pipeline_name,
            "node_name": "{{ node.name | safe }}",
            "project_path": project_path,
            "env": env,
        }
    )
    {% endfor %}
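One wiring detail worth calling out: Airflow's PythonVirtualenvOperator (like PythonOperator) ultimately invokes `python_callable(**op_kwargs)`, so every key in `op_kwargs` must correspond to a parameter of `kedro_func`. A stdlib-only sanity check is sketched below; all values are illustrative placeholders, not rendered template output:

```python
import inspect

# Stand-in for the kedro_func above; the body is elided because only
# the signature matters for this check.
def kedro_func(package_name, pipeline_name, node_name, project_path, env):
    pass

# Placeholder values; in the rendered DAG these come from the template.
op_kwargs = {
    "package_name": "my_kedro_project",
    "pipeline_name": "__default__",
    "node_name": "split-data-node",
    "project_path": "/opt/airflow/my_kedro_project",
    "env": "base",
}

# The operator calls python_callable(**op_kwargs), so every key must be
# an accepted parameter of the callable.
missing = set(op_kwargs) - set(inspect.signature(kedro_func).parameters)
assert not missing, f"op_kwargs keys with no matching parameter: {missing}"
```

Running a check like this against the rendered template catches name drift (e.g. `node_names` vs. `node_name`) before the DAG ever reaches a worker.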

Would be happy to receive feedback on this and if it is deemed useful I am happy to create a PR.

noklam commented 2 years ago

Hi @fdroessler, sorry for the late reply.

I see there are a couple of issues mentioned here.

  1. OOM - how is it solved with the new operator?
  2. Replacing BaseOperator with PythonVirtualenvOperator - the main difference is that the operator gets its own virtual environment instead of using the worker's Python env. Is this correct?

I think this could be useful, but I am not sure what the best practice is with the latest Airflow. For example, I found that they released the TaskFlow API: https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

fdroessler commented 2 years ago

Hi @noklam, apologies from my side as well ;) Looking into the TaskFlow API, I think that could work as well and might even allow for a more generic template.

However, is there any argument against providing templates that use different Airflow "APIs"? From what I have seen, while the TaskFlow API is probably the way to go, it is not yet widely adopted. I have also seen a variety of pre-TaskFlow deployments that could not support that workflow but could support BaseOperator and PythonVirtualenvOperator. Let me know what you think.

noklam commented 2 years ago

@fdroessler I think for this we really need input from the community; I am definitely not an Airflow expert and I don't understand the advantages of all the APIs.

As long as we state clearly what the use cases/advantages of the different operators are, I think it's reasonable to support more operators.

Does the BaseOperator just use whatever libraries are installed on the workers? So in theory I can't run 2 projects with different requirements? And the PythonVirtualenvOperator will have its own requirements installed per task. Is this correct?

One thing that I think needs more consideration is that KedroOperator is built on BaseOperator, but the new template would use PythonVirtualenvOperator. I think what we need here is a more generic KedroBaseOperator, and then different implementations for the corresponding Airflow APIs.
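To make that last suggestion concrete, here is a hypothetical sketch of what a generic KedroBaseOperator could look like. Airflow's real BaseOperator is replaced with a minimal stand-in so the sketch is self-contained; the class names, signatures, and `execute` bodies are all illustrative assumptions, not the plugin's actual design:

```python
class BaseOperator:
    """Minimal stand-in for airflow.models.BaseOperator (illustration only)."""
    def __init__(self, task_id):
        self.task_id = task_id


class KedroBaseOperator(BaseOperator):
    """Holds the Kedro-specific arguments; subclasses decide *how* the node runs."""
    def __init__(self, task_id, package_name, pipeline_name, node_name,
                 project_path, env):
        super().__init__(task_id=task_id)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        raise NotImplementedError


class KedroInProcessOperator(KedroBaseOperator):
    """Runs the node in the worker's own environment (today's KedroOperator)."""
    def execute(self, context):
        return f"run {self.node_name} in-process"


class KedroVirtualenvOperator(KedroBaseOperator):
    """Runs the node in a per-task virtualenv (PythonVirtualenvOperator-style)."""
    def execute(self, context):
        return f"run {self.node_name} in a fresh virtualenv"
```

The point of the shared base is that the Jinja2 templates would only differ in which subclass they instantiate, while the Kedro arguments stay identical.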