spring-attic / spring-xd-ambari

Apache Ambari integration for Spring XD
Apache License 2.0

xd.messagebus.kafka.zkAddress is not updated in distributed mode when Kafka used as transport #15

Closed nsarvi closed 8 years ago

nsarvi commented 8 years ago

Spring XD 1.2.1 is installed on a PHD 3.0 release using the Ambari plugin. It's a 5-node PHD cluster with 3 ZooKeeper servers and 2 Spring XD containers installed and running. On trying to create a simple stream, e.g. 'stream create --name ticktock --definition "time | log" --deploy', the module deployment fails with the exception below in the Admin and Container log files. This issue happens ONLY when Spring XD admin decides to deploy to an XD container node where ZooKeeper is either not running or not installed. There are two issues:

  1. xd.messagebus.kafka.zkAddress is not updated with the list of all ZooKeeper hosts.
  2. zk.client.connect is not updated with all the ZooKeeper hosts; currently it gets just one ZooKeeper host and port.
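For illustration, both properties would normally carry a comma-separated list of host:port pairs, which is the standard ZooKeeper connect string format. A minimal Python sketch of building such a string follows; the function name and host names are hypothetical and this is not the plugin's actual code.

```python
def zk_connect_string(hosts, port=2181):
    """Join ZooKeeper hosts into a 'host1:port,host2:port' connect string.

    Hypothetical helper for illustration only; 2181 is the usual
    ZooKeeper client port, but the real port comes from the cluster config.
    """
    # De-duplicate, strip stray whitespace, skip empty entries, and sort
    # so the rendered value is stable between runs.
    cleaned = sorted({h.strip() for h in hosts if h and h.strip()})
    return ",".join(f"{host}:{port}" for host in cleaned)


# Hypothetical host names for a three-node ensemble.
print(zk_connect_string(["zk1.example.com", "zk2.example.com", "zk3.example.com"]))
```

With a value like this, a container on a host without a local ZooKeeper can still reach the ensemble through any of the listed servers.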

org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 10000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880) ~[na:na]
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) ~[na:na]
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) ~[na:na]
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.ensureTopicCreated(KafkaMessageBus.java:566) ~[na:na]
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.bindProducer(KafkaMessageBus.java:502) ~[na:na]
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageProducer(AbstractMessageBusBinderPlugin.java:287) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:143) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200) ~[spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365) [spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334) [spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181) [spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149) [spring-xd-dirt-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509) [curator-recipes-2.6.0.jar:na]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503) [curator-recipes-2.6.0.jar:na]
    at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92) [curator-framework-2.6.0.jar:na]
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) [guava-16.0.1.jar:na]
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83) [curator-framework-2.6.0.jar:na]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500) [curator-recipes-2.6.0.jar:na]
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35) [curator-recipes-2.6.0.jar:na]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762) [curator-recipes-2.6.0.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]

Analysis: The /etc/springxd/servers.yml file is not updated with the list of ZooKeeper hosts for the parameter xd.messagebus.kafka.zkAddress. Somehow the parameter kafka_installed in servers.yml.j2 evaluates to false, and hence the file is not updated with the correct values.

{% if kafka_installed %}
kafka:
  brokers: {{xd_messagebus_kafka_brokers}}
  zkAddress: {{zk_server}}:{{zk_port}}
{% endif %}

However, it does update zk.client.connect, but with only one ZooKeeper host and port, where it should list all the ZooKeeper hosts and their ports.

Workaround: The current workaround is to edit the template file servers.yml.j2 so that xd.messagebus.kafka.zkAddress and zk.client.connect carry the correct ZooKeeper values, and then restart the Spring XD containers.
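After applying the workaround, the values that end up in /etc/springxd/servers.yml can be sanity-checked against the expected ZooKeeper hosts. The Python sketch below is one hedged way to do that; the function name and host names are hypothetical, and it assumes the value has already been read out of the file as a plain string.

```python
def missing_zk_hosts(connect_string, expected_hosts):
    """Return the expected hosts that do not appear in a ZooKeeper connect string.

    Hypothetical helper for illustration; handles 'host:port' entries
    separated by commas, with an optional chroot suffix such as '/kafka'.
    """
    # Drop an optional chroot path before splitting into servers.
    servers = connect_string.split("/", 1)[0]
    present = set()
    for entry in servers.split(","):
        entry = entry.strip()
        if entry:
            # Keep only the host part of 'host:port'.
            present.add(entry.rsplit(":", 1)[0])
    return [host for host in expected_hosts if host not in present]


# Hypothetical values: a file that still lists a single ZooKeeper host.
print(missing_zk_hosts("zk1.example.com:2181",
                       ["zk1.example.com", "zk2.example.com", "zk3.example.com"]))
```

An empty result suggests the property lists every expected host; anything else points at hosts the template did not pick up.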

jvalkeal commented 8 years ago

paging @nsarvi. ok, just pushed some fixes. zk connect string with all zk hosts is now used. I couldn't reproduce the other issue where the xd container was deployed to a host where zk is not installed.

nsarvi commented 8 years ago

For the other issue, Kafka is installed outside of the Pivotal Ambari. Chances are that the kafka_installed param is false in the Ambari DB, and hence it's not updating /etc/springxd/servers.yml with the list of ZooKeeper hosts for the param xd.messagebus.kafka.zkAddress.

Somehow the parameter kafka_installed in (servers.yml.j2) is returning false.

{% if kafka_installed %}
kafka:
  brokers: {{xd_messagebus_kafka_brokers}}
  zkAddress: {{zk_server}}:{{zk_port}}
{% endif %}

I have a reproducible case. Do you want to connect and verify?

jvalkeal commented 8 years ago

Yeah, it's false if ambari didn't install Kafka. Sucks that the ambari config ui cannot be extended with better components. I'll try to make some sort of tweak/fix.

jvalkeal commented 8 years ago

reopen. this gets complicated indeed, because it then opens the question of whether an external kafka is using the ambari-managed zk or not. the ambari ui is too limited to do anything clever (we can only add an empty placeholder, which needs to hold one space, to determine if something is set). configuration is getting very awkward.
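To make the placeholder idea concrete, here is a minimal Python sketch of how a whitespace-only value could be treated as "not set", with a fallback to the Ambari-managed ZooKeeper. All names are hypothetical and this is an assumption about the general approach, not the code from the actual commit.

```python
def is_set(value):
    """Treat None, empty, and whitespace-only values (the ' ' placeholder) as unset."""
    return value is not None and value.strip() != ""


def resolve_kafka_settings(ambari_kafka_installed, ambari_zk_connect,
                           external_brokers=" ", external_zk_connect=" "):
    """Return (kafka_enabled, zk_address) for rendering the Kafka block.

    Hypothetical logic: parameter names are illustrative only.
    """
    # Kafka counts as available if Ambari installed it or external brokers were given.
    kafka_enabled = ambari_kafka_installed or is_set(external_brokers)
    # Prefer an explicitly configured external ZooKeeper, else the Ambari-managed one.
    if is_set(external_zk_connect):
        zk_address = external_zk_connect.strip()
    else:
        zk_address = ambari_zk_connect
    return kafka_enabled, zk_address


# External Kafka that reuses the Ambari-managed ZooKeeper (hypothetical hosts).
print(resolve_kafka_settings(False, "zk1.example.com:2181", "kafka1.example.com:9092"))
```

The sketch covers both cases raised here: an external Kafka with its own ZooKeeper, and an external Kafka that shares the Ambari-managed one.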

jvalkeal commented 8 years ago

@nsarvi ok just made some changes to handle external kafka/zk. check above commit.

jvalkeal commented 8 years ago

closing as this should be fixed now.