kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Universal Kedro deployment (Part 2) - Offer a unified interface to external compute and storage backends #904

Open Galileo-Galilei opened 3 years ago

Galileo-Galilei commented 3 years ago

Preamble

This is the second part of my series of design documents on refactoring Kedro to make deployment easier:

Defining the feature: The need for a well-identified place in the KedroSession and the template for managing connections

Identifying the use cases: performing operations through connections to external backends/servers

As Kedro's popularity grows and more people use it in an enterprise context, they increasingly need to interact with external/legacy systems. The main use cases are:

  1. Load data in memory from this backend (e.g. SQLTableDataSet)
  2. Save data from memory to this backend (e.g. SQLTableDataSet)
  3. Perform computation in this backend (e.g. SparkContext, SQLQueryDataSet)
  4. Use a local client to connect to a remote server and perform/trigger operations (kedro-mlflow MlflowArtifactDataSet, kedro-dolt, kedro-neptune...)

As of now (kedro==0.17.4), Kedro offers very limited support for these use cases, and this hurts maintainability and the transition to deployment, mainly because it is hard to change these backend connection credentials for production.

Overview of possible solutions in kedro==0.17.4

The above use cases are currently handled on a per-backend basis, with very different implementation choices. Below is a non-exhaustive list of examples where a connection to a remote server is instantiated:

| Backend | Current "best" solution | Kedro object | Connection object | Connection registration location | Access within Kedro objects |
|---|---|---|---|---|---|
| Spark | Kedro documentation | ProjectContext | `SparkSession.builder.appName()` | `ProjectContext.__init__` | get singleton inside nodes with getOrCreate |
| SQL | Two datasets: SQLQueryDataSet to perform computation and SQLTableDataSet to load existing data | AbstractDataSet | `sqlalchemy.engine()` inside `pd.read_sql_query` | `SQLTableDataSet._load` | session -> context -> dataset -> load(): no easy access inside nodes |
| SAS | No official plugin, but my team has built an internal plugin which mimics SQL behaviour with SASTableDataSet and SASQueryDataSet | AbstractDataSet | `saspy.SASsession()` | `SASTableDataSet.__init__` | session -> hook: no easy access inside nodes |
| BigQuery | Two datasets that mimic the SQL ones | AbstractDataSet | `bigquery.Client()` | `GBQQueryDataSet.__init__` | session -> context -> dataset -> load(): no easy access inside nodes |
| MLflow | kedro-mlflow, pipelineX | Custom configuration class in the plugin + custom hooks (for both) | `mlflow.tracking.MlflowClient()` | `MlflowPipelineHook.before_pipeline_run` | session -> hook: no easy access inside nodes |
| Dolt | kedro-dolt | Custom hook | `pymysql.connect()` | `DoltHook.__init__` | session -> hook: no easy access inside nodes |
| Neptune | kedro-neptune | Custom AbstractDataSet | `neptune.init()` | `NeptuneRunDataSet.__init__` | session -> context -> dataset -> load(): no easy access inside nodes |
| Dataiku | kedro_to_dataiku | Instantiation on the fly where needed | `dataiku.api_client()` | Instantiation on the fly where needed | import package |
| REST API | APIDataSet | APIDataSet | `requests.request(**self._request_args)` | `APIDataSet._load` | session -> context -> dataset -> load(): no easy access inside nodes |

We can make the following observations:

Understanding the limitations: why these solutions are not sustainable in the long run

Limit 1: Computation should be part of the nodes rather than the catalog, for maintainability

Many computations are performed in the catalog even though they belong in the nodes: according to Kedro's principles, only I/O operations should take place in the catalog. This causes several maintenance issues:

This makes Kedro hard to use for data warehousing that is not written in pure Python, as requested in #360.

Limit 2: Performance issues arise from the current implementation

Performing calculations inside the DataCatalog raises several other issues:

Limit 3: It is hard to bypass existing issues

As of kedro==0.17.X, it is hard to modify the existing implementation for your custom use case without the DataCatalog because:

The current solutions are the following, and none of them is satisfying:

Thoughts and suggestions on API design changes

Desired properties for remote connections

Here is a minimal set of properties I can think of (feel free to add any you think are missing):

Benefits for Kedro users

API design suggestion

I suggest an API very similar to Kedro's DataCatalog for managing external "engines" (an engine is a client that interacts with a remote server, a database or any other backend).

We would have the following correspondence:

| Data | Clients for external connections |
|---|---|
| AbstractDataSet | AbstractEngine |
| DataCatalog | EngineCatalog |
| catalog.yml | engines.yml |

An AbstractEngine would implement the following methods:


class AbstractEngine:
    def __init__(self, init_args, credentials):
        # store parameters as attributes here
        self.init_args = init_args
        self._credentials = credentials
        self._connection = None  # not created until get_or_create() is first called

    def get_or_create(self):
        # the connection is a singleton (see https://python-3-patterns-idioms-test.readthedocs.io/en/latest/Singleton.html):
        # this function retrieves the connection, or creates it if it has not been instantiated yet.
        # The goal is to create the connection object lazily, without connecting to the backend yet,
        # and to call this method every time the connection is needed, so it is only instantiated when needed.
        if self._connection is None:
            # `my_connection_object` is a placeholder for the backend's client class
            self._connection = my_connection_object(**self.init_args, **self._credentials)
        return self._connection

    def ping(self):
        # (optional) eventually connect to the backend and check if the connection is healthy?
        pass

    def describe(self):
        # (optional) some information about the connection?
        pass
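To make the lazy-singleton behaviour concrete, here is a minimal runnable variant of the AbstractEngine above, with one concrete engine built on the standard-library `sqlite3` module. It is a sketch under stated assumptions: the `_create()` hook, the `SQLiteEngine` class and the `describe()`/`ping()` bodies are hypothetical illustrations, not an existing Kedro API.

```python
import sqlite3
from typing import Any, Dict, Optional


class AbstractEngine:
    """Hypothetical base class: builds one backend client lazily and caches it."""

    def __init__(self, init_args: Optional[Dict[str, Any]] = None,
                 credentials: Optional[Dict[str, Any]] = None) -> None:
        self.init_args = init_args or {}
        self._credentials = credentials or {}
        self._connection: Any = None  # nothing is created at construction time

    def _create(self) -> Any:
        # Backend-specific factory (hypothetical hook), implemented by each engine.
        raise NotImplementedError

    def get_or_create(self) -> Any:
        # First call creates the client; later calls return the very same object.
        if self._connection is None:
            self._connection = self._create()
        return self._connection

    def ping(self) -> bool:
        # Optional health check; concrete engines decide what "healthy" means.
        raise NotImplementedError

    def describe(self) -> Dict[str, Any]:
        # Deliberately leaves out the credentials.
        return {"type": type(self).__name__, "init_args": self.init_args}


class SQLiteEngine(AbstractEngine):
    """Hypothetical engine backed by the standard-library sqlite3 module."""

    def _create(self) -> sqlite3.Connection:
        # SQLite needs no credentials; a client-server engine would pass
        # self._credentials (user, password, host...) to its client here.
        return sqlite3.connect(self.init_args.get("database", ":memory:"))

    def ping(self) -> bool:
        # Connects if needed, then runs a trivial query.
        try:
            self.get_or_create().execute("SELECT 1").fetchone()
            return True
        except sqlite3.Error:
            return False
```

Calling `SQLiteEngine(init_args={"database": ":memory:"}).get_or_create()` twice on the same instance returns the same `sqlite3.Connection`, which is the reuse behaviour the "Important note for developers" in this proposal relies on.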

On a per-project basis, a dedicated configuration file will make it possible to declare the connections in a catalog-like way. As with the catalog, anyone can create custom engines that are intended to be used inside nodes. The following notation is not fully specified, but I think it is close enough to the DataCatalog to be self-explanatory:

#conf/local/engines.yml

my_sql_connection:
    type: sql.SQLAlchemyEngine
    init_args: 
        arg1: value1 #whatever you want 
    credentials: my_sql_creds  # retrieved from `credentials.yml` to leverage existing mechanism

my_spark_session:
    type: spark.SparkEngine
    init_args: 
        arg1: value1 #the .config() args? API to be designed more precisely
    credentials: my_spark_creds  # retrieved from `credentials.yml` to leverage existing mechanism

my_neptune_client:
    type: neptune.NeptuneEngine
    credentials: my_neptune_creds  # retrieved from `credentials.yml` to leverage existing mechanism

# whatever engine you need, e.g. to interact with mlflow, requests, any other sql backend, google big query...
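One way an EngineCatalog could be built from such a file is sketched below. It assumes the YAML has already been parsed into a dict (as `yaml.safe_load` would return) and that the `credentials:` value is a key into `credentials.yml`, mirroring the catalog. The `ENGINE_TYPES` registry, the `sqlite.SQLiteEngine` type string and the `EngineCatalog.from_config`/`get` methods are hypothetical; a real implementation would more likely import classes from their dotted paths.

```python
import sqlite3
from typing import Any, Dict, List, Optional, Type


class SQLiteEngine:
    """Minimal lazy engine, repeated here so that this block runs on its own."""

    def __init__(self, init_args: Optional[Dict[str, Any]] = None,
                 credentials: Optional[Dict[str, Any]] = None) -> None:
        self.init_args = init_args or {}
        self._credentials = credentials or {}
        self._connection: Optional[sqlite3.Connection] = None

    def get_or_create(self) -> sqlite3.Connection:
        if self._connection is None:
            self._connection = sqlite3.connect(self.init_args.get("database", ":memory:"))
        return self._connection


# Hypothetical registry from the `type:` strings of engines.yml to classes.
ENGINE_TYPES: Dict[str, Type[Any]] = {"sqlite.SQLiteEngine": SQLiteEngine}


class EngineCatalog:
    """Hypothetical DataCatalog-like container for engines."""

    def __init__(self, engines: Dict[str, Any]) -> None:
        self._engines = dict(engines)

    @classmethod
    def from_config(cls, config: Dict[str, Dict[str, Any]],
                    credentials: Dict[str, Dict[str, Any]]) -> "EngineCatalog":
        engines = {}
        for name, spec in config.items():
            engine_cls = ENGINE_TYPES[spec["type"]]
            # `credentials:` holds a *key* into credentials.yml, as in catalog.yml.
            creds = credentials[spec["credentials"]] if "credentials" in spec else {}
            # Only the wrapper is built here; no connection is opened yet.
            engines[name] = engine_cls(init_args=spec.get("init_args", {}), credentials=creds)
        return cls(engines)

    def list(self) -> List[str]:
        return list(self._engines)

    def get(self, name: str) -> Any:
        # Hands out the lazily created, shared connection object.
        return self._engines[name].get_or_create()
```

Building the catalog is cheap because no backend is contacted; the first `get()` for a given name is what actually opens the connection.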

When session.load_context() is called, the EngineCatalog object is instantiated and becomes accessible, like the pipelines and the catalog:

# example.py
from kedro.framework.session import KedroSession

with KedroSession.create(my_package) as session:
    print(session.engines)
    # > kedro.engines.EngineCatalog object
    print(session.engines.list())
    # > ["my_sql_connection", "my_spark_session", "my_neptune_client"]  # similar to DataCatalog's API

The key point is that these connection objects are accessible from the nodes, just like catalog entries. Here is an example with SQL:

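# src/<project_name>/pipelines/<my_pipeline>/pipeline.py
from kedro.pipeline import Pipeline, node

from .nodes import func_for_sql  # assumed location of the node function shown below


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=func_for_sql,
                inputs=dict(
                    engine="my_sql_connection",  # name declared in engines.yml
                    args="params:param1",
                ),
                outputs="my_sql_table",
            )
        ]
    )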

Important note for developers: the key idea is that the connection is lazily instantiated at the first call and stored in a singleton; any further call reuses the same connection.

This refers to a custom func_for_sql function declared by the developer:

def func_for_sql(engine, args):
    query = """
        create table as ... from ...
    """
    res = engine.execute(query)
    return res
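The flow from pipeline declaration to node execution can be sketched end to end with `sqlite3`. Everything here is a hypothetical stand-in: `functools.lru_cache` plays the role of the lazy singleton, and `run_node` is an illustrative resolution rule (engine names are swapped for their shared connection, other inputs are looked up like datasets or parameters), not how Kedro's runners work today. The `orders`/`big_orders` tables and the `threshold` parameter are invented for the example.

```python
import functools
import sqlite3
from typing import Any, Callable, Dict


@functools.lru_cache(maxsize=None)
def my_sql_connection() -> sqlite3.Connection:
    # Lazy singleton: the connection is created on the first call and cached.
    return sqlite3.connect(":memory:")


def func_for_sql(engine: sqlite3.Connection, args: Dict[str, Any]) -> int:
    # The filtering runs inside the database; only a row count comes back.
    engine.execute("CREATE TABLE big_orders (id INTEGER, amount REAL)")
    engine.execute(
        "INSERT INTO big_orders SELECT id, amount FROM orders WHERE amount > ?",
        (args["threshold"],),
    )
    return engine.execute("SELECT COUNT(*) FROM big_orders").fetchone()[0]


def run_node(func: Callable[..., Any], inputs: Dict[str, str],
             engines: Dict[str, Callable[[], Any]], data: Dict[str, Any]) -> Any:
    # Hypothetical rule: names declared as engines are replaced by their shared
    # connection; every other input is looked up like a dataset or parameter.
    kwargs = {
        arg: engines[source]() if source in engines else data[source]
        for arg, source in inputs.items()
    }
    return func(**kwargs)


# Seed some data through the same (cached) connection the node will receive.
my_sql_connection().execute("CREATE TABLE orders (id INTEGER, amount REAL)")
my_sql_connection().executemany(
    "INSERT INTO orders VALUES (?, ?)", [(1, 5.0), (2, 50.0), (3, 500.0)]
)

result = run_node(
    func_for_sql,
    inputs={"engine": "my_sql_connection", "args": "params:param1"},
    engines={"my_sql_connection": my_sql_connection},
    data={"params:param1": {"threshold": 10}},
)
print(result)  # 2
```

Only the count crosses back into Python; the `big_orders` table is materialised inside the database, which is the kind of in-backend computation this proposal wants nodes to be able to do.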

Note that this gives maximum flexibility: you don't have to load the results into memory, and you can use Python wrappers to write your SQL query instead of writing a big string (and benefit from autocompletion, testing...). In short, you can do anything you can code.

This feature request is very similar to the SQLConnectionDataset described in #880 and the associated PR #886. The above example focuses on SQL because it seems to be one of the most common use cases, but I hope it is clear that this implementation would cover many more use cases (including, but not limited to, SparkSession and tracking backends, which also seem to be common use cases). As a side effect, it also helps minimize the catalog size, which partially solves some issues discussed in #891.

Other impacts

If this approach were adopted, I would be inclined to remove all datasets which perform computations, including all xxxQueryDataSet (where xxx=SQL, GBQ...), possibly APIDataSet (not sure about this one), and the documentation about SparkContext. We should, however, keep the xxxTableDataSet classes, which only perform I/O operations and no computations.

Other possible implementations

  1. Instead of creating new "Engine" objects, we could only create new catalog objects to instantiate these connections, but since these objects are not real datasets (they don't have save methods), I think it is more consistent to keep them separate from catalog.yml. Furthermore, we may want to pass an engine connection to a DataSet object (say a SQLTableDataSet) to make it backend-independent, and this assumes we have instantiated the EngineCatalog object before the DataCatalog.
  2. We could create a plugin to manage these objects quite easily; however, the interaction with the session (when are these "engines" instantiated? How do they interact with nodes in a catalog-like way?) may be difficult and require specific tricks.
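The idea in point 1 of passing an engine to an I/O-only dataset can be sketched as follows. The `TableDataSet` class and its `get_connection` argument are hypothetical, and the `?` placeholders are sqlite3's parameter style; another backend would use its own driver's style. The table name is assumed to come from trusted configuration, since it is interpolated into the SQL string.

```python
import sqlite3
from typing import Any, Callable, List, Tuple


class TableDataSet:
    """Hypothetical I/O-only dataset that receives an engine rather than credentials.

    `get_connection` would typically be an engine's get_or_create method, so the
    dataset itself does not know which backend or credentials are in use.
    """

    def __init__(self, table: str, get_connection: Callable[[], Any]) -> None:
        self._table = table  # assumed to come from trusted config, not user input
        self._get_connection = get_connection

    def load(self) -> List[Tuple[Any, ...]]:
        conn = self._get_connection()
        return conn.execute(f"SELECT * FROM {self._table}").fetchall()

    def save(self, rows: List[Tuple[Any, ...]]) -> None:
        if not rows:
            return
        conn = self._get_connection()
        placeholders = ", ".join("?" for _ in rows[0])
        conn.executemany(f"INSERT INTO {self._table} VALUES ({placeholders})", rows)
        conn.commit()


# Usage: two datasets sharing the one connection handed out by an "engine".
_shared = sqlite3.connect(":memory:")
_shared.execute("CREATE TABLE scores (name TEXT, score INTEGER)")
writer = TableDataSet("scores", lambda: _shared)
reader = TableDataSet("scores", lambda: _shared)
writer.save([("a", 1), ("b", 2)])
print(reader.load())
```

Because the dataset only holds a callable, swapping the development backend for a production one would be a change in engines.yml and credentials.yml rather than in catalog.yml.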
datajoely commented 3 years ago

Hi @Galileo-Galilei - as always thank you for such a fantastic contribution to the project. I too have been thinking about this and am very keen to make this work. I need a bit more time to digest this but - there is some good work to do here!

datajoely commented 3 years ago

Hi @Galileo-Galilei - this is a great piece of work and thank you for putting so much time and care into this.

I'm going to break down my thinking into two distinct parts:

  1. Moving to a singleton pattern for all dataset objects that work via a remote connection and are capable of performing remote execution
  2. Should we, as Kedro, allow arbitrary execution of statements that we have no lineage visibility into and that have no guarantee of being acyclic? That being said, it would be sweet to work with SQL data without serialising it locally.
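The first point, a singleton pattern shared by many dataset objects, could look like the sketch below: a class-level registry keyed by connection string, so every dataset pointing at the same backend reuses one connection. `SharedConnections` and `QueryDataSet` are hypothetical names, and `sqlite3` stands in for a real remote backend.

```python
import sqlite3
import threading
from typing import Any, Dict, List, Tuple


class SharedConnections:
    """Hypothetical process-wide registry: one connection per connection string."""

    _connections: Dict[str, sqlite3.Connection] = {}
    _lock = threading.Lock()

    @classmethod
    def get(cls, connection_string: str) -> sqlite3.Connection:
        # The lock only protects the registry itself; whether a connection
        # may be shared across threads is backend-specific.
        with cls._lock:
            if connection_string not in cls._connections:
                cls._connections[connection_string] = sqlite3.connect(connection_string)
            return cls._connections[connection_string]


class QueryDataSet:
    """Hypothetical dataset: many instances, one underlying connection."""

    def __init__(self, sql: str, connection_string: str) -> None:
        self._sql = sql
        self._connection_string = connection_string

    def load(self) -> List[Tuple[Any, ...]]:
        return SharedConnections.get(self._connection_string).execute(self._sql).fetchall()


first = QueryDataSet("SELECT 1", ":memory:")
second = QueryDataSet("SELECT 2", ":memory:")
print(first.load(), second.load())  # [(1,)] [(2,)]
```

Keying on the connection string keeps the sharing implicit: datasets never hold the connection themselves, so adding a tenth dataset against the same database does not open a tenth connection.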

Regarding the first point, ✅ we've actually been discussing internally and are 100% aligned that we need to implement this - great minds think alike 😛 . I'm quite a fan of your engines.yml pattern - we will do some prototyping on our side to see what the least breaking change looks like.

Now the second point is a bit trickier 🤷 - I had been pushing for this on our side for a while, but have recently come around to @idanov's idea that this breaks too many of our core principles to support natively. In this world, our pipelines are no longer guaranteed to be reproducible, and I think it becomes hard for us to argue that the DAG generated by kedro-viz reflects reality. As it stands, we are also not planning to merge #886 into Kedro for these reasons.

For transformations, SQL support in Kedro is poor - we can use it as a point of consumption, but serialising back and forth to Python to perform transformations is silly. Your proposed solution would allow users to leverage SQL as an execution engine, but I struggle to fit it into Kedro's dataset-orientated DAG.

If you want to do transformations in SQL, it's hard for me not to recommend using something like dbt and their ELT pattern for the data engineering side of things, and focusing on Kedro for the ML engineering parts of your workflow. They solve this lineage point by building a DAG via jinjafied SQL ref() and source() macros. I've spent a lot of time thinking about how Kedro should work in this space and am prototyping some ways that we can better complement such setups.

I'm keen to see what the community thinks here - the former data engineer in me wants this functionality to just get things done, but the PM in me feels this will be a nightmare to support.

Galileo-Galilei commented 3 years ago

Hello @datajoely, thank you very much for the answer.

Good to see we're aligned on the first point 😉

Regarding the second point, I think we may have slightly different objectives:

Maybe the best solution is to create a plugin (not integrated into the core framework) to enable this possibility, with adequate warnings about the risks of doing so. The MVP for this plugin would only contain new datasets like the SQLConnectionDataSet described above and would not implement the whole "engine" system, while waiting for you to come up with something more integrated into the core framework. This would be very easy to implement and maintain in the short term, and easily reversible if you implement something better in the future, even if it would not be as handy as the "engine" design pattern.

P.S.: Thanks for pointing out dbt, I'll have a look!

datajoely commented 2 years ago

Some concerns discussed here will be addressed by https://github.com/kedro-org/kedro/pull/1163

deepyaman commented 1 year ago

I wonder if using something like https://github.com/fugue-project/fugue (or https://github.com/ibis-project/ibis?) under the hood makes sense if we want to support engines in this way. Both really took off well after @Galileo-Galilei's initial post, of course. :)