ptgoetz / storm-jms

Storm JMS Integration
Apache License 2.0
78 stars 66 forks source link

JMS Spout ack() and fail() implementation #4

Open marjan-sterjev opened 12 years ago

marjan-sterjev commented 12 years ago

Hi,

The JMS Spout implements the ack() and fail() mechanisms by JMS message acknowledgment and JMS session recovery. One very important fact that is usually misunderstood in JMS is that the JMS message.acknowledge() call acknowledges the whole JMS session up to that moment, not just that particular message.

http://docs.oracle.com/javaee/5/api/javax/jms/Message.html#acknowledge%28%29 void acknowledge() throws JMSException Acknowledges all consumed messages of the session of this consumed message.

So, if you have received 2 JMS messages, msg1 and msg2 (in that order), msg2 is successfully processed, msg1 has failed and the acknowledgment for the msg2 is received first, msg1 won’t be redelivered.

Probably the message reply mechanism in the JMS case shall use some intermediate (semi) persistent storage (Hazelcast map for example).

mike-wendt commented 11 years ago

ActiveMQ does support individual acknowledgement; however, there is performance issues with this on the ActiveMQ side. An intermediate store is a possible solution or we could remove the message listener and do batch acknowledgement:

Consume N messages -> send all N messages as tuples -> get acks/fails for all N messages -> add M failed messages back to queue -> acknowledge all N messages with one acknowledgement.

I welcome input and would be more than happy to refactor the spout to make it more durable.