Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

check-ordering enablement for flink config #10360

Open lei-xian0 opened 6 months ago

lei-xian0 commented 6 months ago

Feature Request / Improvement

Hi team, can we get the check-ordering config enabled for Flink writers as well? Currently the input does not tolerate a change in schema order compared with the existing table schema, which is a blocker when we handle schema updates. We are trying to use the unionByNameWith function to update the existing table schema, but it is difficult for us to align the schema order in our customized serializer with the table schema. For example, when we configure the writer as below, the schema order has to match the existing table's schema order, even though they have exactly the same columns. Example error:

Problems:
* columnA is out of order, before columnB

Our flink writer:

FlinkSink.forRow(datastream, schema)
    .tableLoader(tableLoader)
    .append();

I believe it's the same situation if we use RowData:

FlinkSink.forRowData(datastream)
    .tableSchema(schema)
    .tableLoader(tableLoader)
    .append();

Please let us know if there is any other approach to handle the schema update situation using Flink. Thanks.

Query engine

None

pvary commented 6 months ago

Currently the Flink Sink is not able to handle schema updates; you need to restart the job to pick up schema changes. One hacky solution is to throw a SuppressRestartsException and use an external tool, like the Flink Kubernetes Operator, to restart the failed job (kubernetes.operator.job.restart.failed does this for you). This will recreate the job graph and update the sink schema.
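A minimal sketch of that forced failure, assuming the caller has already detected the schema mismatch (the helper name and message are illustrative):

import org.apache.flink.runtime.execution.SuppressRestartsException;

// Throwing SuppressRestartsException makes the failure terminal, so Flink's own
// restart strategy will not retry with the stale job graph. An external tool,
// e.g. the Flink Kubernetes Operator with kubernetes.operator.job.restart.failed: true,
// can then resubmit the job, which rebuilds the graph against the updated table schema.
static void failJobForSchemaChange(String details) {
  throw new SuppressRestartsException(
      new IllegalStateException("Iceberg table schema changed: " + details));
}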

Is this the feature that you are looking for?

lei-xian0 commented 6 months ago

@pvary Thanks for the quick reply! Actually the manual restart is not a problem for us. We are using unionByNameWith to update the table schema first; the problem we are facing is that the Row/RowData we generate with the new schema is not in the same order as the updated table schema, which is causing the job to crash.
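For reference, this is roughly how that schema-merge step looks with Iceberg's UpdateSchema API; a sketch only, where the table location is an example and newSchema (an org.apache.iceberg.Schema derived from the incoming data) is assumed to exist:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Sketch: merge any new columns into the existing table schema by name.
// newSchema is assumed to be built from the incoming records; the path is illustrative.
TableLoader loader = TableLoader.fromHadoopTable("hdfs://warehouse/db/table");
loader.open();
Table table = loader.loadTable();
table.updateSchema()
    .unionByNameWith(newSchema)  // adds columns missing from the table; does not reorder
    .commit();
loader.close();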

pvary commented 6 months ago

You could create an operator that checks for schema changes and fails the job if needed; see the sketch below.
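A sketch of such a guard operator, placed in front of the sink; all names are illustrative, and it assumes the job keeps the Schema it was compiled against (the same TableLoader used by the sink can be reused here):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.TableLoader;

// Illustrative operator: periodically reload the Iceberg table and fail the job
// terminally once its schema no longer matches the schema the job graph was built with.
public class SchemaChangeGuard extends RichMapFunction<RowData, RowData> {
  private static final long CHECK_INTERVAL_MS = 60_000L;  // example interval

  private final TableLoader tableLoader;
  private final Schema compiledSchema;  // schema the job was compiled against
  private transient long lastCheck;

  public SchemaChangeGuard(TableLoader tableLoader, Schema compiledSchema) {
    this.tableLoader = tableLoader;
    this.compiledSchema = compiledSchema;
  }

  @Override
  public void open(Configuration parameters) {
    tableLoader.open();
    lastCheck = System.currentTimeMillis();
  }

  @Override
  public RowData map(RowData row) {
    long now = System.currentTimeMillis();
    if (now - lastCheck > CHECK_INTERVAL_MS) {
      lastCheck = now;
      Schema current = tableLoader.loadTable().schema();
      if (!current.sameSchema(compiledSchema)) {
        // Terminal failure: leave the restart to an external tool (see above).
        throw new SuppressRestartsException(
            new IllegalStateException("Iceberg table schema changed, restart required"));
      }
    }
    return row;
  }

  @Override
  public void close() throws Exception {
    tableLoader.close();
  }
}

It would sit between the source and the sink, e.g. datastream.map(new SchemaChangeGuard(tableLoader, table.schema())) before FlinkSink is built.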

lei-xian0 commented 6 months ago

I see, so the Flink sinks just don't support any schema changes. Wondering if this is something on the current roadmap that we can expect in the foreseeable future?

pvary commented 6 months ago

This is non-trivial, as currently the schema is compiled into the job graph. We would need to decouple the schema, either by using an external schema registry or by switching to less performant serialization methods on the wire.

I do not know about anyone working on this feature right now.

github-actions[bot] commented 4 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.