Closed arinto closed 9 years ago
I thought about this a bit more. If we close the consumer on deactivate, that would automatically put all the unacknowledged tuples back in the queue. In the meantime, downstream bolts could have finished processing them, and we'd have no way to send the acks back.
I think the only way to address this is to not close the consumer in deactivate(), but rather set a flag saying the spout is inactive and only execute the code block in nextTuple() when the spout is active.
Sorry this didn't occur to me when you first floated the idea.
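To make the flag idea concrete, here is a minimal sketch of what I have in mind. It is not the actual RabbitMQSpout code: the class name FlagGatedSpoutSketch, the in-memory queue standing in for the RabbitMQ consumer, and the publish() helper are all assumptions made so the example runs on its own without Storm or RabbitMQ on the classpath.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for RabbitMQSpout; illustrates the flag only.
class FlagGatedSpoutSketch {
    // Defensive choice: volatile in case the flag is ever read from another thread.
    private volatile boolean active = true;

    // Assumption: an in-memory queue plays the role of the still-open consumer.
    private final Queue<String> broker = new ArrayDeque<>();
    // Messages handed downstream that have not been acked yet.
    final List<String> unacked = new ArrayList<>();
    // Messages whose acks were delivered back to the "broker".
    final List<String> acked = new ArrayList<>();

    // Hypothetical helper to feed the stand-in queue.
    void publish(String message) {
        broker.add(message);
    }

    void activate() {
        active = true;
    }

    // Only flips the flag. The consumer stays open, so nothing is requeued.
    void deactivate() {
        active = false;
    }

    // Returns the emitted message, or null when inactive or when nothing is queued.
    String nextTuple() {
        if (!active) {
            return null;
        }
        String message = broker.poll();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    // Acks are accepted regardless of the flag, since the consumer was never closed.
    void ack(String message) {
        if (unacked.remove(message)) {
            acked.add(message);
        }
    }

    public static void main(String[] args) {
        FlagGatedSpoutSketch spout = new FlagGatedSpoutSketch();
        spout.publish("m1");
        spout.publish("m2");
        System.out.println("emitted: " + spout.nextTuple());
        spout.deactivate();
        System.out.println("while inactive: " + spout.nextTuple());
        spout.ack("m1");
        System.out.println("acked: " + spout.acked);
        spout.activate();
        System.out.println("after reactivation: " + spout.nextTuple());
    }
}
```

The point of the sketch is that deactivate() never touches the consumer, so a tuple emitted before deactivation can still be acked afterwards, and nextTuple() simply does nothing until activate() is called again.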
On Oct 25, 2014, at 3:12 AM, Arinto Murdopo notifications@github.com wrote:
Implement the activate() and deactivate() methods in RabbitMQSpout. In activate(), we need to initialise a new RabbitMQConsumer because a closed RabbitMQConsumer cannot be reused.
I've tested it with a topology on a Storm cluster and it works. However, since this project has no unit tests, I did not create unit tests for this change.
You can merge this Pull Request by running
git pull https://github.com/arinto/storm-rabbitmq activate-deactivate
Or view, comment on, or merge it at:
https://github.com/ppat/storm-rabbitmq/pull/18
Commit Summary
Add activate and deactivate impl for RabbitMQSpout.
File Changes
M src/main/java/io/latent/storm/rabbitmq/RabbitMQSpout.java (50)
Patch Links:
https://github.com/ppat/storm-rabbitmq/pull/18.patch
https://github.com/ppat/storm-rabbitmq/pull/18.diff
Noted, yes, it makes sense. We don't want to re-process the tuples. How should I proceed? Should I create a new pull request?
I committed a version (0e438a383cd2187c66a94f933861a85663fd5eed) that addresses the flaw discussed above. It's not ideal since it keeps the RabbitMQ connection open, but it's the best I can think of, as the Storm Spout API does not provide any insight into the number of tuples in flight.
Implement the activate() and deactivate() methods in RabbitMQSpout; resolves #17. In activate(), we need to initialise a new RabbitMQConsumer because a closed RabbitMQConsumer cannot be reused.
I've tested it with a topology on a Storm cluster and it works. However, since this project has no unit tests, I did not create unit tests for this change.