ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

feat(flink): support event-time temporal join #8247

Open mfatihaktas opened 8 months ago

mfatihaktas commented 8 months ago

Is your feature request related to a problem?

Event-time temporal join in Flink enables joining a table against a versioned table. Tables in Flink are temporal/dynamic, i.e., row values can change over time, or rows can be added or deleted. A versioned table contains one or more versioned table snapshots. With event-time temporal join, a table can be enriched with values retrieved from a versioned table at a certain point in time.

More information is available in the Flink docs.

Background

We previously implemented event-time temporal join support in https://github.com/ibis-project/ibis/pull/7921. We put it on hold during review for two reasons:

Use case(s)

A generic example given in the Flink doc is as follows.

For example, suppose we have a table of orders, each with prices in different currencies. To properly normalize this table to a single currency, such as USD, each order needs to be joined with the proper currency conversion rate from the point-in-time when the order was placed.

In this example, we have (1) an orders table with an order_time event-time attribute and currency and price fields, and (2) a currency_rates table with an update_time event-time attribute and currency and conversion_rate fields. Here currency_rates is a versioned table. A user would want to join orders with currency_rates on the currency field, as of order_time.

As a more specific use case in the ML space, temporal join is desired for training dataset generation:

Other backends

Supporting temporal join:

Not supporting temporal join:

Note: "Temporal join" also seems to be used to refer to the as-of join support in other backends.

However, FOR SYSTEM_TIME AS OF is supported by other backends, though there it fits under time travel rather than temporal join:

This quote might explain why Flink seems to be the only backend supporting temporal join:

Stream processing technologies like Apache Flink introduce a new type of data transformation that’s very powerful: the temporal join. [Why Temporal Join is Stream Processing’s Superpower]

Other streaming engines that support temporal join

API

Option 1: temporal_join

Example:

expr = table_left.temporal_join(
    table_right,
    table_left.on_field == table_right.on_field,
    at=table_left.event_time_attribute,  # keyword name to be decided: `at` or `by`
)

Option 2: VersionedTable + at_time + join

Temporal join in Flink is supported only against versioned tables:

Versioned tables are defined implicitly for any tables whose underlying sources or formats directly define changelogs. Examples include the upsert Kafka source as well as database changelog formats such as debezium and canal. As discussed above, the only additional requirement is the CREATE table statement must contain a PRIMARY KEY and an event-time attribute.

Adding a VersionedTable abstraction would enable Ibis to enforce the temporal join requirements. When the user defines a versioned Ibis table on a source that does not support versioning, we can let the backend's error bubble up. VersionedTable could be overridden to execute backend-specific requirement checks; for Flink, that would be checking that the versioned table has been defined with a primary key and an event-time attribute.
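A minimal sketch of what such a backend-specific check could look like. The class and attribute names here are illustrative only, not the actual Ibis API:

```python
# Hypothetical spec object for a versioned table; not the real Ibis API.
class VersionedTableSpec:
    def __init__(self, name, primary_key=None, event_time_attribute=None):
        self.name = name
        self.primary_key = primary_key
        self.event_time_attribute = event_time_attribute

    def validate_for_flink(self):
        # Flink requires a versioned table to declare a PRIMARY KEY and an
        # event-time attribute; other backends could override this method
        # with their own requirements.
        if self.primary_key is None:
            raise ValueError(f"{self.name}: versioned table needs a PRIMARY KEY")
        if self.event_time_attribute is None:
            raise ValueError(f"{self.name}: versioned table needs an event-time attribute")

# A well-formed versioned table passes the check:
spec = VersionedTableSpec(
    "currency_rates", primary_key="currency", event_time_attribute="update_time"
)
spec.validate_for_flink()
```

The point of centralizing the check is that the user gets an Ibis-level error at table definition time rather than an opaque backend error at query time.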

Example:

table_right = con.create_table(..., versioned=True)

expr = table_left.join(
    table_right.at_time(table_left.event_time_attribute),
    table_left.on_field == table_right.on_field,
)

Option 3: Extend asof_join

The previous attempt implemented this option. The rationale was that the only form of as-of join supported by Flink SQL is the temporal join with the FOR SYSTEM_TIME AS OF clause.

We used pandas' merge_asof as the inspiration for the API example above.

What version of ibis are you running?

8.0.0

What backend(s) are you using, if any?

Flink


ogrisel commented 7 months ago

It might be important to distinguish between system time (table history managed automatically by the DBMS) and business/validity time (managed by the application/user).

System time might be what you get from time-travel systems as implemented in Apache Iceberg, Delta Lake, and SQL:2011 temporal tables.

Bi-temporal modeling can be useful in many contexts, and Ibis might need to know how system and valid time ranges are handled differently by the underlying backend.

Tri-temporal modeling (with an additional decision time) might also be useful, but it adds another layer of complexity, and I am not sure Ibis has to do anything about that extra modeling layer. It might be 100% managed by application code.

ogrisel commented 7 months ago

As far as I understand, it's already possible to do table_a.asof_join(table_b, on="time", by="entity_id") in Ibis (using the duckdb backend for instance), but only for a regular time column (business/validity).

It's also possible to load a table for a given system time for some combination of data source formats / backends, for instance using ibis.read_delta(data_path, version=last_version_before_timestamp) where last_version_before_timestamp can be manually computed by the user from the contents of deltalake.DeltaTable(data_path).history(). In that respect, last_version_before_timestamp is an example of inference based on a system time managed by the deltalake runtime.

Things that are missing:

The latter might be a direct consequence of implementing the former.

mfatihaktas commented 6 months ago

Thanks @ogrisel for sharing your thoughts.

As far as I understand, it's already possible to do table_a.asof_join(table_b, on="time", by="entity_id") in Ibis (using the duckdb backend for instance), but only for a regular time column (business/validity).

Yes, asof_join is based on a time column.

It's also possible to load a table for a given system time for some combination of data source formats / backends, for instance using ibis.read_delta(data_path, version=last_version_before_timestamp) where last_version_before_timestamp can be manually computed by the user from the contents of deltalake.DeltaTable(data_path).history(). In that respect, last_version_before_timestamp is an example of inference based on a system time managed by the deltalake runtime.

If I understand it correctly, this refers to "time travel", where a snapshot of a table is accessed. The specific table snapshot to load is determined by the backend for a given timestamp. We have an issue for time travel support: https://github.com/ibis-project/ibis/issues/8203

It is important to differentiate temporal join from time travel.

Overall, temporal join allows enriching row values (indexed by a primary key) with values from a changing source, at each row's own point in time.
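The difference between the two can be sketched in plain Python. Time travel reads the whole table as of one timestamp, while a temporal join performs a per-row point-in-time lookup. The snapshots data and helper functions below are toy illustrations, not Ibis API:

```python
# Hypothetical snapshots of a rates table, keyed by snapshot time.
snapshots = {
    1: {"EUR": 1.05},
    5: {"EUR": 1.10},
}

def time_travel(at_time):
    """Time travel: read the whole table as of a single timestamp."""
    best = max(t for t in snapshots if t <= at_time)
    return snapshots[best]

def temporal_join(rows):
    """Temporal join: each (event_time, key) row is enriched at its *own* event time."""
    return [(t, key, time_travel(t)[key]) for t, key in rows]

# One snapshot for the whole query:
whole_table = time_travel(6)
# ...versus a per-row point-in-time lookup:
per_row = temporal_join([(2, "EUR"), (6, "EUR")])
```

Here time_travel(6) returns only the rates as of time 5, whereas the temporal join matches the first row against the earlier snapshot and the second row against the later one.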