dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0
2.7k stars 180 forks

[REST_API] Incremental processing_steps #1928

Open paul-godhouse opened 1 month ago

paul-godhouse commented 1 month ago

Feature description

Some APIs do not have an incremental parameter but still return date fields.

Ex:

[
  {id: 4, created_at: "2024-01-01"},
  {id: 67, created_at: ""},
  ...
]

Right now, we can filter out already retrieved data by using processing_steps. See this example:

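from datetime import datetime, timedelta, timezone

from dlt.sources.rest_api.typing import (
    ProcessingSteps,
)

def _filter(record):
    # Keep only records created within the last day.
    created_at = datetime.fromisoformat(record["created_at"])
    if created_at.tzinfo is None:
        # Assumption: naive timestamps are UTC, so the comparison below is valid.
        created_at = created_at.replace(tzinfo=timezone.utc)
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return created_at >= yesterday

filter_step: ProcessingSteps = {
    "filter": _filter,
}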

However, I have to define manually that I don't want records older than 1 day. It would be nice to have "true" incremental loading here, the same as what we can do with an incremental parameter.

Are you a dlt user?

Yes, I run dlt in production.

Use case

I want to have an incremental loading for REST APIs where we can't pass an incremental parameter.

Example: Get a Job in CaptainData https://docs.captaindata.co/#c5aa0d04-240b-4205-b3bd-08d544d2b7a9

Proposed solution

No response

Related issues

No response

francescomucio commented 1 month ago

I had a chat on Slack with @paul-godhouse because his request was not clear to me.

He has an API that cannot be asked for only the most recent records, so it is not possible to create an incremental resource with dlt.

The second-best thing would be a filter in the processing_steps that removes the old records. As with incremental load parameters, the value used should be the max value of a column already stored in the destination table.

I could imagine something like:

"processing_steps": [
  {
    "incremental_filter": {
      "cursor_path": "updated_at",
      "initial_value": "2024-01-25T11:21:28Z",
    }
  }
]

Here, cursor_path is the name of both the field in the API result and the column in the target table.

The incremental filter would query the target table for the max value before the resource is extracted and then build a filter like:

"filter": lambda x: x[cursor_path] > max_cursor_path_from_target