apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0
748 stars 261 forks source link

[Bug]: Load mixed-format table failed in Flink 1.15 #2921

Closed zhoujinsong closed 2 weeks ago

zhoujinsong commented 3 weeks ago

What happened?

When I tried to load a mixed-format table within FLink 1.15, an error occurred with the message below:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.functions.LookupFunction

Affects Versions

master

What engines are you seeing the problem on?

Flink

How to reproduce

Relevant log output

2024-06-12 20:17:48,943 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: select * from `user`;
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2731) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:585) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:296) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:281) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:229) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.15.4.jar:1.15.4]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'hadoop_mixed.db.user'.

Table options are:

'flink.max-continuous-empty-commits'='2147483647'
'mixed-format.change.identifier'='db.user_change_'
'mixed-format.primary-key-fields'='id'
'mixed-format.table-store'='base'
'owner'='root'
'provider'='arctic'
'self-optimizing.group'='local'
'table-format'='MIXED_ICEBERG'
'table.create-timestamp'='1717489520057'
'watermark.base'='1717489522597'
'watermark.table'='1717489522597'
'write.metadata.delete-after-commit.enabled'='true'
'write.parquet.compression-codec'='zstd'
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) ~[?:?]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        ... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/functions/LookupFunction
        at org.apache.amoro.flink.table.DynamicTableFactory.generateDynamicTableSource(DynamicTableFactory.java:173) ~[amoro-mixed-format-flink-runtime-1.15-0.7-SNAPSHOT.jar:?]
        at org.apache.amoro.flink.table.DynamicTableFactory.createDynamicTableSource(DynamicTableFactory.java:164) ~[amoro-mixed-format-flink-runtime-1.15-0.7-SNAPSHOT.jar:?]
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) ~[?:?]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        ... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.functions.LookupFunction
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_401]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_401]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:359) ~[?:1.8.0_401]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_401]
        at org.apache.amoro.flink.table.DynamicTableFactory.generateDynamicTableSource(DynamicTableFactory.java:173) ~[amoro-mixed-format-flink-runtime-1.15-0.7-SNAPSHOT.jar:?]
        at org.apache.amoro.flink.table.DynamicTableFactory.createDynamicTableSource(DynamicTableFactory.java:164) ~[amoro-mixed-format-flink-runtime-1.15-0.7-SNAPSHOT.jar:?]
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15.4.jar:1.15.4]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) ~[?:?]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15.4.jar:1.15.4]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15.4.jar:1.15.4]
        ... 11 more
(END)

Anything else

No response

Are you willing to submit a PR?

Code of Conduct