salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka-based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

Fix failed message leak: the message was never removed from trackedMap in certain scenarios #89

Closed: Crim closed this 6 years ago

Crim commented 6 years ago

Summary

When VirtualSpout decides that a fail()'d message should NOT be retried, it tells the retry manager to 'ack' it and tells the consumer to 'ack' it. Unfortunately, it did NOT remove the message from the trackedMessages map. This map holds every message currently in flight, keyed by messageId (messageId => message).

Because we never removed entries from this map on this path, a VirtualSpout with a large number of messages that exceeded the fail threshold would accumulate map entries that were never cleaned up until the vspout was closed. The result is a memory leak.

Solution

I think the proper solution is to simply call ack() on the messageId in this scenario. That should mark the message as ack'd in all the required ways, including removing it from trackedMessages.
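To make the proposed change concrete, here is a minimal sketch of the fail() path using a simplified, hypothetical model. The class `SimpleVirtualSpout`, the interfaces `RetryManager` and `Consumer`, and the methods `retryFurther`, `acked`, `failed`, and `commitOffset` are illustrative stand-ins, not the real storm-dynamic-spout API. The only point it shows is that routing a terminal failure through ack() also clears the trackedMessages entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the fail() path discussed in this PR.
// None of these names are the real storm-dynamic-spout API.
public class SimpleVirtualSpout {

    interface RetryManager {
        boolean retryFurther(String messageId); // may this message be retried again?
        void failed(String messageId);          // schedule a retry
        void acked(String messageId);           // stop tracking retries for it
    }

    interface Consumer {
        void commitOffset(String messageId);    // mark the message as processed
    }

    // Every in-flight message, keyed by messageId.
    final Map<String, String> trackedMessages = new HashMap<>();
    private final RetryManager retryManager;
    private final Consumer consumer;

    SimpleVirtualSpout(RetryManager retryManager, Consumer consumer) {
        this.retryManager = retryManager;
        this.consumer = consumer;
    }

    void emit(String messageId, String payload) {
        trackedMessages.put(messageId, payload);
    }

    void ack(String messageId) {
        // Acking clears the message everywhere it is tracked.
        trackedMessages.remove(messageId);
        retryManager.acked(messageId);
        consumer.commitOffset(messageId);
    }

    void fail(String messageId) {
        if (!retryManager.retryFurther(messageId)) {
            // Before the fix, only retryManager.acked(...) and consumer.commitOffset(...)
            // were called here, so the entry stayed in trackedMessages.
            // Delegating to ack() removes it from the map as well.
            ack(messageId);
            return;
        }
        retryManager.failed(messageId);
    }

    public static void main(String[] args) {
        // A retry manager that never allows a retry, so every fail() is terminal.
        RetryManager neverRetry = new RetryManager() {
            public boolean retryFurther(String id) { return false; }
            public void failed(String id) { }
            public void acked(String id) { }
        };
        SimpleVirtualSpout spout = new SimpleVirtualSpout(neverRetry, id -> { });
        for (int i = 0; i < 1000; i++) {
            spout.emit("msg-" + i, "payload-" + i);
            spout.fail("msg-" + i);
        }
        System.out.println("tracked after failures: " + spout.trackedMessages.size());
    }
}
```

In this model, running `main` reports zero tracked entries. With the pre-fix fail() body (calling only `acked()` and `commitOffset()`), the same loop would leave all 1000 entries in the map.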

stanlemon commented 6 years ago

Seems fine. Is there a test case for this that would have failed before?

Crim commented 6 years ago

Unfortunately not, unless we test the internals of the class, which we do in tons of other places, so I'm on the fence about it. What do you think: should I work out a test case for it?
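If a test were added by inspecting internals, it might look roughly like the sketch below. Everything here is hypothetical: `FakeSpout` is a stripped-down stand-in for VirtualSpout with an assumed fail threshold (`maxRetries`), `trackedMessageCount()` is an assumed package-private accessor exposed only for tests, and plain `AssertionError`s replace JUnit so the block runs on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical regression-test sketch for the trackedMessages leak.
// FakeSpout is a minimal stand-in, not the real VirtualSpout.
public class TrackedMessagesLeakTest {

    static class FakeSpout {
        private final Map<String, String> trackedMessages = new HashMap<>();
        private final Map<String, Integer> failCounts = new HashMap<>();
        private final int maxRetries; // assumed fail threshold

        FakeSpout(int maxRetries) { this.maxRetries = maxRetries; }

        void emit(String id) { trackedMessages.put(id, "payload"); }

        void ack(String id) {
            trackedMessages.remove(id);
            failCounts.remove(id);
        }

        void fail(String id) {
            int count = failCounts.merge(id, 1, Integer::sum);
            if (count > maxRetries) {
                ack(id); // the fix: a terminal failure goes through ack()
            }
            // otherwise the message stays in flight and would be retried
        }

        // Package-private accessor exposing internal state for tests only.
        int trackedMessageCount() { return trackedMessages.size(); }
    }

    static void check(boolean condition, String message) {
        if (!condition) throw new AssertionError(message);
    }

    public static void main(String[] args) {
        FakeSpout spout = new FakeSpout(2);
        spout.emit("msg-1");
        spout.fail("msg-1"); // 1st failure: retried
        spout.fail("msg-1"); // 2nd failure: retried
        check(spout.trackedMessageCount() == 1, "message should stay tracked while retries remain");
        spout.fail("msg-1"); // 3rd failure exceeds the threshold, so it is acked
        check(spout.trackedMessageCount() == 0, "entry must be removed once retries are exhausted");
        System.out.println("ok");
    }
}
```

The trade-off raised in the thread shows up directly: the test only works because the accessor leaks internal state, which is the kind of coupling to class internals being weighed here.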