mesosphere / marathon

Deploy and manage containers (including Docker) on top of Apache Mesos at scale.
https://mesosphere.github.io/marathon/
Apache License 2.0

Running Flink on Mesos with two slaves via Marathon #6953

Open marzieh-ghasemi opened 5 years ago

marzieh-ghasemi commented 5 years ago

I have four physical nodes, each with Docker installed. I set up Mesos, Flink, ZooKeeper, Hadoop, and Marathon in Docker on each node. Previously I had three nodes (two masters and one slave), on which I ran Flink through Marathon, and the Flink UI came up without any problems. I then changed the cluster to two masters and two slaves. I added the following JSON file in Marathon and it ran, but the Flink UI did not come up on both slave nodes. The error is shown after the app definition.

{
 "id": "flink",
 "cmd": "/home/flink-1.7.2/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
 "cpus": 1.0,
 "mem": 1024,
 "instances": 2
}
Error:
Service temporarily unavailable due to an ongoing leader election. Please refresh

I cleared the ZooKeeper contents with these commands:

  /home/zookeeper-3.4.14/bin/zkCleanup.sh /var/lib/zookeeper/data/ -n 10
  rm -rf /var/lib/zookeeper/data/version-2
  rm /var/lib/zookeeper/data/zookeeper_server.pid

I also ran these commands to delete the Flink contents in ZooKeeper:

   /home/zookeeper-3.4.14/bin/zkCli.sh
  delete /flink/default/leader/....

But one of the Flink UIs still has the problem.

I have configured Flink high availability like this:

  high-availability: zookeeper
  high-availability.storageDir: hdfs:///flink/ha/
  high-availability.zookeeper.quorum: 0.0.0.0:2181,10.32.0.3:2181,10.32.0.4:2181,10.32.0.5:2181
  fs.hdfs.hadoopconf: /opt/hadoop/etc/hadoop
  fs.hdfs.hdfssite: /opt/hadoop/etc/hadoop/hdfs-site.xml
  recovery.zookeeper.path.mesos-workers: /mesos-workers
  env.java.home: /opt/java
  mesos.master: 10.32.0.2:5050,10.32.0.3:5050

Because I am using a Mesos cluster, I did not change anything in flink-conf.yaml. I assumed that Marathon would handle running the application and distribute it among the slave nodes, but it did not.

Would you please guide me on how to use both Mesos slaves to run the Flink platform?

Any help would be really appreciated.

jeschkies commented 5 years ago

I'm not really familiar with Flink. In any case, Marathon does not really know what kind of workload it is running, so it probably won't tear down the framework. At least for now.

> I had already had three nodes,one slave and two master, ...

You should run a cluster with either one or three masters.
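
The reasoning behind that advice can be sketched with majority-quorum arithmetic, assuming the masters need a strict majority to agree before they can elect a leader. The function names below are illustrative only and are not part of Mesos or Marathon.

```python
def quorum_size(masters: int) -> int:
    """Smallest strict majority of `masters` nodes."""
    return masters // 2 + 1


def tolerated_failures(masters: int) -> int:
    """Number of masters that can fail while a majority is still reachable."""
    return masters - quorum_size(masters)


# Compare one, two, and three masters.
for n in (1, 2, 3):
    print(f"masters={n} quorum={quorum_size(n)} tolerated_failures={tolerated_failures(n)}")
```

Under this assumption, two masters need both nodes for a majority, so they tolerate no more failures than a single master does, while three masters tolerate one.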

> Would you please guide me how to use both Mesos slaves to run Flink platform?

I'm not very familiar with Flink on Mesos, but it seems you already have several frameworks running. So I suggest you try out DC/OS. It uses Marathon under the hood and should help you set up a cluster with several nodes.

marzieh-ghasemi commented 5 years ago

Dear Jeschkies, are you saying that Marathon does not distribute tasks among slaves? I believe Mesos is responsible for distributing tasks among slaves and that I have to specify this in the JSON file for Marathon, but I don't know how.

meichstedt commented 5 years ago

@marzieh-ghasemi Marathon does not necessarily distribute tasks among slaves; it will use suitable offers but as @jeschkies pointed out it doesn't know what your service's requirements are. You have to tell Marathon to distribute tasks if you need that – please refer to the documentation on constraints.

You could add

"constraints": [["hostname", "UNIQUE"]]

to your app definition to prevent flink tasks from being co-located.
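
As a minimal sketch of where that line goes, the Python snippet below rebuilds the app definition from the original post with the constraint added and prints it as JSON. The helper name `build_app` is hypothetical. The other values are copied from the post.

```python
import json

# Command line copied from the app definition in the original post.
FLINK_CMD = (
    "/home/flink-1.7.2/bin/mesos-appmaster.sh"
    " -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081"
    " -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024"
    " -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2"
    " -Dmesos.resourcemanager.tasks.cpus=1"
)


def build_app(instances: int = 2) -> dict:
    """Return the posted app definition plus a hostname UNIQUE constraint."""
    return {
        "id": "flink",
        "cmd": FLINK_CMD,
        "cpus": 1.0,
        "mem": 1024,
        "instances": instances,
        # At most one instance per agent hostname.
        "constraints": [["hostname", "UNIQUE"]],
    }


app_json = json.dumps(build_app(), indent=2)
print(app_json)
```

With `UNIQUE` on `hostname`, Marathon places at most one instance per host, so asking for more instances than there are agents would leave the extra instances waiting.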

Please check whether the DC/OS Apache Flink integration is better suited to your needs.

marzieh-ghasemi commented 5 years ago

@meichstedt, thank you for your guidance. I use this JSON file to run Flink in the Mesos cluster. Would you please tell me whether I have added "constraints" correctly? I only need to add the hostnames in the constraints.

{
    "id": "flink",
    "cmd": "/home/flink-1.7.2/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
    "cpus": 1.0,
    "mem": 1024,
    "instances": 2
    "constraints": [["10.32.0.4" , "10.32.0.5"]]
}
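
For reference, a small Python sketch of how a definition like the one above could be checked before submitting it. It makes three assumptions: the `cmd` field is left out for brevity, the operator list is limited to constraint operators documented for Marathon and may be incomplete for a given version, and the `LIKE` candidate only applies if the agents register with their IP addresses as hostnames. The helper names are hypothetical.

```python
import json

# The posted definition, with "cmd" omitted for brevity.
posted = """{
    "id": "flink",
    "cpus": 1.0,
    "mem": 1024,
    "instances": 2
    "constraints": [["10.32.0.4" , "10.32.0.5"]]
}"""

# Operators assumed from the Marathon constraints documentation; may be incomplete.
OPERATORS = {"UNIQUE", "CLUSTER", "GROUP_BY", "LIKE", "UNLIKE", "MAX_PER"}


def parses(text: str) -> bool:
    """True if `text` is syntactically valid JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


def constraint_ok(constraint: list) -> bool:
    """Shape check: [field, operator] or [field, operator, value]."""
    return len(constraint) in (2, 3) and constraint[1] in OPERATORS


# Add the comma that is missing after "instances": 2.
fixed = posted.replace('"instances": 2', '"instances": 2,')

print(parses(posted))  # False: missing comma
print(parses(fixed))   # True
print([constraint_ok(c) for c in json.loads(fixed)["constraints"]])  # [False]

# Candidate constraint, assuming agents register by IP address.
candidate = ["hostname", "LIKE", r"10\.32\.0\.[45]"]
print(constraint_ok(candidate))  # True
```

The check suggests two separate issues: a missing comma after `"instances": 2`, and a constraint that lists two addresses where a field name and an operator are expected.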

Also, when I run a JAR file in the Flink UI, I want the JAR file to be distributed among the slave nodes. Is that possible?

Many thanks.