GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0
658 stars 266 forks

Entrypoint for submitting a job #267

Open Yufan-l opened 4 years ago

Yufan-l commented 4 years ago

Question: The operator uses flink run as the entrypoint, while Flink's official Docker image uses java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint. What's the difference between them?

I'm asking because I have an app that has trouble running with flink run but works fine with StandaloneJobClusterEntryPoint. Can I override the entrypoint?

functicons commented 4 years ago

The design choice of flink run was made because it is easy to integrate with the operator. I don't see that java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint provides additional benefits. I'd like to know why flink run doesn't work for your use case.

BTW, the operator doesn't support overriding the entrypoint, but we can consider adding the feature if it is really needed.
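For anyone comparing the two paths, here is a hedged sketch of what each submission roughly looks like (the class name com.example.MyJob and the jar path are illustrative placeholders, not taken from this repo). The key operational difference: with StandaloneJobClusterEntryPoint the job's main() runs inside the JobManager JVM, while with flink run it executes in the client (CliFrontend) under Flink's user-code classloader before the job graph is shipped to the cluster.

```shell
# Per-job ("job cluster") mode used by the official Flink image: the JVM
# starts directly in StandaloneJobClusterEntryPoint, which invokes the job's
# main() inside the JobManager process.
JOB_CLUSTER_CMD='java -cp "/opt/flink/lib/*" org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint --job-classname com.example.MyJob'

# Client-side submission used by this operator: "flink run" runs main() in
# the CLI process and submits the resulting job graph, so the client's
# classpath and classloading behavior matter here.
CLIENT_SUBMIT_CMD='flink run -d /opt/flink/usrlib/my-job.jar'

echo "$JOB_CLUSTER_CMD"
echo "$CLIENT_SUBMIT_CMD"
```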

Yufan-l commented 4 years ago

Thanks for the reply. The app simply consumes a Kafka topic whose values are serialized in Avro format. The error is an NPE:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.<init>(GenericRecordAvroTypeInfo.java:45)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:169)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.getProducedType(KafkaDeserializationSchemaWrapper.java:55)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.getProducedType(FlinkKafkaConsumerBase.java:1042)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1570)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1547)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:638)

I have added all dependencies to the classpath, so I'm not sure why flink run produces this error.
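Reading the trace: the NPE comes from Preconditions.checkNotNull inside GenericRecordAvroTypeInfo's constructor, which suggests a null Avro Schema reached the type-info lookup (e.g. a schema that failed to load or deserialize on the flink run client). The sketch below is a stdlib-only stand-in, not Flink's real classes; the names merely mirror the frames in the trace to show where the null surfaces.

```java
import java.util.Objects;

// Stand-in sketch (assumption: mirrors the shape of Flink's
// Preconditions.checkNotNull / GenericRecordAvroTypeInfo.<init>, not the
// real implementations).
public class AvroTypeInfoSketch {

    // Stands in for org.apache.flink.util.Preconditions.checkNotNull,
    // which throws NullPointerException on null input.
    public static <T> T checkNotNull(T reference) {
        return Objects.requireNonNull(reference);
    }

    // Stands in for GenericRecordAvroTypeInfo's constructor: if the Avro
    // Schema argument is null, this is exactly the frame that throws.
    public static String describeSchema(Object schema) {
        return checkNotNull(schema).toString();
    }

    public static void main(String[] args) {
        try {
            describeSchema(null); // mirrors getProducedType with a lost schema
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the stack trace: schema was null");
        }
    }
}
```

If this diagnosis fits, the thing to check is whatever produces the Schema handed to the deserialization schema: with flink run, the job's main() runs in the client JVM, so a schema loaded from a classpath resource can silently come back null there even though it resolves fine under StandaloneJobClusterEntryPoint.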

functicons commented 4 years ago

Did you try creating an uber jar with the dependencies for your job?
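For reference, a typical maven-shade-plugin setup for a Flink uber jar looks roughly like the sketch below (the plugin version is illustrative; the ServicesResourceTransformer matters because Avro and Flink connectors use META-INF/services files that a naive jar merge can clobber):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- merge META-INF/services entries instead of overwriting them -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```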

Yufan-l commented 4 years ago

Yes, I'm using an uber jar, and the Docker image is flink:1.10.1-scala_2.11.

Mrart commented 4 years ago

This has nothing to do with flink run. It seems something is wrong with your jar.

sroctadian commented 3 years ago

Hi, is there any solution for this problem? I'm currently facing it while migrating to a production system with Flink on YARN. Previously the jobs ran fine on the development server in standalone mode.