delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with APIs.
https://delta.io
Apache License 2.0

[BUG] Flink adapter compatibility checks fail on comments #1911

Open etspaceman opened 1 year ago

etspaceman commented 1 year ago

Bug

Describe the problem

The Flink adapter today checks whether the DataStream's schema is compatible with the current table's schema by serializing both schemas to JSON and comparing the results for equality.

The primary issue here is that the JSON output of the underlying table schema can contain comment metadata, while the JSON generated from the produced object has no notion of comments. If the underlying schema carries descriptive comments, the JSON representations will not match, even though the schemas are in fact compatible.
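To make the failure mode concrete, here is a hypothetical illustration (not the connector's actual code): two JSON-serialized schemas describing the same physical layout, where only the table's copy carries comment metadata. A plain string-equality check rejects the pair.

```java
// Hypothetical illustration: identical layouts, differing only in comment
// metadata, fail a naive JSON string comparison. The JSON strings below are
// modeled on Delta's struct-schema serialization but are examples only.
public class SchemaJsonComparison {
    // Schema as stored in the Delta log, with a comment on the field.
    public static final String TABLE_SCHEMA_JSON =
        "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\","
        + "\"nullable\":true,\"metadata\":{\"comment\":\"primary key\"}}]}";
    // Schema derived from the Flink DataStream, which has no notion of comments.
    public static final String STREAM_SCHEMA_JSON =
        "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\","
        + "\"nullable\":true,\"metadata\":{}}]}";

    public static void main(String[] args) {
        // The layouts are identical, but the serialized strings are not.
        System.out.println(TABLE_SCHEMA_JSON.equals(STREAM_SCHEMA_JSON)); // prints false
    }
}
```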

Using mergeSchema would work around this issue, but it is not desirable, as it would strip all of the comment metadata from the schema.

On top of this, the logic is concerning in that any change to the underlying schema can cause a Flink application failure. This includes backwards-compatible changes, such as adding an optional field or relaxing a field from required to optional. The compatibility layer should accept these scenarios and continue to allow writes.
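A relaxed compatibility check along these lines could look roughly as follows. This is a sketch under stated assumptions, not Delta's API: the `Field` record and `isWriteCompatible` method are hypothetical names, and real Delta schemas are nested structs rather than flat field lists.

```java
import java.util.*;

// Hypothetical sketch of a relaxed compatibility check. A stream schema is
// considered write-compatible when every stream field exists in the table
// with the same type and no stricter nullability, and every table field
// missing from the stream is optional. Comments never enter the comparison.
public class CompatCheck {
    public record Field(String name, String type, boolean nullable) {}

    public static boolean isWriteCompatible(List<Field> table, List<Field> stream) {
        Map<String, Field> remaining = new HashMap<>();
        for (Field f : table) remaining.put(f.name(), f);
        for (Field s : stream) {
            Field t = remaining.remove(s.name());
            if (t == null) return false;                     // stream adds an unknown field
            if (!t.type().equals(s.type())) return false;    // type mismatch
            if (s.nullable() && !t.nullable()) return false; // could write nulls into a required column
        }
        for (Field t : remaining.values()) {
            if (!t.nullable()) return false; // table requires a field the stream never supplies
        }
        return true;
    }
}
```

Under this check, a table that gains an optional field, or relaxes a field from required to optional, still accepts writes from a stream built against the older schema.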

Steps to reproduce

Observed results

An unrecoverable error is produced, leaving the Flink application unable to make progress.

java.lang.RuntimeException: DataStream's schema is different from current table's schema. 

Expected results

Flink is able to write to the underlying table.

Further details

Full stack trace below, without the schema (it is internal). I can share the schema output with a DBX representative on a private channel.

java.lang.RuntimeException: DataStream's schema is different from current table's schema. 
    at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.handleMetadataUpdate(DeltaGlobalCommitter.java:488)
    at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.doCommit(DeltaGlobalCommitter.java:416)
    at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:235)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$GlobalCommitterAdapter.commit(SinkV1Adapter.java:453)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
    at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.commit(GlobalCommitterOperator.java:173)
    at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.initializeState(GlobalCommitterOperator.java:143)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

scottsand-db commented 1 year ago

Hi @etspaceman - thanks for this issue. I've locally produced a test that reproduces this issue.

What would you like to see as a solution? Would a suitable MVP be: do not fail if the stream schema and table schema differ only by schema comments?
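For illustration, such an MVP might strip comment entries from a parsed-JSON view of each schema before comparing. This is a sketch only, not the connector's code; the class and method names are hypothetical, and it operates on a generic `Map`/`List` tree such as a JSON parser would produce.

```java
import java.util.*;

// Hypothetical sketch: recursively remove "comment" keys from a parsed-JSON
// schema tree so that two schemas differing only in comments compare equal.
public class CommentStripper {
    public static Object stripComments(Object node) {
        if (node instanceof Map<?, ?> map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : map.entrySet()) {
                if ("comment".equals(e.getKey())) continue; // drop comment entries
                out.put((String) e.getKey(), stripComments(e.getValue()));
            }
            return out;
        }
        if (node instanceof List<?> list) {
            List<Object> out = new ArrayList<>();
            for (Object item : list) out.add(stripComments(item));
            return out;
        }
        return node; // scalar: leave as-is
    }
}
```

Comparing `stripComments(tableSchema)` against `stripComments(streamSchema)` would then ignore comment-only differences while still catching real structural mismatches.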

etspaceman commented 1 year ago

I think that would suffice for the comments portion, but we should also support compatible differences between the schemas. E.g., if an optional field is present in the Spark schema but not the Flink one, we should still be able to continue the job without failing.