apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0

[Improvement]: Clarify the concept of watermark and optimize its implementation #1805

Closed zhoujinsong closed 1 week ago

zhoujinsong commented 1 year ago

What would you like to be improved?

We have implemented calculating and showing the watermark for Mixed format tables.

As #944 discusses, the current implementation has some issues that should be fixed.

In addition, as we support more table formats, we should consider better ways to support the watermark on other format tables.

zhoujinsong commented 1 year ago

Before starting the improvement, I would like to collect some information about table watermark.

What is table watermark?

A watermark is a time-based table property that indicates the freshness of the table: all data older than the watermark has already been written to the table.

What is the usage of table watermark?

How to generate table watermark?

The watermark changes as data is written, so it is usually generated and recorded by the table ingestion job. It is generally only generated in streaming ingestion jobs, because it is difficult to guarantee the semantics of a watermark in batch write jobs.

Here are some implementation methods of different table formats:

What should we improve?

  1. As the watermark is a core property of data lake tables, we should provide a unified way to manage it.
  2. Open up the watermark computation method for different table formats.

majin1102 commented 1 year ago

It seems that the Iceberg community has discussed watermarks in considerably more depth. Here are my conclusions:

  1. In many cases, the watermark of a table (Iceberg or otherwise) should be a property or metric derived from a timestamp column of the table itself, which Mixed Iceberg format defines as the event-time column, rather than a value reported by an upstream streaming job. Two scenarios should be considered:

    1. For a Flink job with a watermark writing to a table, the job's watermark can reflect the real event time of the sink table. This is how the Paimon watermark works, but this watermark may conflict with the event time defined by the sink table.
    2. For a table defined with an event time (meaning a watermark is required), there may be no streaming job writing to the table, or there may be no guarantee that streaming jobs will always define a watermark.
    3. Given the two points above, we may need to support both defining an event time on a table and using watermark values from streaming jobs. If the user explicitly defines an event time on a table, the watermark is calculated from the data distribution of that column at commit time. If no event time is defined, the watermark values from streaming jobs can be used. If neither is defined, the commit time can be used instead.
  2. There may be multiple streaming jobs writing to a table, and the watermark must be aligned across all streaming writers, which means that calculating the watermark in the commit phase is not feasible. Amoro could calculate watermark values in the refresh phase and define an idle timeout, as Flink does when dealing with multiple data streams.
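
To make the fallback order proposed above concrete (event-time column first, then the streaming job's watermark, then the commit time), here is a minimal Python sketch. It is not Amoro code: `CommitInfo`, `resolve_watermark`, and the `max_lateness_ms` parameter are hypothetical names, and "maximum event time minus an allowed lateness" is only one assumed way of deriving a watermark from the column's data distribution.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class CommitInfo:
    """Hypothetical summary of one commit; every field is illustrative."""
    commit_time_ms: int
    # Values of the event-time column written by this commit, if any.
    event_times_ms: Sequence[int] = ()
    # Watermark reported by the streaming writer, if it defined one.
    job_watermark_ms: Optional[int] = None


def resolve_watermark(commit: CommitInfo,
                      has_event_time_column: bool,
                      max_lateness_ms: int = 0) -> int:
    """Pick a watermark for one commit using the proposed fallback order."""
    # 1. Table defines an event-time column: derive the watermark from the data.
    #    Assumption: max event time minus an allowed lateness.
    if has_event_time_column and commit.event_times_ms:
        return max(commit.event_times_ms) - max_lateness_ms
    # 2. Otherwise use the watermark reported by the streaming job, if present.
    if commit.job_watermark_ms is not None:
        return commit.job_watermark_ms
    # 3. Otherwise fall back to the commit time.
    return commit.commit_time_ms
```

For example, a commit with event times `[100, 300, 200]` and an allowed lateness of 50 resolves to 250 when the table defines an event-time column, regardless of what the writer reported.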

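The point about aligning multiple streaming writers with an idle timeout could look roughly like the sketch below. It assumes the table watermark is the minimum watermark over all writers that have reported recently, which mirrors how Flink combines several input streams; `aligned_watermark` and the shape of the `writers` mapping are hypothetical, not an existing Amoro API.

```python
from typing import Dict, Optional, Tuple


def aligned_watermark(writers: Dict[str, Tuple[int, int]],
                      now_ms: int,
                      idle_timeout_ms: int) -> Optional[int]:
    """Combine per-writer watermarks during a (hypothetical) refresh phase.

    `writers` maps a writer id to (last_watermark_ms, last_report_time_ms).
    Writers that have not reported within `idle_timeout_ms` are treated as
    idle and ignored, so a stalled job does not hold the watermark back.
    Returns None when every writer is idle, meaning "keep the previous value".
    """
    active = [
        watermark
        for watermark, reported_at in writers.values()
        if now_ms - reported_at <= idle_timeout_ms
    ]
    return min(active) if active else None
```

With writers `a` and `b` reporting recently and `c` silent for longer than the timeout, the result is the smaller of the watermarks of `a` and `b`; `c` is skipped until it reports again.
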
github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 1 week ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'