apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

The "Emitting watermarks" feature can't be used in Flink SQL? #10219

Open yeezychao opened 4 months ago

yeezychao commented 4 months ago

Query engine

Flink 1.18.0

Question

Hi @stevenzwu, in the latest version, Flink SQL still cannot define watermarks. Our company wants to use Flink SQL to implement window aggregation over ODS data, and this is still not possible. Are there plans to support this?

pvary commented 4 months ago

@yeezychao: The feature should be available in Iceberg 1.5.0. Here is the PR, which includes an example: #9346.

Please let us know if it is not working.

pvary commented 4 months ago

Also, here is the documentation which describes what this feature can and can't do: https://iceberg.apache.org/docs/nightly/flink-queries/#emitting-watermarks

yeezychao commented 4 months ago

@pvary Thank you for your reply. I understand what you mean, and I have also read the doc. However, we want to implement something similar to this demo, which needs Flink SQL for TUMBLE window aggregation. If I run the window calculation with the watermark field specified through options, I get this exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

The column is not a time attribute because it is not declared with WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. Therefore, I think this implementation can only be used with the Flink DataStream API; the Table & SQL API is not supported yet.
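For context on why the planner insists on a time attribute: an event-time TUMBLE window can only emit its result once the watermark has passed the window end, so without a watermark the window would never know when to fire. The plain-Java toy model below is not Flink code; TumbleSketch and its methods are hypothetical names used only to show the two pieces involved, assigning a row to a window by timestamp and deciding whether that window may fire for a given watermark.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of an event-time TUMBLE window (hypothetical, not Flink's implementation).
final class TumbleSketch {
    private TumbleSketch() {}

    // Start of the tumbling window of size sizeMs that contains tsMs.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - Math.floorMod(tsMs, sizeMs);
    }

    // A window [start, start + size) may fire once the watermark reaches its
    // last timestamp, i.e. watermark >= start + size - 1.
    static boolean canFire(long start, long sizeMs, long watermarkMs) {
        return watermarkMs >= start + sizeMs - 1;
    }

    // Counts rows per window start, like COUNT(*) grouped by a TUMBLE window.
    static Map<Long, Long> countPerWindow(long[] timestamps, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }
}
```

In Flink SQL the watermark comes from the WATERMARK FOR clause; a plain TIMESTAMP(3) column carries no such information, which is what the exception above complains about.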

pvary commented 4 months ago

@yeezychao: Do you happen to know what is needed from the connector side to make this work?

yeezychao commented 4 months ago

@pvary I'm confused about why computed columns and watermark specs are not supported in FlinkCatalog.java.

pvary commented 4 months ago

What would be needed to support them? I am guessing that this would be a Flink specific conversion between the Iceberg table and the Flink table. Am I right here?

yeezychao commented 4 months ago

@pvary You're right!

yeezychao commented 4 months ago

IcebergTableSource should implement the SupportsWatermarkPushDown interface, as the Kafka connector does, to enable watermark pushdown. Can you give some suggestions? @hililiwei @stevenzwu
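The shape of that change can be sketched with simplified stand-ins. In Flink, SupportsWatermarkPushDown declares a single applyWatermark(WatermarkStrategy&lt;RowData&gt;) method that the planner calls with the table's declared watermark strategy. Below, Strategy, WatermarkPushDown and ToyTableSource are hypothetical plain-Java substitutes (not Flink or Iceberg classes), and the watermark formula assumes a bounded out-of-orderness strategy.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a watermark strategy: a timestamp extractor plus a
// bounded out-of-orderness used to derive watermarks.
final class Strategy<T> {
    final ToLongFunction<T> timestampOf;
    final long maxOutOfOrdernessMs;

    Strategy(ToLongFunction<T> timestampOf, long maxOutOfOrdernessMs) {
        this.timestampOf = timestampOf;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }
}

// Hypothetical stand-in for the SupportsWatermarkPushDown ability.
interface WatermarkPushDown<T> {
    void applyWatermark(Strategy<T> strategy);
}

// Toy table source: remembers the pushed-down strategy and uses it when reading.
final class ToyTableSource<T> implements WatermarkPushDown<T> {
    private Strategy<T> strategy; // null means "no watermarks"

    @Override
    public void applyWatermark(Strategy<T> strategy) {
        this.strategy = strategy;
    }

    // Watermark after reading rows: max timestamp minus the allowed lateness
    // minus one, or Long.MIN_VALUE if nothing was pushed down or nothing was read.
    long watermarkAfter(Iterable<T> rows) {
        if (strategy == null) {
            return Long.MIN_VALUE;
        }
        long maxTs = Long.MIN_VALUE;
        for (T row : rows) {
            maxTs = Math.max(maxTs, strategy.timestampOf.applyAsLong(row));
        }
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - strategy.maxOutOfOrdernessMs - 1;
    }
}
```

Before applyWatermark is called the toy source behaves like a source without watermarks; afterwards it derives them from the pushed-down strategy.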

pvary commented 4 months ago

Maybe we could just implement the interface in IcebergTableSource. Either we reject any watermark strategy other than noWatermarks (the cleaner approach, but it might not work if the watermark strategy is reused elsewhere to change some behavior), or we create a new watermark strategy in createFLIP27Stream where the watermark generation part is noWatermarks and only the timestamp assignment part is reused. If we pass this watermark strategy to fromSource, we might be able to make things work.
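The second option can be modeled as splitting a strategy into its two parts and swapping only one of them. In Flink itself this would presumably combine WatermarkStrategy.noWatermarks() with the pushed-down strategy's timestamp assigner (for example via withTimestampAssigner), leaving watermarks to the source's split-based mechanism. The sketch below is plain Java; TwoPartStrategy, NO_WATERMARKS and timestampsOnly are hypothetical names, and "watermark generator" is simplified to a function from the max seen timestamp to a watermark.

```java
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

// Hypothetical two-part strategy mirroring the split in a watermark strategy:
// a timestamp assigner and a watermark generator (max timestamp -> watermark).
final class TwoPartStrategy<T> {
    final ToLongFunction<T> timestampAssigner;
    final LongUnaryOperator watermarkGenerator;

    TwoPartStrategy(ToLongFunction<T> timestampAssigner, LongUnaryOperator watermarkGenerator) {
        this.timestampAssigner = timestampAssigner;
        this.watermarkGenerator = watermarkGenerator;
    }

    // Stand-in for noWatermarks(): the watermark never advances.
    static final LongUnaryOperator NO_WATERMARKS = maxTs -> Long.MIN_VALUE;

    // Reuse only the timestamp part of the pushed-down strategy and disable its
    // watermark generation.
    static <T> TwoPartStrategy<T> timestampsOnly(TwoPartStrategy<T> pushedDown) {
        return new TwoPartStrategy<T>(pushedDown.timestampAssigner, NO_WATERMARKS);
    }
}
```

The design point is that records still get event timestamps from the declared column, while watermark emission stays under the source's control.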

Sadly I don't have the bandwidth to test this out, but if you want to take a stab at it, I can review your PR.

Thanks, Peter

Deanozk commented 1 month ago

Is this not on the roadmap yet?

Deanozk commented 1 month ago

Any updates?

yeezychao commented 1 month ago


Hi @pvary, referring to 4625, I implemented computed columns and the watermark strategy declaration. I plan to align the logic of the watermark-column option you implemented in IcebergSource with the watermark field stored in the table attributes. However, if the watermark column is a computed column rather than a physical column, how should the SplitWatermarkExtractor be created? Can you give me some advice? Thank you.

pvary commented 1 month ago


I'm not sure how Flink normally calculates these watermarks. The IcebergSourceSplit could be used to get the minimum value of the timestamp column, but I am not sure how Flink calculates the watermark for a computed column.
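The split-based idea can be sketched as taking, for each split, the smallest lower bound of the timestamp column across the split's data files (Iceberg records per-column lower bounds in its file statistics). For a computed column, this sketch assumes the expression is non-decreasing in one physical column (for example epoch seconds to milliseconds), in which case applying it to the physical minimum yields the computed minimum; that restriction is an assumption, not something settled in the thread. FileStats and SplitWatermarkSketch are hypothetical plain-Java stand-ins, not Iceberg's IcebergSourceSplit or SplitWatermarkExtractor API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

// Hypothetical stand-in for a data file with per-column lower bounds.
final class FileStats {
    final Map<String, Long> lowerBounds;

    FileStats(Map<String, Long> lowerBounds) {
        this.lowerBounds = lowerBounds;
    }
}

final class SplitWatermarkSketch {
    private SplitWatermarkSketch() {}

    // Minimum lower bound of `column` across all files in a split
    // (Long.MAX_VALUE for an empty split).
    static long splitMin(List<FileStats> files, String column) {
        long min = Long.MAX_VALUE;
        for (FileStats f : files) {
            Long lb = f.lowerBounds.get(column);
            if (lb == null) {
                throw new IllegalArgumentException("missing stats for column " + column);
            }
            min = Math.min(min, lb);
        }
        return min;
    }

    // Assumption: monotonicExpr is non-decreasing, so f(min(x)) == min(f(x)).
    // This does NOT hold for non-monotonic computed columns.
    static long computedSplitMin(List<FileStats> files, String physicalColumn,
                                 LongUnaryOperator monotonicExpr) {
        return monotonicExpr.applyAsLong(splitMin(files, physicalColumn));
    }
}
```

For non-monotonic expressions the file-level bounds say nothing reliable about the computed value, so a different approach would be needed there.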