kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

How to use `IncrementalDataset` with non file-based datasets? #471

Open astrojuanlu opened 11 months ago

astrojuanlu commented 11 months ago

Description

By looking at the IncrementalDataset documentation and its source code, it's not clear to me how I can use the IncrementalDataset with an underlying dataset that is not file-based, for example a dataset that fetches data from a REST API.

Context

I'm trying to build an EL pipeline that Extracts data from a REST API and Loads it to a Delta table. In general I would like the pipeline to fulfill some properties:

  1. Efficient: Never extract all the data (specify a starting point).
  2. Idempotent: If I execute it twice, it should not fail.
  3. Incremental: Do not extract data that has already been loaded.

[!WARNING] This means that "reproducibility" in the sense of "every time I run the pipeline it should return exactly the same result" is explicitly a non-goal. If I'm already breaching a Kedro design principle then I'd rather debate that separately, and I would be very interested in knowing how one can build a standard E(T)L(T) data pipeline in Kedro like the ones achievable with Fivetran, Meltano, Airbyte etc.

The target is a custom Polars DeltaDataset I wrote to work around #444, which calls pl.DataFrame.write_delta with mode="append". Alternatively, I could have used IncrementalDataset + polars.EagerPolarsDataset with file_format: parquet, but I would lose the advantages of using the Delta format.

The source is a REST API, but I can't use kedro_datasets.api.api_dataset.APIDataset because I want to paginate the results, so I defined my own.
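
For illustration only, here is a minimal sketch of the kind of paginated extraction described above, with an optional starting point. It is not the actual custom dataset from this issue: `iter_records`, `fetch_page`, the `since` marker and the `{"items": ..., "next": ...}` page shape are all hypothetical names, and a fake in-memory API stands in for the real one.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator

# Hypothetical page shape: {"items": [...], "next": <cursor or None>}
Page = dict[str, Any]


def iter_records(
    fetch_page: Callable[[str | None, str | None], Page],
    since: str | None = None,
) -> Iterator[dict[str, Any]]:
    """Yield records page by page, starting from an optional `since` marker."""
    cursor: str | None = None
    while True:
        page = fetch_page(cursor, since)
        yield from page["items"]
        cursor = page.get("next")
        if cursor is None:  # no further pages
            break


# Fake in-memory API, used only to exercise the pagination loop.
_FAKE_DATA = [{"id": i, "ts": f"2024-01-{i:02d}"} for i in range(1, 8)]


def fake_fetch_page(cursor: str | None, since: str | None, size: int = 3) -> Page:
    rows = [r for r in _FAKE_DATA if since is None or r["ts"] > since]
    start = int(cursor or 0)
    end = start + size
    return {"items": rows[start:end], "next": str(end) if end < len(rows) else None}


records = list(iter_records(fake_fetch_page, since="2024-01-02"))
```

In a real dataset the `fetch_page` callable would wrap the HTTP request, and `since` would come from whatever checkpoint the pipeline keeps.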

It's clear that the extraction needs to have some sort of checkpoint to be truly incremental. There are two ways of doing this that I can think of:

flowchart TD
    E[fa:fa-cloud API] --> T(Transform) --> |write| L[fa:fa-database Delta table]
    L --> |checkpoint| T

In my current implementation, I have addressed this by defining a read-only version of the target.
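
A rough sketch of that idea, assuming the checkpoint is simply the highest value of some column already present in the target. Plain Python lists stand in for the API response and the Delta table, and `latest_checkpoint`, `extract_and_load` and the `updated_at` column are hypothetical names, not part of Kedro or Polars.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def latest_checkpoint(target_rows: list[Row], column: str) -> Any | None:
    """Return the highest value of `column` already in the target, if any."""
    return max((row[column] for row in target_rows), default=None)


def extract_and_load(
    source_rows: list[Row], target_rows: list[Row], column: str = "updated_at"
) -> int:
    """Append only the source rows newer than the target's checkpoint."""
    checkpoint = latest_checkpoint(target_rows, column)
    new_rows = [
        row for row in source_rows if checkpoint is None or row[column] > checkpoint
    ]
    target_rows.extend(new_rows)  # stands in for an append-mode write
    return len(new_rows)


source = [{"id": 1, "updated_at": "2024-01-01"}, {"id": 2, "updated_at": "2024-01-02"}]
target: list[Row] = []
first_run = extract_and_load(source, target)   # loads everything
second_run = extract_and_load(source, target)  # nothing new to load
```

One caveat of the strict `>` comparison is that rows sharing the checkpoint value but arriving later would be skipped, so a real implementation may need a tie-breaker.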

https://github.com/kedro-org/kedro-plugins/blob/0923bc5d7e0853d9f92f5c0f2618fe8789b9b019/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py#L201-L205

And this does not sit well with how the REST API works, hence this issue.

Possible Implementation

Possible Alternatives

datajoely commented 11 months ago

So I'm going to add my wider thinking regarding Kedro's relationship with the principles of CRUD.

Conceptually Kedro fits neatest into a box when you do the following operations: READ, CREATE, OVERWRITE. You can also argue that APPEND falls into this category, but it blurs the boundary of STATE. This whole conversation comes down to state, because Kedro and the user's assumptions on reproducibility don't hold the minute the state of the underlying data changes.

Now the other point to emphasise is that in production use-cases the underlying data state will be outside of your control and change beneath you either way.

UPDATE and DELETE are even more complex than APPEND since they change this concept of state in more subtle and awkward ways. They make a lot of sense in a traditional OLTP system, but also have their place in an OLAP one as well.

The first implementation of spark.DeltaTableDataSet triggered a comprehensive discussion on the associated PR, which covers much of this as well. Please read that, but I also want to draw your attention to this workflow, which is in the docs.

We called this the "out of DAG Kedro action". In this example an update is performed within a Node, but in order to get the nodes to topologically sort correctly we needed to create a dummy MemoryDataSet which is passed around so that the execution order is preserved. IIRC we even discussed a noop dataset type at the time. This also touches a wider problem for any remote SQL action: since the DAG can't see it, we have no idea whether it happens in the Kedro lineage.
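
To make the ordering trick concrete, here is a toy sketch using Python's standard `graphlib` rather than Kedro itself. The node and dataset names (`update_delta_table`, `read_updated_table`, `update_done`) are made up for illustration, and this is not how Kedro builds its DAG internally; it only shows why a dummy output/input pair creates the edge that fixes the execution order.

```python
from graphlib import TopologicalSorter

# Each hypothetical node maps to (input dataset names, output dataset names).
Nodes = dict[str, tuple[set[str], set[str]]]


def dependencies(nodes: Nodes) -> dict[str, set[str]]:
    """Map each node to the nodes that produce one of its inputs."""
    producers = {out: name for name, (_, outs) in nodes.items() for out in outs}
    return {
        name: {producers[i] for i in ins if i in producers}
        for name, (ins, _) in nodes.items()
    }


def execution_order(nodes: Nodes) -> list[str]:
    return list(TopologicalSorter(dependencies(nodes)).static_order())


# Without a dummy dataset, the update's side effect is invisible to the graph.
without_dummy: Nodes = {
    "update_delta_table": ({"raw"}, set()),
    "read_updated_table": ({"delta_table"}, {"report"}),
}

# "update_done" is the dummy dataset: produced by the update, consumed by the reader.
with_dummy: Nodes = {
    "update_delta_table": ({"raw"}, {"update_done"}),
    "read_updated_table": ({"delta_table", "update_done"}, {"report"}),
}

order = execution_order(with_dummy)
```

With the dummy in place the reader depends on the updater; without it the two nodes are unrelated and the sorter is free to run them in either order.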

Delta merge(), update(), delete() workflow

As with many topics at the edge of Kedro's design decisions - this relates to dynamic pipelines, specifically conditional logic. To take advantage of the UD of CRUD one needs to make decisions because of some logical condition defined by the state encoded within the underlying data. Performing an update or delete often requires a branching strategy in terms of what needs to be done next - Kedro is too rigid with this today.

The distinction between DataSets and Node functions also gets blurred here, as you may need to pass some Python state between them. In the example above about keeping an internal checkpoint pointer, we get into the game of storing some sort of state which can be picked up in the next run. Whilst that can be constrained to just a dataset catalog entry, a related request is to parameterise kedro run against a particular date field...

astrojuanlu commented 11 months ago

This whole conversation comes down to state, because Kedro and the user's assumptions on reproducibility don't hold the minute the state of the underlying data changes.

This is the crux of the matter I think. Where are Kedro's assumptions or opinions on reproducibility stated?

datajoely commented 11 months ago

Where are Kedro's assumptions or opinions on reproducibility stated?

Whilst I personally think we should introduce conditionals etc., you can see how constraining the combinatorial complexity keeps things simple, straightforward and reproducible. To my knowledge we've never explicitly written this down, but I've always felt that this was always implied by the general vibe of pure functions + DAG + static catalog = reproducible.

astrojuanlu commented 11 months ago

To my knowledge we've never explicitly written this down

I hope we can get to that soon

but I've always felt that this was always implied by the general vibe of pure functions + DAG + static catalog = reproducible

Given that all the I/O logic is in the dataset, I think this is too handwavy. You can have pure functions and a DAG and a catalog with static references, but if the dataset code does funky stuff, the pipelines will not be reproducible.

I think the keyword here is "idempotent", which is compatible with UPSERT/MERGE database operations and therefore with ETLs.
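
As a small illustration of what "idempotent" means here, the sketch below uses SQLite's upsert syntax from the standard library (it assumes an SQLite version of 3.24 or later). The `events` table and `upsert` helper are hypothetical and unrelated to Delta or Kedro; the point is only that running the same load twice leaves the table in the same state.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, rows: list[tuple[int, str]]) -> None:
    """Insert rows, updating the payload when the id already exists."""
    conn.executemany(
        """
        INSERT INTO events (id, payload) VALUES (?, ?)
        ON CONFLICT(id) DO UPDATE SET payload = excluded.payload
        """,
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")

batch = [(1, "a"), (2, "b")]
upsert(conn, batch)
upsert(conn, batch)  # second run: no error and no duplicate rows

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

A plain append of the same batch would instead either fail on the primary key or, without one, duplicate the rows.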

I'll keep asking for other opinions.

merelcht commented 10 months ago

This issue brings up some very interesting questions. When it comes to reproducibility in Kedro, I have personally always assumed that running a pipeline with specific input should lead to the same output, unless something has changed. That's also the assumption we've held when designing experiment tracking. However, I don't think we've recently had a conversation about that and I think it would be a good idea to go over the core Kedro principles again with the team and write down the assumptions we hold but have never made explicit.

But to go back to the original question of using IncrementalDataset with non file-based datasets. I must admit that my understanding of the IncrementalDataset is a bit superficial. But that dataset is based on PartitionedDataset so I'm wondering if it is at all suitable for something like a REST API, because that's not really partitioned data at all, right? Perhaps what we need is an incremental dataset that is not partition-based?

astrojuanlu commented 10 months ago

Thanks @merelcht for chiming in!

But to go back to the original question

Yes, I think this thread has branched a lot and that's my fault. I'll set up a separate conversation later on.

But that dataset is based on PartitionedDataset so I'm wondering if it is at all suitable for something like a REST API, because that's not really partitioned data at all, right? Perhaps what we need is an incremental dataset that is not partition-based?

That's my impression too, yes: that the inheritance relationship IncrementalDataset < PartitionedDataset should be broken.

Maybe this should go into https://github.com/kedro-org/kedro/milestone/12.

merelcht commented 10 months ago

Maybe this should go into https://github.com/kedro-org/kedro/milestone/12.

Or maybe even better https://github.com/kedro-org/kedro/milestone/40 ?

astrojuanlu commented 9 months ago

Both are in kedro-org/kedro... Should I move the issue over to the central repo?

astrojuanlu commented 9 months ago

Opened https://github.com/kedro-org/kedro/issues/3578 to channel the discussion about Kedro for data pipelines.

MinuraPunchihewa commented 4 weeks ago

Is there any work that is being done here? I would love to be involved.

I have developed a custom dataset recently to cater to incremental loads from Unity Catalog tables in Databricks. The logic is pretty simple and I suppose it could be applied to any data source. Essentially, we maintain a table to store metadata. This table is created by the custom dataset if it does not exist already and is a Delta table in this situation, but I expect that it can be converted to something a little more standard such as an SQLite database or even a file. We can ask users to provide a location where it can be stored. For each table that the dataset will load from, a parameter called checkpoint_column has to be defined. This is expected to be either a date or timestamp column that represents when the data was loaded/captured. When some data is loaded, a record is added to the metadata table: simply the name of the table against the checkpoint, which is the max of the checkpoint_column. The next time data is loaded from this table, it will include only the records after the latest checkpoint.

This worked for us because each table we had included a column that represents when it was ingested. However, I think we could potentially maintain an incremental ID column as a checkpoint as well.
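
To give a feel for the approach, here is a simplified sketch that uses an SQLite metadata table instead of a Delta one. It is not our actual dataset code: `CheckpointStore` and `load_incremental` are hypothetical names, checkpoints are assumed to be ISO 8601 strings so that they compare correctly as text, and only the latest checkpoint per table is kept.

```python
from __future__ import annotations

import sqlite3
from typing import Any


class CheckpointStore:
    """Keeps the latest checkpoint value per source table (hypothetical helper)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        # Created on first use if it does not exist already.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(table_name TEXT PRIMARY KEY, checkpoint TEXT)"
        )

    def get(self, table_name: str) -> str | None:
        row = self._conn.execute(
            "SELECT checkpoint FROM checkpoints WHERE table_name = ?", (table_name,)
        ).fetchone()
        return row[0] if row else None

    def set(self, table_name: str, checkpoint: str) -> None:
        self._conn.execute(
            "INSERT OR REPLACE INTO checkpoints (table_name, checkpoint) VALUES (?, ?)",
            (table_name, checkpoint),
        )
        self._conn.commit()


def load_incremental(
    rows: list[dict[str, Any]],
    table_name: str,
    checkpoint_column: str,
    store: CheckpointStore,
) -> list[dict[str, Any]]:
    """Return only rows after the stored checkpoint, then advance the checkpoint."""
    last = store.get(table_name)
    new_rows = [r for r in rows if last is None or r[checkpoint_column] > last]
    if new_rows:
        store.set(table_name, max(r[checkpoint_column] for r in new_rows))
    return new_rows


store = CheckpointStore(sqlite3.connect(":memory:"))
table = [{"id": 1, "ingested_at": "2024-05-01"}, {"id": 2, "ingested_at": "2024-05-02"}]
first_load = load_incremental(table, "sales", "ingested_at", store)
second_load = load_incremental(table, "sales", "ingested_at", store)
```

The same shape should work with an incremental ID column, as long as the checkpoint values are stored in a form that compares in the right order.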

I wonder if we can create a range of datasets for this purpose based on some kind of common interface: IncrementalTableDataset, IncrementalAPIDataset etc.?

astrojuanlu commented 4 weeks ago

Thanks a lot for sharing your insights @MinuraPunchihewa !

I think it's clear that there are multiple ways of solving this problem, as your efforts attest (and we know of other users implementing workarounds to do incremental loading, see https://kedro.hall.community/risk-of-loading-full-dataset-instead-of-incremental-updates-JQn11wCgFLpT#1e66d061-a057-4dc3-87b1-8edbae48806c)

We haven't started looking at this problem in a more holistic way just yet.

noklam commented 3 weeks ago

The original issue brings APIDataset as an example, but I think focusing on tables will be much more valuable.

MinuraPunchihewa commented 3 weeks ago

The original issue brings APIDataset as an example, but I think focusing on tables will be much more valuable.

Agreed.