I keep getting the Vertigo Cluster timeout exception when my kafka queue sending message to vertigo is idle for a few minutes
@Override
public void start(final Future startResult) {
LOGGER.info("App Startup Verticle To Create Network Cluster\n");
final Vertigo vertigo = new Vertigo(this);
vertigo.deployCluster("app_cluster", new Handler<AsyncResult<Cluster>>() {
public void handle(AsyncResult<Cluster> asyncResult) {
if (asyncResult.failed()) {
LOGGER.info("Unable to start Network Cluster\n");
startResult.setFailure(asyncResult.cause());
} else {
LOGGER.info("Started Network Cluster!\n");
// The cluster is used to deploy, undeploy, and reconfigure networks in a cluster
Cluster cluster = asyncResult.result();
// Create a new network configuration. This network uses circular connections to send "ack" messages back to the FaultTolerantFeeder from the MessageReceiver.
NetworkConfig network = vertigo.createNetwork("deepstream-network");
network.addVerticle("kafkaConsumer", KafkaMessageConsumer.class.getName());
network.addVerticle("kafkaMessageSender", KafkaMessageHandlerComponentVerticle.class.getName());
network.addVerticle("phaseOneMessageHandler", PhaseOneMessageHandlerComponentVerticle.class.getName());
network.createConnection("kafkaMessageSender", "out", "phaseOneMessageHandler", "in");
network.createConnection("phaseOneMessageHandler", "ack", "kafkaMessageSender", "ack");
// Deploy the network to the cluster. Once all the components
// in the network have been started and all the connections
// are successfully communicating with one another the async
// handler will be called.
cluster.deployNetwork(network, new Handler<AsyncResult<ActiveNetwork>>() {
@Override
public void handle(AsyncResult<ActiveNetwork> result) {
if (result.failed()) {
startResult.setFailure(result.cause());
} else {
startResult.setResult((Void) null);
}
}
});
}
}
});
}
Oct 02, 2014 10:24:28 PM org.vertx.java.core.logging.impl.JULLogDelegate warn
WARNING: Message reply handler for message.address='app_cluster' timed out as no reply was received - it will be removed
[ERROR]
net.kuujo.vertigo.cluster.ClusterException: org.vertx.java.core.eventbus.ReplyException: Timed out waiting for reply
at net.kuujo.vertigo.cluster.impl.DefaultCluster$19$1$1.handle(DefaultCluster.java:713)
at net.kuujo.vertigo.cluster.impl.DefaultCluster$19$1$1.handle(DefaultCluster.java:709)
at org.vertx.java.core.eventbus.impl.DefaultEventBus$4.handle(DefaultEventBus.java:703)
at org.vertx.java.core.eventbus.impl.DefaultEventBus$4.handle(DefaultEventBus.java:697)
at org.vertx.java.core.impl.DefaultVertx$InternalTimerHandler.run(DefaultVertx.java:432)
at org.vertx.java.core.impl.DefaultContext$3.run(DefaultContext.java:175)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.vertx.java.core.eventbus.ReplyException: Timed out waiting for reply
... 10 more
I keep getting the Vertigo Cluster timeout exception when my kafka queue sending message to vertigo is idle for a few minutes
@Override public void start(final Future startResult) {
LOGGER.info("App Startup Verticle To Create Network Cluster\n");
}
Oct 02, 2014 10:24:28 PM org.vertx.java.core.logging.impl.JULLogDelegate warn WARNING: Message reply handler for message.address='app_cluster' timed out as no reply was received - it will be removed [ERROR] net.kuujo.vertigo.cluster.ClusterException: org.vertx.java.core.eventbus.ReplyException: Timed out waiting for reply at net.kuujo.vertigo.cluster.impl.DefaultCluster$19$1$1.handle(DefaultCluster.java:713) at net.kuujo.vertigo.cluster.impl.DefaultCluster$19$1$1.handle(DefaultCluster.java:709) at org.vertx.java.core.eventbus.impl.DefaultEventBus$4.handle(DefaultEventBus.java:703) at org.vertx.java.core.eventbus.impl.DefaultEventBus$4.handle(DefaultEventBus.java:697) at org.vertx.java.core.impl.DefaultVertx$InternalTimerHandler.run(DefaultVertx.java:432) at org.vertx.java.core.impl.DefaultContext$3.run(DefaultContext.java:175) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Caused by: org.vertx.java.core.eventbus.ReplyException: Timed out waiting for reply ... 10 more