dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0
2.7k stars 180 forks source link

Support Multiple Timestamp Columns for Incremental Loading #2076

Closed lfpll closed 1 day ago

lfpll commented 5 days ago

Feature description

Currently, dlt only supports a single timestamp column for incremental loading. This limits the ability to handle datasets where incremental state needs to be tracked across multiple time-based columns.

E.g.: systems where delete time is different them update time.

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

I have tables that have a delete_time but they do not update update_time and is framework related sugestion. And unfortunately dlt doesn't work for them

Proposed solution

Have dlt accept two columns on the incremental class and use in at least sql.

Example on SQL

where update_time > {{dlt.last_state}}
or delete_time > {{dlt.last_sate}}

Related issues

No response

rudolfix commented 4 days ago

@lfpll the easiest way to do it is to create view with a computed column MIN(update_time, delete_time) and use it for incremental. alternatively you can use query adapter and table adapter args to modify table shape and SQL query (see #2070 with example test doing that) it is also possible to define Incremental on multiple columns by using JSONPath and custom last value function but you'll be better off by creating a view or modify query

lfpll commented 3 days ago

I agree unfortunately is way easier for me to change in the extraction since this is not one only table but N, being N ~30-40 tables and I'm not the owner of the database.

~Can you elaborate in the incremental with JSONPath? I was trying to debug the code and see where this class is called our use but I'm lost in the factories and I cannot see where it happens.~

A ok it feels I can use commas, thanks let me take a look on how it works.

lfpll commented 3 days ago

@rudolfix JSONPath doesn't work with sqlite.

I tried

from jsonpath_ng.ext import parse
.....
#path = parse("['create_time','update_time']")
#path = parse("[create_time,update_time]")
#path = parse("$['create_time','update_time']")
#path = parse("$[create_time,update_time]")
#path = parse("[update_time]")
path = parse("update_time")
.....
  table.apply_hints(
            write_disposition="append",
            primary_key=table_config["primary_key"],
            incremental=dlt.sources.incremental(path,
                                                last_value_func=last_value_func),
        )

All failing in my sqlite test.

I would assume this is also a bug since JSONPath is one values accepted as incremental.

As an example when I do this

  path = parse("[update_time]")

I get this

In processing pipe a_organizations: extraction of resource a_organizations in generator table_rows caused an exception: "Cursor column 'update_time' does not exist in table 'organizations'"

Which clearly compiles to update_time but something is wrong when interpreting, because If I pass simply the string 'update_time' it works.

rudolfix commented 2 days ago

we'll be merging #2070 with an example that does (+-) what you need. The JSONPath way requires three things to work

is it a problem for you to add computed column to your table in the SELECT itself?