apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0
1.95k stars 695 forks source link

Sedona Flink Function Not Found Error When Deployed on Distributed Cluster #1052

Open wang0702 opened 1 year ago

wang0702 commented 1 year ago

Expected behavior

Actual behavior

Sedona Flink works fine in my local IDE, but when I try to deploy it to a distributed cluster, I get a "function not found" error.

Steps to reproduce the problem

Settings

Sedona version = 1.4.1

Apache Spark version = 3.0

Apache Flink version = 1.43

API type = Scala, Java, Python?

Scala version =2.12

JRE version = 1.8

Python version = 3.7

Environment = Standalone or flink cluster

image
jiayuasu commented 1 year ago

Please make sure you correctly package your code to a fat jar. If you use Maven, please use maven shaded plugin. If you use sbt, please use sbt assembly plugin.

But from the error message, it seems that you didn't correctly register Sedona Flink functions to Flink.

wang0702 commented 12 months ago

I just used the sedona example/flink-sql program to package, it works fine in IDEA locally. But cannot depoly to the cluster. I already used the maven shaded plugin right now. Could you please give me more suggestions.

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:110) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) at java.lang.Thread.run(Thread.java:750) Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ... 1 more Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:104) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 1 more Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Invalid function call: ST_S2CellIDs(RAW('org.locationtech.jts.geom.Geometry', '...'), INT NOT NULL) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ... 4 more Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: ST_S2CellIDs(RAW('org.locationtech.jts.geom.Geometry', '...'), INT NOT NULL) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:78) at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:482) at org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:283) at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:595) at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:602) at org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:73) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:71) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:134) at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:857) at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:134) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:630) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:631) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:76) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716) at FlinkExample.main(FlinkExample.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 7 more Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments. at org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:154) at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnTypeOrError(TypeInferenceReturnInference.java:99) at org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:76) ... 62 more