ppat / storm-rabbitmq

A library of tools for interacting with RabbitMQ from Storm.
MIT License

Slows down to a crawl with many queued messages #28

Closed: otearle closed this issue 9 years ago

otearle commented 9 years ago

Hi ppat, I like your library, but I'm seeing speed issues. My application is continually bombarded with messages, so there is always a large backlog on the queue, on the order of thousands of messages. I used the basic connection code from the README. When I run my app it starts off well and processes messages very quickly, but then it gradually slows to a crawl. I looked through your code and couldn't see anything obviously wrong. Have you experienced this before?

SeanTAllen commented 9 years ago

@otearle What are you setting max spout pending to in Storm? Without max spout pending set, a topology that can't keep up with the incoming message rate can be swamped until it eventually grinds to a halt.
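To make the effect concrete, here is a toy simulation in plain Java of a spout that always has a backlog. It is not Storm's executor code. It only mimics the documented behaviour that Storm stops calling the spout's `nextTuple()` while the number of un-acked tuples has reached `topology.max.spout.pending`. The class name `MaxSpoutPendingModel`, the method `peakPending`, and the emit/ack rates are hypothetical.

```java
// Toy model (not Storm's implementation) of how topology.max.spout.pending
// throttles a spout that always has messages available to emit.
class MaxSpoutPendingModel {

    /**
     * Simulates `ticks` rounds. Each round the spout tries to emit up to
     * `emitPerTick` tuples, then the downstream bolts ack at most `ackPerTick`
     * of the pending tuples. maxPending <= 0 means "unset" (no cap).
     * Returns the peak number of pending (emitted but un-acked) tuples.
     */
    static int peakPending(int maxPending, int emitPerTick, int ackPerTick, int ticks) {
        int pending = 0;
        int peak = 0;
        for (int t = 0; t < ticks; t++) {
            // Spout side: with a cap, emission stops once pending reaches maxPending.
            int room = maxPending > 0 ? Math.max(0, maxPending - pending) : Integer.MAX_VALUE;
            pending += Math.min(emitPerTick, room);
            peak = Math.max(peak, pending);
            // Bolt side: the topology can only complete ackPerTick tuples per round.
            pending -= Math.min(ackPerTick, pending);
        }
        return peak;
    }

    public static void main(String[] args) {
        // Hypothetical rates: messages are emitted 5x faster than they can be acked.
        System.out.println("uncapped peak pending:     " + peakPending(0, 500, 100, 60));
        System.out.println("cap of 100, peak pending:  " + peakPending(100, 500, 100, 60));
    }
}
```

With these made-up rates the uncapped run reaches 24100 pending tuples after 60 rounds and keeps growing, while the capped run never exceeds 100. In a real topology, tuples that stay pending longer than `topology.message.timeout.secs` are failed and replayed, which is one way an uncapped backlog turns into a crawl.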

otearle commented 9 years ago

Only 100. I'm also using Storm 0.9.4 now, in an attempt to fix it.

SeanTAllen commented 9 years ago

@otearle We are using an earlier version of the library in production without any issues and are planning to upgrade to the latest soon. It's possible that a bug was introduced with the recent changes, but that seems unlikely. Can you make your code available somewhere so I can take a look?

otearle commented 9 years ago

OK, which version are you using? Unfortunately it's production code, so I can't point you to it. I have been using my fork of the AMQP-storm library without any issues, but I like this library because it's cleaner.

The code for using this library is below:

    // host, port, username, password, virtualHost, heartBeat
    ConnectionConfig connectionConfig = new ConnectionConfig(
            amqpConfig.getAmqpInputHost(),
            amqpConfig.getAmqpInputPort(),
            amqpConfig.getAmqpInputUserName(),
            amqpConfig.getAmqpInputPassword(),
            amqpConfig.getAmqpInputVHost(),
            10);
    ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
                                                            .queue(amqpConfig.getAmqpInputQueue())
                                                            .prefetch(100)    // un-acked deliveries RabbitMQ will push to this consumer
                                                            .requeueOnFail()  // failed tuples go back onto the queue
                                                            .build();
    Config config = new Config();
    config.setMaxSpoutPending(100);     // un-acked tuples allowed per spout task
    config.putAll(spoutConfig.asMap());

    Scheme scheme = new InputDataScheme();
    IRichSpout amqpSpout = new RabbitMQSpout(scheme);

otearle commented 9 years ago

I think I'm on to something now: if I change the RabbitMQSpout to an UnanchoredRabbitMQSpout, I can see the messages flying off the queue. That would mean tuples are failing somewhere in my topology; with the unanchored spout those messages are simply lost instead of being replayed over and over until they succeed. I need to carry on investigating.
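The difference described in the comment above can be illustrated with a second toy model (the class `ReplayModel` is hypothetical, not code from storm-rabbitmq). It assumes that with the anchored `RabbitMQSpout` plus `requeueOnFail()`, a tuple that fails or times out downstream is put back on the RabbitMQ queue and redelivered, while the `UnanchoredRabbitMQSpout` acks each message as soon as it is emitted, so downstream failures are never replayed. The failure rule (every tenth message fails its first 20 attempts) is invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not the library's code): deliveries needed to drain a queue when
// failed tuples are requeued (anchored) versus acked on emit (unanchored).
class ReplayModel {

    static final class Result {
        final int deliveries;   // total times a message was taken off the queue
        final int processedOk;  // messages that made it through the topology
        final int lost;         // messages that failed but were never retried

        Result(int deliveries, int processedOk, int lost) {
            this.deliveries = deliveries;
            this.processedOk = processedOk;
            this.lost = lost;
        }

        @Override
        public String toString() {
            return "deliveries=" + deliveries + ", ok=" + processedOk + ", lost=" + lost;
        }
    }

    /** Hypothetical rule: ids divisible by failEvery fail their first failuresPerBadMessage attempts. */
    static Result drain(int messages, int failEvery, int failuresPerBadMessage, boolean anchored) {
        Deque<int[]> queue = new ArrayDeque<>(); // each entry: {id, attemptsSoFar}
        for (int id = 0; id < messages; id++) {
            queue.addLast(new int[] {id, 0});
        }
        int deliveries = 0, ok = 0, lost = 0;
        while (!queue.isEmpty()) {
            int[] msg = queue.removeFirst();
            deliveries++;
            boolean fails = msg[0] % failEvery == 0 && msg[1] < failuresPerBadMessage;
            if (!fails) {
                ok++;
            } else if (anchored) {
                // The failure reaches the spout, which requeues the message for another attempt.
                queue.addLast(new int[] {msg[0], msg[1] + 1});
            } else {
                // Already acked to RabbitMQ on emit, so the failure is never retried.
                lost++;
            }
        }
        return new Result(deliveries, ok, lost);
    }

    public static void main(String[] args) {
        System.out.println("anchored:   " + drain(1000, 10, 20, true));
        System.out.println("unanchored: " + drain(1000, 10, 20, false));
    }
}
```

For 1,000 messages the anchored run needs 3,000 deliveries to drain the queue and processes every message, whereas the unanchored run drains it in 1,000 deliveries but drops the 100 failing messages. That matches the symptom: the unanchored spout empties the queue much faster because nothing is retried.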

SeanTAllen commented 9 years ago

@otearle any luck?

otearle commented 9 years ago

I have had a major dig into my code. I think there is a problem with an aggregator that makes messages take ages to get through. I fixed the issue by reducing the Trident batch size and the batch emit interval. It seems to be fine with this library now, although I'm not sure why it ground to a halt with this library when the same topology was merely slow, but still functional, with the storm-amqp library. Thanks for all your help in resolving this.
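The thread does not confirm why smaller batches helped. One plausible mechanism, stated here as an assumption, is that a slow aggregator makes a large Trident batch take longer than `topology.message.timeout.secs` (30 seconds by default in Storm), so the batch times out, is replayed, and never makes progress. The back-of-the-envelope check below uses a hypothetical per-tuple cost and treats the aggregator as processing the batch serially; `BatchTimeoutCheck` and its methods are illustrative names only.

```java
// Rough check (hypothetical numbers): does one batch get through a slow
// aggregator before Storm's message timeout fires?
class BatchTimeoutCheck {

    /** Estimated time for one batch to pass the aggregator, assuming serial per-tuple cost. */
    static long batchMillis(int batchSize, double perTupleMillis) {
        return Math.round(batchSize * perTupleMillis);
    }

    /** True if the batch is expected to finish within the timeout (in seconds). */
    static boolean fitsTimeout(int batchSize, double perTupleMillis, int timeoutSecs) {
        return batchMillis(batchSize, perTupleMillis) < timeoutSecs * 1000L;
    }

    public static void main(String[] args) {
        int timeoutSecs = 30;         // Storm's default topology.message.timeout.secs
        double perTupleMillis = 5.0;  // hypothetical aggregator cost per tuple
        for (int batchSize : new int[] {10_000, 5_000, 1_000}) {
            System.out.printf("batch=%d -> %d ms, fits=%b%n", batchSize,
                    batchMillis(batchSize, perTupleMillis),
                    fitsTimeout(batchSize, perTupleMillis, timeoutSecs));
        }
    }
}
```

Under these assumptions a 10,000-tuple batch needs 50 seconds and would keep timing out, while 5,000- and 1,000-tuple batches finish in 25 and 5 seconds respectively, which is consistent with smaller batches restoring throughput.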