salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

Race condition on VirtualSpout close cleaning up consumer state #92

Closed Crim closed 6 years ago

Crim commented 6 years ago

Summary

When a sideline has completed, VirtualSpout.close() attempts to clean up consumer state. If you have multiple Spout instances (and therefore multiple VirtualSpout instances for the sideline), those instances can race while cleaning up that state. The cleanup blindly deletes the parent state node in ZooKeeper and does not handle the case where another instance has already deleted it, so the losing instance fails with a NoNodeException.
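One way to make this kind of cleanup safe is to treat "node already gone" as success instead of an error. The following is only a minimal sketch of that idea, not the project's actual fix. `NodeStore`, `NoNodeException`, and `deleteIfPresent` are hypothetical stand-ins invented for illustration (an in-memory set plays the role of ZooKeeper), and the "no children" check that the real helper performs is left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class IdempotentDeleteSketch {

    /** Hypothetical stand-in for ZooKeeper's "node does not exist" error. */
    static final class NoNodeException extends Exception {
        NoNodeException(String path) {
            super("NoNode for " + path);
        }
    }

    /** Hypothetical in-memory node store; delete fails when the node is absent. */
    static final class NodeStore {
        private final Set<String> nodes = ConcurrentHashMap.newKeySet();

        void create(String path) {
            nodes.add(path);
        }

        boolean exists(String path) {
            return nodes.contains(path);
        }

        void delete(String path) throws NoNodeException {
            // Set.remove returns false when the element was not present.
            if (!nodes.remove(path)) {
                throw new NoNodeException(path);
            }
        }
    }

    /**
     * Deletes the node, tolerating the case where it no longer exists.
     * Returns true if this caller removed it, false if it was already gone.
     */
    static boolean deleteIfPresent(NodeStore store, String path) {
        try {
            store.delete(path);
            return true;
        } catch (NoNodeException alreadyGone) {
            // Another instance won the race. The desired end state (no node)
            // already holds, so this is not treated as a failure.
            return false;
        }
    }

    public static void main(String[] args) {
        NodeStore store = new NodeStore();
        String path = "/example/consumers/sideline-1"; // hypothetical path
        store.create(path);

        // Two cleanup attempts, as if from two VirtualSpout instances.
        System.out.println(deleteIfPresent(store, path));
        System.out.println(deleteIfPresent(store, path));
    }
}
```

With this shape, whichever instance reaches the delete second simply observes that the work is already done, and close() can continue instead of throwing.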

Stack Trace

2017-12-13 16:44:41.875 topo-id] c.s.s.s.d.c.SpoutRunner [ERROR] SpoutRunner for topo-id:sideline:14064E9096F16FF755BD21C640976428 threw an exception org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /sideline-spout/topo-id/consumers/topo-id:sideline:14064E9096F16FF755BD21C640976428 
java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /sideline-spout/topo-id/consumers/topo-id:sideline:14064E9096F16FF755BD21C640976428
    at com.salesforce.storm.spout.dynamic.persistence.zookeeper.CuratorHelper.deleteNodeIfNoChildren(CuratorHelper.java:184) ~[stormjar.jar:?]
    at com.salesforce.storm.spout.dynamic.persistence.ZookeeperPersistenceAdapter.clearConsumerState(ZookeeperPersistenceAdapter.java:168) ~[stormjar.jar:?]
    at com.salesforce.storm.spout.dynamic.kafka.Consumer.removeConsumerState(Consumer.java:620) ~[stormjar.jar:?]
    at com.salesforce.storm.spout.dynamic.VirtualSpout.close(VirtualSpout.java:260) ~[stormjar.jar:?]
    at com.salesforce.storm.spout.dynamic.coordinator.SpoutRunner.run(SpoutRunner.java:181) [stormjar.jar:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) [?:1.8.0_102]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_102]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_102]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

stanlemon commented 6 years ago

This will need to be fixed in 0.9 and 0.10.

Crim commented 6 years ago

Fixed in master (0.10.x) and backported to 0.9.x as version 0.9.2.