GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0

Job Manager HA setup fails #366

Open shravangit20 opened 3 years ago

shravangit20 commented 3 years ago

I added the HA JobManager properties below to the "flink properties" section of the FlinkCluster resource, and the JobManager and TaskManager pods started crashing. From the logs, the problem appears to be with "high-availability.storageDir". I have tried an S3 location, a folder path on the ZooKeeper pod, etc., but it looks as if a Hadoop security context and an HDFS location are required for HA to work. Is this understanding correct? Could you please help?

Flink properties:

high-availability: zookeeper
high-availability.zookeeper.quorum: my-kafka-zookeeper:2181
high-availability.storageDir: s3://...
high-availability.cluster-id: "zk-ha1"
high-availability.jobmanager.port: "6126"
high-availability.zookeeper.path.root: /flink-k8s
restart-strategy: failure-rate

Below is the JobManager error log:

2020-11-05 22:24:56,185 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-11-05 22:24:56,186 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2020-11-05 22:24:57,807 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at flink-operator-jobmanager:6126
2020-11-05 22:25:00,459 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-11-05 22:25:00,671 INFO akka.remote.Remoting - Starting remoting
2020-11-05 22:25:01,461 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@flink-operator-jobmanager:6126]
2020-11-05 22:25:01,952 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@flink-operator-jobmanager:6126
2020-11-05 22:25:02,050 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)

I read a Medium post that suggests adding the Hadoop JARs to the existing Flink image and then using an HDFS path for the storage directory. Is that the recommended approach? https://medium.com/hepsiburadatech/high-available-flink-cluster-on-kubernetes-setup-73b2baf9200e
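The question above largely comes down to which URI scheme high-availability.storageDir uses, since "Could not create FileSystem for highly available storage" is typically raised when Flink cannot create a filesystem for the configured path. As a rough pre-flight check, the hypothetical StorageDirCheck class below (not part of Flink or the operator) parses the configured value and reports what its scheme usually requires. The mapping is an assumption based on Flink's filesystem documentation: s3:// is served by the self-contained flink-s3-fs-hadoop or flink-s3-fs-presto filesystems, which do not need Hadoop on the classpath; hdfs:// needs Hadoop dependencies; a local file:// path only helps HA if it survives JobManager restarts.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

/**
 * Hypothetical pre-flight check for high-availability.storageDir.
 * Not part of Flink or the operator; the scheme-to-requirement mapping is an
 * assumption based on Flink's filesystem documentation.
 */
public class StorageDirCheck {

    /** Returns a short note on what the storageDir URI scheme usually needs. */
    static String requirementFor(String storageDir) {
        final URI uri;
        try {
            uri = new URI(storageDir);
        } catch (URISyntaxException e) {
            return "invalid URI: " + e.getMessage();
        }
        String scheme = uri.getScheme();
        if (scheme == null) {
            // Flink resolves scheme-less paths against fs.default-scheme (local files unless configured).
            return "no scheme: resolved against Flink's default filesystem";
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "s3":
            case "s3a":   // registered by flink-s3-fs-hadoop
            case "s3p":   // registered by flink-s3-fs-presto
                return "S3: needs the flink-s3-fs-hadoop or flink-s3-fs-presto filesystem, not Hadoop itself";
            case "hdfs":
                return "HDFS: needs Hadoop dependencies on the Flink classpath";
            case "file":
                return "local: only useful for HA if the path survives JobManager restarts";
            default:
                return "other scheme '" + scheme + "': needs a Flink filesystem registered for it";
        }
    }

    public static void main(String[] args) {
        // "s3://my-bucket/flink-ha" is a hypothetical placeholder, not the issue's elided bucket.
        String storageDir = args.length > 0 ? args[0] : "s3://my-bucket/flink-ha";
        System.out.println(storageDir + " -> " + requirementFor(storageDir));
    }
}
```

Pass the actual storageDir value as the only argument; the check is purely syntactic and says nothing about credentials or whether the bucket exists.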

shravangit20 commented 3 years ago

Could someone please help with this? I am stuck getting the HA setup to work, and all my attempts have been unsuccessful. Here are a few things I tried:

- Added the HA configuration to the flink properties to register the ZooKeeper endpoint.
- Tried ZooKeeper 3.4, 3.5 and 3.6 setups to make sure it is not a version-specific ZooKeeper client issue.
- Used an S3 bucket (with an S3 downloader init container that has the keys), a local path, etc. as high-availability.storageDir to store the JobManager metadata.

Also, is it recommended to use a Flink image with Hadoop baked in, so that an hdfs:// path can be used for high-availability.storageDir? I also noticed that the JobManager and TaskManager pods go into "CrashLoopBackOff" status when the HA arguments are added to the flink properties. Without them, the cluster is created fine.

JobManager logs after creating the FlinkCluster with the HA configuration enabled:

04:47:52.588 [main] ERROR org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl - Ensure path threw exception
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /opt
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1111) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1139) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:291) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:731) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.WatcherRemovalFacade.fixForNamespace(WatcherRemovalFacade.java:170) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:187) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:35) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:258) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:181) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:160) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.start(ZooKeeperLeaderElectionService.java:138) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.startInternal(WebMonitorEndpoint.java:761) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:244) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:163) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64) [flink-dist_2.12-1.11.2.jar:1.11.2]
04:47:52.596 [main] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: Unhandled error in ZooKeeperLeaderElectionService: Ensure path threw exception
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.unhandledError(ZooKeeperLeaderElectionService.java:430) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:731) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.WatcherRemovalFacade.fixForNamespace(WatcherRemovalFacade.java:170) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:187) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:35) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:258) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:181) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:160) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.start(ZooKeeperLeaderElectionService.java:138) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.startInternal(WebMonitorEndpoint.java:761) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:244) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:163) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64) [flink-dist_2.12-1.11.2.jar:1.11.2]
Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /opt
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1111) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1139) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:291) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
    ... 17 more
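The KeeperException$ConnectionLossException in this trace indicates that Flink's shaded ZooKeeper client lost, or never established, its connection to the quorum. Before suspecting ZooKeeper versions, it may be worth confirming that each host:port in high-availability.zookeeper.quorum (my-kafka-zookeeper:2181 in the first comment) is reachable from inside the JobManager pod. The hypothetical QuorumCheck class below sketches that with plain java.net sockets. It does not speak the ZooKeeper protocol, so a successful connection only shows that the port accepts TCP connections, not that the ensemble is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical diagnostic (not part of Flink or the operator): checks plain
 * TCP reachability of every host:port in a ZooKeeper quorum string such as
 * the value of high-availability.zookeeper.quorum.
 */
public class QuorumCheck {

    /** Splits "host1:2181,host2:2181" into unresolved socket addresses. */
    static List<InetSocketAddress> parseQuorum(String quorum) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : quorum.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            // Assumption: fall back to 2181, ZooKeeper's usual client port, when none is given.
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? 2181 : Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // This InetSocketAddress constructor resolves the host name (DNS) first.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        String quorum = args.length > 0 ? args[0] : "my-kafka-zookeeper:2181";
        for (InetSocketAddress addr : parseQuorum(quorum)) {
            boolean ok = isReachable(addr.getHostString(), addr.getPort(), 3000);
            System.out.println(addr.getHostString() + ":" + addr.getPort()
                    + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```

Run it with the quorum string as its only argument; with no argument it falls back to my-kafka-zookeeper:2181, the value from the first comment. A "NOT reachable" result points at DNS, Service naming or network policy rather than at the HA configuration itself.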

shravangit20 commented 3 years ago

@functicons Could you please confirm whether having a Hadoop file system and its dependencies baked into the Flink image is mandatory to enable JobManager HA? I see the following in my logs when I use version 1.8.2:

2020-11-11 14:24:09,276 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-11-11 14:24:09,357 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install security context.
2020-11-11 14:24:09,363 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-11-11 14:24:09,371 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-11-11 14:24:09,371 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2020-11-11 14:24:10,379 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at flink-operator-jobmanager:0
2020-11-11 14:24:12,164 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-11-11 14:24:12,462 INFO akka.remote.Remoting - Starting remoting
2020-11-11 14:24:12,967 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@flink-operator-jobmanager:39267]
2020-11-11 14:24:12,979 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@flink-operator-jobmanager:39267
2020-11-11 14:24:13,060 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)

PengsenTang commented 3 years ago

Hey Shra, any updates on the "Could not create FileSystem for highly available storage" error? I have the same issue and no idea how to fix it.

PengsenTang commented 3 years ago

@shravangit20

gezhiwei8899 commented 2 years ago

I got the same problem.

MikeB2019x commented 2 years ago

Same