kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Allow injecting data into a `KedroSession` run #2169

Open idanov opened 1 year ago

idanov commented 1 year ago

Description

Currently there's no easy way to inject data into a KedroSession interactively. The only way data gets loaded is through the DataCatalog. This makes it hard to embed a Kedro project into an existing Web server or a tool like DataRobot, where the entrypoint is owned by another application and the initial data is provided preloaded (see https://github.com/datarobot/datarobot-user-models/blob/master/model_templates/python3_sklearn/custom.py#L10 for an example: the file custom.py is executed, and the function transform already has two artifacts preloaded, the model and the data).

In an effort to make interactive sessions and potentially different deployment modes (e.g. calling a Kedro run from a FastAPI server with some data provided in an HTTP request) easier to work with, we should think of a way to allow injecting data into a KedroSession. Currently the only way to do that is through a Hook, which is clunky, not very intuitive, and not documented at all.
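For reference, the Hook-based workaround might look roughly like the sketch below. It assumes Kedro's `after_catalog_created` hook specification and `DataCatalog.add_feed_dict` as they exist in Kedro 0.18; the class name `InjectDataHooks` and the way the data reaches the hook are hypothetical.

```python
try:
    from kedro.framework.hooks import hook_impl
except ImportError:
    # Fallback so the sketch can be read and run without Kedro installed.
    def hook_impl(func):
        return func


class InjectDataHooks:
    """Hypothetical hook that pushes preloaded objects into the catalog."""

    def __init__(self, runtime_data):
        # Mapping of dataset name -> in-memory object supplied by the host app.
        self._runtime_data = dict(runtime_data)

    @hook_impl
    def after_catalog_created(self, catalog):
        # replace=True lets injected data shadow catalog entries of the same name.
        catalog.add_feed_dict(self._runtime_data, replace=True)
```

Since hooks are normally registered through `HOOKS` in `settings.py`, getting per-call data into such a hook is the awkward part of this approach.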

datajoely commented 1 year ago

The other potentially tangential user issue is that when users do go down the dynamic catalog route, they need to do it in two places:

Despite our preference for avoiding dynamic pipelines, enough users ask for this that I think we need to come up with some way to achieve DRY, or at the very least a way to define this in only one place.

jmholzer commented 1 year ago

Tech design 01/02/23:

Proposed implementation 1:

Proposed implementation 2:

Data that user provides overrides any catalog definition with the same name.

The naming of any additional parameter has implications for breaking changes and should be considered carefully.
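The override rule in these notes (data the user provides overrides any catalog definition with the same name) can be illustrated with plain dictionaries. This is only a sketch of the semantics; `resolve_inputs` and the sample names are hypothetical and not part of Kedro.

```python
def resolve_inputs(catalog_entries, runtime_data):
    """Return the inputs for a run; injected data wins on name clashes."""
    resolved = dict(catalog_entries)  # start from what the catalog defines
    resolved.update(runtime_data)     # user-provided data takes precedence
    return resolved


catalog_entries = {"model": "loaded from catalog", "features": "loaded from catalog"}
runtime_data = {"features": "provided by caller"}

inputs = resolve_inputs(catalog_entries, runtime_data)
```

Here `inputs["features"]` comes from the caller while `inputs["model"]` still comes from the catalog.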

noklam commented 1 year ago

Just off the top of my head: currently most of the kedro run arguments are available in session.run, except env and extra_params, which are only available in KedroSession.create. The extra_params argument does not do anything special other than being saved by the session store. I briefly mentioned this in this comment.

antonymilne commented 1 year ago

Looking through the code, it seems that extra_params was renamed runtime_params in the config loader when Jiri did his work on it 18 months ago. But extra_params is still called extra_params in kedro context. Looking through the configuration Miro board I don’t think we had a concrete plan to rename this but think we wanted to? @idanov @merelcht So in terms of naming I’d suggest:

Implementation 1

  1. non-breaking: add new argument runtime_data (or runtime_datasets?) specifically for input datasets (not parameters)
  2. breaking: rename extra_params to runtime_params

Implementation 2

  1. non-breaking: allow extra_params to take in datasets also but make sure that config_loader only looks at the ones that are parameters
  2. breaking: rename extra_params to runtime_data that can take both parameters and input datasets

antonymilne commented 1 year ago

@noklam note that extra_params aren't just stored in the session store and forgotten about. They are also passed into settings.CONFIG_LOADER_CLASS (as described above) and settings.CONTEXT_CLASS (which is what enables CLI parameters to override catalog ones)

takikadiri commented 1 year ago

At our organisation we have often needed to run a kedro session from outside kedro, for example from a Streamlit app or a REST API. For now, we haven't managed to do that; instead, we run the web app server inside kedro, more specifically inside a custom kedro runner. Here are our current reasons for doing that:

We run the web app server inside kedro by using the runner abstraction. Our CLI instantiates our custom runners using a runner.yml before giving them to the kedro session. This gives us a way to embed a diverse range of configurable applications inside kedro (web and batch oriented); those apps dictate how and when we run the pipeline and with which elements of the catalog. We managed to get <100 ms latency with this setup.

By the way, this pattern was recently advised for running kedro pipelines in Dask. I believe that introducing the runner API (with a sort of runner.yml) will open up a whole new kind of plugins/runners that can be proposed by the community (a REST API that serves kedro pipelines, a Spark RDD runner, a Dask runner, ...). We already advocate this pattern with my colleague here. Even the IncrementalDataset and PartitionedDataSet logic can be more naturally implemented in a runner.

I understand that this way of instrumenting kedro runs does not generalize to all situations (the DataRobot app mentioned here, for example). We are interested in seeing more progress in making kedro easier to integrate into other applications. I'm just giving my feedback on running a kedro pipeline from an external app, as I struggled with some latency and performance considerations. Maybe the kedro session needs some more steps/methods in its lifecycle (in addition to more attributes, as mentioned). A session materialization, for example, could be used by the web app to materialize some attributes of the session at initialization time, so it can inject the rest of the attributes (the moving parts) at HTTP request time.

noklam commented 1 year ago

@takikadiri Thank you for your feedback, this is very interesting! There is an idea about reducing the overhead of KedroSession creation, which is more relevant for web applications. It might be similar to your point:

A session materialization for example can be used by the web app to materialize some attributes of the session at initialization time, so it can inject the rest of attributes (the moving parts) at HTTP request time.

Can I get some clarification on this point? How does embedding a web server inside a KedroSession help with this? Do these HTTP requests trigger a new pipeline run, or do they simply fetch some data from the catalog and return something?

We wanted to avoid creating and running kedro sessions in each HTTP request call. There is a lot of overhead included in the kedro session creation and session running (context creation, catalog creation, hooks, ...).

takikadiri commented 1 year ago

@noklam the embedded web app starts inside a runner. At that time the KedroSession has already been created, and the context, pipeline and catalog have also already been created.

Inside the runner, the web app has access to the pipeline and the catalog and decides how to use them. At each HTTP call the web app replaces a given input dataset from the catalog with a MemoryDataSet that contains the request body, then runs the pipeline using a standard runner (a sequential runner, for example). The HTTP response is built from a given output dataset (which can be inferred if it is the only free output of the pipeline).

All the free input datasets are materialized as MemoryDataSet at web app init time, except the input dataset. The input dataset, the output dataset, and other web-app-specific configs are declared in a runner.yml.
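The request flow described here (static inputs loaded once at init time, the request body injected as the input at each call, the response read from one output) can be sketched without Kedro as follows. The class `PipelineService`, the callable-based "pipeline", and all names are hypothetical stand-ins rather than the actual runner code.

```python
class PipelineService:
    """Loads static inputs once, then runs the pipeline for each request."""

    def __init__(self, pipeline, loaders, input_name, output_name):
        self._pipeline = pipeline  # callable: dict of inputs -> dict of outputs
        self._input_name = input_name
        self._output_name = output_name
        # Materialize every free input except the request input at init time.
        self._static = {
            name: load() for name, load in loaders.items() if name != input_name
        }

    def handle(self, request_body):
        # Inject the request body in place of the input dataset, then run.
        inputs = dict(self._static)
        inputs[self._input_name] = request_body
        outputs = self._pipeline(inputs)
        return outputs[self._output_name]


load_calls = []


def load_model():
    load_calls.append("model")  # record how often the expensive load happens
    return {"scale": 10}


def pipeline(inputs):
    return {"prediction": inputs["request"] * inputs["model"]["scale"]}


service = PipelineService(pipeline, {"model": load_model}, "request", "prediction")
first = service.handle(2)
second = service.handle(3)
```

The model is loaded a single time even though two requests are served, which is the point of materializing the free inputs up front.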

noklam commented 1 year ago

@takikadiri Did I understand this correctly?

  1. kedro run --runner CustomWebServerRunner, which itself launches a web server and listens for API calls
  2. Upon request, it triggers a particular pipeline with SequentialRunner

The main job of the WebServerRunner isn't to run any pipeline but to launch a web server and create the catalog and pipeline objects. Why do these inputs and outputs need to be defined in runner.yml instead of catalog.yml?

takikadiri commented 1 year ago

@noklam yes exactly, the WebServerRunner doesn't run the pipeline itself but launches a web server and alters the pipeline and catalog at each HTTP call.

The input and output datasets are defined in the catalog. We use a runner.yml to define the args of our custom runners. Here is an example of the runner.yml:

rest_api_runner:
  type: custom_runners.CustomWebServerRunner
  endpoint: /your_desired_endpoint
  input_dataset: _dataset name here_
  output_dataset: _output_dataset_name_
  gunicorn_args: 
  _more_args_ .....

dask_runner:
  type: custom_runners.CustomWebServerRunner
  _some_args_ .....

The runners are then used this way: kedro run --pipeline your_pipeline --runner rest_api_runner

Our custom CLI instantiates the selected runner using the runner.yml. Today, the native kedro CLI does not support runners that need args for their initialization.
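Instantiating an object from a config entry with a dotted `type` plus keyword arguments can be sketched as below. This is not the custom CLI's actual code: the helper `instantiate` is hypothetical, the config is written as an already-parsed dict rather than read from runner.yml, and a standard-library class stands in for the runner so the sketch runs on its own.

```python
import importlib


def instantiate(config):
    """Build an object from a mapping with a dotted `type` and keyword args."""
    kwargs = dict(config)  # copy so the caller's config is left untouched
    module_path, _, class_name = kwargs.pop("type").rpartition(".")
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**kwargs)


# Stand-in for one parsed runner.yml entry; types.SimpleNamespace replaces
# the custom runner class so that no extra package is needed.
runners_config = {
    "rest_api_runner": {
        "type": "types.SimpleNamespace",
        "endpoint": "/predict",
        "input_dataset": "request_body",
    },
}

runner = instantiate(runners_config["rest_api_runner"])
```

The CLI would then look up the entry by the name given to `--runner` and hand the resulting object to the session.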

With this design, our data scientists develop one pipeline that can be used as a batch job or as a service, with a simple runner selection. At the same time, it separates responsibilities: we don't want data scientists to develop the web app (for performance, reliability and security reasons), but they do have a YAML API to declaratively define their web app.

We start the web server in a runner and not in the CLI because we want one generic CLI for all our kedro apps. When deploying our kedro apps, the platforms that run them run them as Python packages with python -m main_module. We judged that having a different CLI per app is not optimal, as it compromises our ability to automate deployments.

I hope this helps, we welcome any feedback on this design, and we're looking forward to seeing kedro more integrable with other services.


noklam commented 1 year ago

This is all very neat. I like that it mitigates the cost of session creation and uses Kedro CLI consistently as an entry point. Initially, I found having a runner that launches a web server but not running a pipeline a bit weird, but I understand this could be the best solution to improve the performance. I am a bit nervous that it starts to feel that this is highly coupled with Kedro. Conceptually it's more natural to launch a web server and run Kedro pipeline, but I need more time to think about it.

yes exactly, the WebServerRunner don't run pipeline but launch a webserver and alterate the pipeline and catalog at each HTTP call With this design, our data scientists develop one pipeline that can be used as a batch or as a service, with a simple runner selection.

How would a batch pipeline be different from service pipeline? I imagine they will be different as the service pipeline got at least two more node/datasets for the HTTP request body & response. How would the webserver change the pipeline, does the parameter --pipeline pipeline_a truly reflect the pipeline being run?

kedro run --pipeline pipeline_a --runner rest_api_runner

takikadiri commented 1 year ago

It is indeed highly coupled with kedro, for better or worse :) It can be OK to embed a thin web app that somehow extends kedro (serves a pipeline), but I agree that it can feel a bit weird when the web app has a range of features that have nothing to do with kedro and needs kedro only for one specific feature (a DataRobot app or a Streamlit app, for example).

How would a batch pipeline be different from service pipeline? I imagine they will be different as the service pipeline got at least two more node/datasets for the HTTP request body & response. How would the webserver change the pipeline, does the parameter --pipeline pipeline_a truly reflect the pipeline being run?

The batch pipeline and the service pipeline are the same. --pipeline pipeline_a is the pipeline that is natively given to the runners. When running, the CustomRunner does not change the selected pipeline, but it can change some attributes of the pipeline object and catalog object if needed.

While waiting for kedro to become easier to integrate with other applications, we'll keep using it as the entry point that runs our thin apps and manages their lifecycle, and above all that gives us great abstractions for authoring business logic code.

datajoely commented 1 year ago

Thanks for explaining your thinking @takikadiri so clearly. We built a version of this in the early days of kedro and eventually deprecated it since it wasn't being used! This is super validating that sophisticated users like yourselves need this and now is the time for us to think about the design we need to solve this problem space.

Please keep in touch and let us know what you learn as it will steer how we build this!

noklam commented 1 year ago

Looking through the code, it seems that extra_params was renamed runtime_params in the config loader when Jiri did his work on it 18 months ago. But extra_params is still called extra_params in kedro context. Looking through the configuration Miro board I don’t think we had a concrete plan to rename this but think we wanted to? @idanov @merelcht So in terms of naming I’d suggest:

Implementation 1

  1. non-breaking: add new argument runtime_data (or runtime_datasets?) specifically for input datasets (not parameters)
  2. breaking: rename extra_params to runtime_params

Implementation 2

  1. non-breaking: allow extra_params to take in datasets also but make sure that config_loader only looks at the ones that are parameters
  2. breaking: rename extra_params to runtime_data that can take both parameters and input datasets

I agree with this, especially after our discussion about OmegaConfigLoader and #2530.

takikadiri commented 1 year ago

So I took this thinking further by trying to solve this problem space (Integrating kedro project in a larger App/system, dynamic catalog, multiple runs without session initialization overhead, injecting data into session, ...). I propose a solution called Kedro Boot.

Kedro Boot Introduction

Kedro Boot is a framework that streamlines the integration of Kedro projects with various applications. It serves as a bridge that shapes the interactions between an application and Kedro's resources.

The integration can be performed in two modes :

  1. Embedded Mode : This mode involves using Kedro Boot to embed an application inside a Kedro project, leveraging kedro's entry points, session and config loader for managing the application lifecycle. It's suitable for use cases where the application is lightweight and yet to be developed by a team that has already invested in the kedro architecture.

  2. Standalone Mode : This mode refers to using Kedro Boot in an external application that has its own entry point. In this case, the external application interacts with Kedro resources through Kedro Boot. It's suitable for large applications that existed before the kedro project and may have been developed by a different team (as in the case of this issue: a DataRobot app that needs to run some kedro pipelines dynamically).

In both modes the application can perform multiple pipeline runs with dynamically rendered catalog.

Example of usages :

This lets data scientists concentrate on the business logic (analytics, ML, scientific compute, ...) while having a way to distribute their work as a ready-to-consume "function" that engineers (DE/SWE) can integrate into a wider app/system.

Technical Design :

For each integration mode, I describe the technical design in two aspects: the system behavior and the user interface. I also provide some code examples to demonstrate the concept in action.

Embedded Mode :

System behavior :

I break down the execution flow into 5 temporal stages :

  1. Kedro CLI run time:

A kedro boot app starts with this command: kedro <kedro_boot_app_name> run <kedro_boot_app_args> <kedro_run_args>. It implements a run command using a kedro_boot_cli_factory. The run command has these properties :

  2. Kedro session run time:

When the kedro session runs, it creates/materializes the pipeline and catalog objects, then runs the pipeline using the KedroBootRunner.

  3. Kedro runner run time:

The KedroBootRunner creates a KedroBootSession using the live/materialized pipeline and catalog, then runs the KedroBootApp, passing it the KedroBootSession.

  4. Kedro boot app run/startup time:

At startup time, the KedroBootApp compiles the KedroResources (pipeline namespaces, optionally with the inputs and outputs that need to be dynamically rendered) needed to perform its functionality. The compilation process breaks down the catalog into these classes of datasets :

If the compilation is not triggered by the KedroBootApp (the app does not need to dynamically render the catalog or to select a specific pipeline namespace), the compilation is performed lazily before the first iteration, using default values.

  5. Kedro boot app iteration time:

At this point the KedroBootApp performs multiple runs using the KedroBootSession. At each iteration the KedroBootApp provides an AppResource that may contain data indexed by pipeline namespace; this data is injected into the KedroBootSession. The KedroBootSession uses the AppResource to render a new (pipeline, catalog) pair, using the compiled KedroResource as the template/base, then runs the pipeline using a kedro runner.
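To make the rendering step more concrete, here is a minimal sketch of filling `[[ name ]]` placeholders in a catalog template at iteration time. It assumes the placeholder syntax used in the catalog.yml example further down in this comment; the function `render` and the regex-based substitution are hypothetical stand-ins, not how Kedro Boot implements its renderer.

```python
import re

# Matches placeholders such as [[ book_id ]].
PLACEHOLDER = re.compile(r"\[\[\s*(\w+)\s*\]\]")


def render(template, params):
    """Return a copy of `template` with [[ name ]] placeholders filled in."""
    if isinstance(template, dict):
        return {key: render(value, params) for key, value in template.items()}
    if isinstance(template, list):
        return [render(value, params) for value in template]
    if isinstance(template, str):
        return PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), template)
    return template


# Compiled once at startup and reused as the base for every iteration.
catalog_template = {
    "namespace_two.book": {
        "type": "pandas.SQLQueryDataSet",
        "sql": "SELECT * from BOOK where BOOK_ID = [[ book_id ]]",
    }
}

rendered = render(catalog_template, {"book_id": 42})
```

The template itself is never mutated, so each iteration can render its own catalog from the same compiled base.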

Here is a sequence diagram that illustrates the execution flow

sequenceDiagram

    participant KedroBootCli
    participant KedroSession
    participant KedroBootRunner(AbstractRunner)
    participant KedroBootApp
    participant KedroBootSession
    participant KedroResource
    participant AppResource
    participant CatalogRenderer

    KedroBootCli->>+KedroSession: create(**run_args)
    KedroSession-->>-KedroBootCli: session
    KedroBootCli->>+KedroBootRunner(AbstractRunner): create(config_loader, app_class, app_args)
    KedroBootRunner(AbstractRunner)->>+KedroBootApp: create()
    KedroBootApp-->>-KedroBootRunner(AbstractRunner): kedro_boot_app
    KedroBootRunner(AbstractRunner)-->>-KedroBootCli: kedro_boot_runner
    KedroBootCli->>+KedroSession: run(kedro_boot_runner, **run_args)

    KedroSession->>+KedroBootRunner(AbstractRunner): run(pipeline, catalog, hook_manager, session_id)
    KedroBootRunner(AbstractRunner)->>+KedroBootApp: run(pipeline, catalog, hook_manager, session_id)
    KedroBootApp->>+KedroBootSession: create(pipeline, catalog, hook_manager, session_id)
    KedroBootSession-->>-KedroBootApp: kedro_boot_session

    KedroBootApp->>+KedroResource: create(namespace, inputs, outputs)
    KedroResource-->>-KedroBootApp: kedro_resources
    KedroBootApp->>+KedroBootSession: compile_resources(kedro_resources)
    KedroBootSession->>+KedroResource: compile(pipeline, catalog)

    rect rgb(240, 246, 252)
    loop 
        KedroBootApp->>+AppResource: create(namespace, datasets, template_params,catalog_params)
        AppResource-->>-KedroBootApp: app_resource
        KedroBootApp->>+KedroBootSession: run(app_resource)
        KedroBootSession->>+KedroResource: get_kedro_resource(app_resource.namespace)
        KedroResource-->>-KedroBootSession: kedro_resource
        KedroBootSession->>+CatalogRenderer: render(kedro_resource, app_resource)
        CatalogRenderer-->>-KedroBootSession: catalog
        KedroBootSession-->>-KedroBootApp: run_outputs
    end
    end

    KedroBootApp-->>-KedroBootRunner(AbstractRunner): Any
    KedroBootRunner(AbstractRunner)-->>-KedroSession: Any
    KedroSession-->>-KedroBootCli: Any

User interface:

A kedro boot app developer should implement two things. An AbstractKedroBootApp and a run command

Code example :

Here is an example of a simplified FastAPI app, embedded in a kedro project using kedro_boot. It's not a realistic app; it's intended to show some kedro boot interfaces in action.

fastapi_app_factory.py

from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel
from kedro_boot import AppResource


def fastapi_app_factory(kedro_boot_session):
    # Create an instance of the FastAPI class
    app = FastAPI()

    # Define a Pydantic model for the JSON data
    class Item(BaseModel):
        name: str
        description: Optional[str] = None

    # Create a POST endpoint to receive JSON data
    @app.post("/items/")
    def process_item(item: Item):
        # Replace the kedro dataset "item" with this JSON data, then run the pipeline.
        # It would be cleaner to use the dependency injection mechanisms for
        # injecting the kedro_boot_session into this endpoint.
        # We deliberately show the complete args of AppResource.
        run_result = kedro_boot_session.run(
            AppResource(
                namespace="namespace_one",
                datasets={"item": item},
                template_params=None,
                catalog_params=None,
            )
        )
        return run_result

    # Endpoint with path parameters
    @app.get("/books/{book_id}")
    def process_book(book_id: int):
        # Render the Jinja template from the path parameter received by the endpoint.
        # Here we use another pipeline namespace to perform the logic of this endpoint.
        run_result = kedro_boot_session.run(
            AppResource(
                namespace="namespace_two",
                datasets=None,
                template_params={"book_id": book_id},
                catalog_params=None,
            )
        )
        return run_result

    return app

catalog.yml

namespace_one.item:
  type: pandas.CSVDataSet
  filepath: data/01_raw/item.csv

namespace_two.book:
  type: pandas.SQLQueryDataSet
  sql: SELECT * from BOOK where BOOK_ID = [[ book_id ]]

fastapi_kedro_boot_app.py

import uvicorn

# KedroBootSession is assumed to be importable from kedro_boot (used for the type hint).
from kedro_boot import AbstractKedroBootApp, KedroBootSession, KedroResource

from .fastapi_app_factory import fastapi_app_factory


class FastApiBootApp(AbstractKedroBootApp):

    def _run(self, kedro_boot_session: KedroBootSession):

        # Compile kedro resources (pipeline and catalog) by declaring the resources that will be rendered at iteration time (HTTP call time).
        kedro_boot_session.compile(
            [
                KedroResource(namespace="namespace_one", inputs=["item"], outputs=["transformed_item"]),
                KedroResource(namespace="namespace_two"),
            ]
        )

        # Create the fastapi app while injecting the kedro_boot_session.
        fastapi_app = fastapi_app_factory(kedro_boot_session)

        # An embedded app leverages the config_loader to manage its configs
        fastapi_args = self.config_loader["fastapi"]

        uvicorn.run(fastapi_app, **fastapi_args)

fastapi_cli.py

import click
from kedro_boot import kedro_boot_cli_factory

app_params = [
        click.option("--app", type=str, help="fastapi app"),
        click.option("--host", type=str, help="fastapi host"),
        click.option("--port", type=int, help="fastapi port"),
        click.option("--workers", type=int, help="fastapi workers")
    ]

fastapi_command = kedro_boot_cli_factory("code_example.FastApiBootApp", app_params)

@click.group(name="fastapi")
def commands():
    pass

commands.add_command(fastapi_command, "fastapi")

Standalone Mode :

In Standalone mode, the external application controls the execution flow. At some point, the developer should create a KedroBootSession by using the kedro_booter function. From that point on, the behavior is the same as in embedded mode. The external application can use the KedroBootSession to compile KedroResources and to perform multiple runs using these resources.

Code example :

We can serve the same fastapi app in a standalone mode. We are going to reuse the same fastapi_app_factory.py module and the same kedro project (pipeline, catalog). Here is an example of external_app code

external_app.py

import uvicorn
from kedro_boot import KedroResource, kedro_booter

from .fastapi_app_factory import fastapi_app_factory

# Some init code
#
#

# kedro_run_args are the same arguments as those of the kedro run CLI command; passing them as keyword arguments is an assumption.
# The user can give a package_name instead of project_path in the case of a packaged kedro project.
kedro_boot_session = kedro_booter(project_path=project_path, **kedro_run_args)

kedro_boot_session.compile(
    [
        KedroResource(namespace="namespace_one", inputs=["item"], outputs=["transformed_item"]),
        KedroResource(namespace="namespace_two"),
    ]
)
fastapi_app = fastapi_app_factory(kedro_boot_session)

# The external app doesn't have the kedro config_loader. Configs can be obtained in another way. We skip this step in this example and run the uvicorn web server without extra configs.
uvicorn.run(fastapi_app)

Next steps

Initially, I was building a kedro-fastapi plugin. After multiple refactoring rounds of the codebase, the concepts behind kedro-boot emerged.
So now, I have kedro-fastapi and kedro-boot, both of which are nearly ready. However, they still need more testing, documentation, and a CI.
The kedro-fastapi plugin uses kedro boot and focuses on providing a user-friendly interface and some production capabilities. Under the hood, the plugin does some mapping between FastAPI objects and kedro objects and leverages FastAPI dependency injection to inject framework objects into the fastapi app. Among other things, this allows users to follow the DRY (Don't Repeat Yourself) principle as closely as possible.

For now, kedro-boot is in the kedro-fastapi codebase/repo. The plugin can be released within the next 2 to 3 weeks. Let me know if it makes sense to release them separately in separate repos, or whether I should bundle everything in kedro-fastapi for now.

Otherwise, I'm all ears for your thoughts on the concept and design! Feedback is welcome.

Related issues : #2182 #2879 #1846 #795 #933 #2058 #143 #1041

astrojuanlu commented 1 year ago

Hey @takikadiri , thanks a lot for the detailed writeup, really appreciated ❤️ I think it intersects with #143 and possibly other issues. See also this long series of issues opened by @Galileo-Galilei , #770 #904 #1041, which may or may not have overlap with these ideas too.

We are now focusing on the upcoming 0.19.0 release so it will take us some time to give this a proper read, but rest assured we will do it when we are ready.

astrojuanlu commented 9 months ago

This issue, among others, was mentioned in the 2024-02-14 tech design, in which @Galileo-Galilei and @takikadiri showed kedro-boot (https://github.com/takikadiri/kedro-boot/). There was agreement that implementing this, plus making the Session re-entrant and more lightweight (#2182, #2879), would be good.

astrojuanlu commented 9 months ago

xref https://github.com/kedro-org/kedro/issues/3540

astrojuanlu commented 6 months ago

This issue was mentioned again, among others, in the 2024-05-22 tech design, in which @ankatiyar walked the team through the adoption of the TaskFlow API in kedro-airflow https://github.com/kedro-org/kedro-plugins/issues/25#issuecomment-2107314350