apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
5.85k stars 2.06k forks source link

Flink: Get NoSuchMethodError when submit flink sql by flink 1.12.x since iceberg upgrade flink dependence to 1.13.2 #3187

Closed Reo-LEI closed 2 years ago

Reo-LEI commented 2 years ago

Since we upgrade the flink dependence of iceberg from 1.12.1 to 1.13.2(#3116), I trying to submit flink sql job to a flink cluster which flink version is 1.12.0, and I encounter the NoSuchMethodError as follow(the full stack will be attached at the end):

java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

I reproduce that in my local env, and I found the root cause of this error is flink 1.13 change the return value type of DynamicTableFactory$Context.getCatalogTable() method from CatalogTable to ResolvedCatalogTable.

I think we need to found a way to be compatible with flink 1.12. @openinx @Flyangz @kbendick

Flink 1.12 https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L53

Flink 1.13 https://github.com/apache/flink/blob/release-1.13/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L74

Trace Stack image

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.table_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1056)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1134)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1134)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:156)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:369)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:221)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:159)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.executeSQL(FlinkSQLSubmitter.java:156)
    at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.main(FlinkSQLSubmitter.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
    at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:104)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:153)
    ... 36 more
openinx commented 2 years ago

Thanks for the report, @Reo-LEI ! I think this issue was introduced from this apache flink PR and FLINK-21913, it just changed the returned data type from CatalogTable to ResolvedCatalogTable without any compatibility guarantee. In this case, the iceberg-flink-runtime jar which is compiled from apache flink 1.13 will include the ResovledCatalogTable class inside it. Finally when we package this jar and submit the flink job to flink 1.12, the above compatibility issue happen.

As we all know, the DynamicTableFactory is a basic API which almost all flink connectors are built on top of it. The breaking compatibility makes the downstream projects really hard to deliver better compatibility to users, unless we iceberg maintain different modules for each maintained flink version (That's not the thing that we want to do).

The last flink upgrading work is also not a good experience (See the discussion and comment ), because the flink 1.12 also breaks several API that was annotated PublicEvolving in flink 1.11.0, that becomes one of the most important reasons leading to the conclusion that stops support flink 1.11.0 in our apache iceberg branch ( Supporting new features [such as flip-27 unified iceberg source/sink] that depends the API introduced in flink 1.12 is another reason). To better support the compatibility of downstream systems and delivering better experience to flink users, I will strongly suggest the Apache Flink community to pay more attention to ensuring API compatibility.

openinx commented 2 years ago

FYI @twalthr, @tillrohrmann, @StephanEwen, @zentol, @aljoscha, @rmetzger, @fhueske, @dawidwys

openinx commented 2 years ago

In short term, I don't think the apache flink will release a new version to address the compatibility issue recently. But I think we apache iceberg can still make jar which is built on top of flink 1.13 works fine for both flink1.12 and flink1.13, because we don't introduce any new API when upgrading the flink version from 1.12 to 1.13 in https://github.com/apache/iceberg/pull/3116.

I think we can use the similar approach to load the class method dynamically as the following: https://github.com/apache/iceberg/blob/d5443e3a34a4288441a015ab616d965557d78202/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java#L94

But I don't think we can completely avoid more implicit compatibility issues like this, although the probability is very small.

Update I've mailed a message to the apache flink developer mail list here: https://lists.apache.org/x/thread.html/ra438e89eeec2d4623a32822e21739c8f2229505522d73d1034e34198@%3Cdev.flink.apache.org%3E

MartijnVisser commented 2 years ago

@openinx @Reo-LEI Thanks for the message. As mentioned in the Flink Dev mailing list, I'm going to investigate what happened on our end, share that with you and let's get some learning out of this. I'll get back to you in a couple of days.

openinx commented 2 years ago

Thanks for the feedback @MartijnVisser ! Looking forward a better journey with flink&iceberg.

MartijnVisser commented 2 years ago

For transparency purposes, this is what @twalthr shared with the mailing list on this topic:

I'm very sorry for the inconvenience that we have caused with our API changes. We are trying our best to avoid API breaking changes. Thanks for giving us feedback.

There has been a reason why Table API was marked as @PublicEvolving instead of @Public. Over the last two years, we have basically rewritten the entire API [1] to digest the Blink merge and making the Table API stable and ready for the future. We tried our best to give users 1-2 releases time to upgrade their implementations whenever we deprecated API but we were aware that this might cause frustration, but hopefully for the greater good. We have reworked type system, Catalog API, schema, source/sinks, functions and much more. Flink 1.14 will hopefully be the last release with major API changes. We could also mark most Table API interfaces as @Public in 1.15.

For your mentioned incompatibility, I agree that the change from CatalogTable to ResolvedCatalogTable was not very nice. Since implementing a connector is not straight forward, we were expecting that not many users implement custom connectors. We therefore consider this part as kind of "second level API" for which we can evolve quicker. A context.getCatalogTable().getSchema() should still work for 1.12 and 1.13, at least that was the goal.

Thanks again for the feedback. It was a good reminder and we will pay more attention to this.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions

kbendick commented 2 years ago

I know this is old but thank you for tagging me @Reo-LEI.

I have been out of office for a bit (just getting back this week) but thank you for the summary @openinx and for all of the work on this.

Thank you so much also @MartijnVisser and everybody involved.

I have a new role as an open-source engineer now, in general vs before when my focus was a bit more Spark+Iceberg. So my work is a bit more flexible as to what is worked on then before. I'd love to be get set up to possibly help contribute back some of our learnings / experiences / use cases as I do feel that Iceberg is becoming quite popular on the Flink side, particularly with newer or more bleeding edge Flink users.

kbendick commented 2 years ago

If we can get our Flink version up to be within 2 major versions of the current stable release, like is suggested in the Flink community, I think it would be much easier to contribute back where we might need configs or something to decide on which API path to take as things are evolving (as I myself have seen some where it defaults to one over the other and I would have liked to choose the other). The newer APIs are also more powerful and so I do understand why this is happening.

I look forward to working with you all as I am a huge Flink streaming fan!

MartijnVisser commented 2 years ago

@kbendick Would love to jump on a call with some of our engineers to talk some more, it might be especially worthwhile since we're designing something to address compaction from a Flink perspective. Would be great to get some feedback from you.