docker-flink / examples

Examples for how to use the Flink Docker images in a variety of ways
Apache License 2.0

Flink and Calico on Kubernetes connection reset / PUT operation failed #3

Closed: Vince-Cercury closed this issue 6 years ago.

Vince-Cercury commented 6 years ago

I'm not sure this belongs here. Please let me know if it's not appropriate.

We run Flink on Kubernetes 1.8 in AWS. It has been fine for months, and we can also make it work with this Helm chart. I set up a new k8s cluster recently. Everything is the same EXCEPT that we enabled Calico (instead of using only Flannel).

Calico gives us networking between containers.

Since enabling Calico, the Flink client receives this error when trying to send a jar file to the job manager:

```
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
    at au.com.nbnco.ourapp.connectivitytest.Application.executeJob(Application.java:26)
    at au.com.nbnco.ourapp.connectivitytest.Application.main(Application.java:14)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not upload the jar files to the job manager.
    at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:154)
    at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Could not retrieve the JobManager's blob port.
    at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:746)
    at org.apache.flink.runtime.jobgraph.JobGraph.uploadUserJars(JobGraph.java:584)
    at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:148)
    ... 9 more
Caused by: java.io.IOException: PUT operation failed: Connection reset
    at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:512)
    at org.apache.flink.runtime.blob.BlobClient.put(BlobClient.java:374)
    at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:772)
    at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:741)
    ... 11 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at org.apache.flink.runtime.blob.BlobUtils.writeLength(BlobUtils.java:324)
    at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:498)
```

And the job manager logs:

```
java.lang.IllegalArgumentException: Invalid BLOB addressing for permanent BLOBs
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:337)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)
2018-03-27 06:28:16,069 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 11433fc332c7d76100fd08e6d1b623b4 (flink-job-connectivity-test).
2018-03-27 06:28:16,085 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for 11433fc332c7d76100fd08e6d1b623b4.
2018-03-27 06:28:16,096 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-03-27 06:28:16,105 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Running initialization on master for job flink-job-connectivity-test (11433fc332c7d76100fd08e6d1b623b4).
2018-03-27 06:28:16,105 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Successfully ran initialization on master in 0 ms.
2018-03-27 06:28:16,117 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job 11433fc332c7d76100fd08e6d1b623b4 (flink-job-connectivity-test)
java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.<init>(CheckpointStatsTracker.java:121)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

It looks like the file cannot be transferred from the client to the job manager. I believe the `Invalid BLOB addressing` error occurs because the job manager never received the file.

I don't understand why we get `Connection reset`. In tcpdump I can see that the job manager and the client use several ports to communicate and transfer the file.
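One way to narrow this down is to probe, from the client pod, whether each of the job manager's ports accepts a TCP connection at all. Below is a minimal Java sketch; the class name `PortProbe`, the default host `flink-jobmanager`, and the ports 6123/6124 are assumptions, to be replaced with your own service name and pinned ports. Note that a successful connect only shows that the TCP handshake gets through; the reset in the client trace happened while the jar was being written, so this check can pass while the upload still fails.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /** Returns true if a TCP connection to host:port is established within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // An unresolvable host, a refused connection, or a timeout all surface as IOException.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: pass the job manager service name as the first argument.
        String host = args.length > 0 ? args[0] : "flink-jobmanager";
        // Assumed ports: 6123 (RPC default) and 6124 (a pinned blob server port).
        int[] ports = {6123, 6124};
        for (int port : ports) {
            System.out.printf("%s:%d reachable: %b%n", host, port, canConnect(host, port, 2000));
        }
    }
}
```

Running it once from a client pod on the Flannel-only cluster and once on the Calico cluster would show whether the difference is at the connection level or only appears once data is flowing.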

Everything is identical: it works on one cluster but not on the other. Ports are configured the same way, and every artefact is the same.

We don't have any NetworkPolicy. Could enabling Calico still affect networking in some way?

Vince-Cercury commented 6 years ago

Problem solved. I added this to my Flink task manager manifest file:

```yaml
- name: data
  port: 6121
```

And this in the Flink configuration file (`flink-conf.yaml`): `taskmanager.data.port: 6121`

So basically I pinned the task manager's data port. I had already done that for the job manager (the blob server port), and that was fine. But it looks like Calico works differently from Flannel, and the task manager could not use a random data port.
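To catch this kind of mismatch before deploying, one could check that the ports discussed in this thread are pinned rather than left random. Below is a minimal sketch, assuming a flat `key: value` `flink-conf.yaml` and assuming (as with the Flink 1.x defaults as far as I know) that a value of `0` or a missing key for `blob.server.port` or `taskmanager.data.port` means a random port. The class `PinnedPortCheck` is hypothetical, and the parser ignores YAML nesting and quoting.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PinnedPortCheck {

    // Port keys from this thread; assumed: 0 or unset means Flink picks a random port.
    static final String[] PORT_KEYS = {"blob.server.port", "taskmanager.data.port"};

    /** Parses simple "key: value" lines, skipping blank lines and # comments. */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a key/value line
            }
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    /** Returns the keys from PORT_KEYS that are missing or set to 0. */
    public static List<String> unpinned(Map<String, String> conf) {
        List<String> result = new ArrayList<>();
        for (String key : PORT_KEYS) {
            String value = conf.get(key);
            if (value == null || value.equals("0")) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get(args.length > 0 ? args[0] : "flink-conf.yaml");
        List<String> missing = unpinned(parse(Files.readAllLines(path)));
        if (missing.isEmpty()) {
            System.out.println("All checked ports are pinned.");
        } else {
            System.out.println("Left random (0 or unset): " + missing);
        }
    }
}
```

The pinned values still have to match the ports declared in the Kubernetes manifests, which this sketch does not check.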