amazon-archives / kinesis-storm-spout

Kinesis spout for Storm

"Cannot process events if state is not active" while deactivating #26

Open ekozo-tgu opened 8 years ago


Hi

Our topology has one Kinesis spout, and we see the following error when the topology is deactivated:

2016-01-23 13:52:49 c.a.s.k.s.s.z.ZookeeperStateManager [INFO] ZookeeperStateManager[taskIndex=0]Advanced checkpoint for shardId-000000000000 to 49557085486395718085074646145964072477802219383101063170
2016-01-23 13:52:55 b.s.d.executor [INFO] Deactivating spout kinesisSpout:(16)
2016-01-23 13:52:55 c.a.s.k.s.s.z.ZookeeperStateManager [INFO] ZookeeperStateManager[taskIndex=0]Advanced checkpoint for shardId-000000000000 to 49557085486395718085074646148585023654726735571303006210
2016-01-23 13:52:55 o.a.z.ClientCnxn [ERROR] Error while calling watcher 
java.lang.IllegalStateException: Cannot process events if state is not active (a ZK connection is necessary).
        at com.google.common.base.Preconditions.checkState(Preconditions.java:176) ~[stormjar.jar:na]
        at com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.process(ZookeeperStateManager.java:268) ~[stormjar.jar:na]
        at com.netflix.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:38) ~[stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522) [stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) [stormjar.jar:na]
2016-01-23 13:52:55 o.a.z.ZooKeeper [INFO] Session: 0x1518daf83170242 closed
2016-01-23 13:52:55 o.a.z.ClientCnxn [INFO] EventThread shut down

After looking at the source, we noticed that the ZK connection is closed as soon as the topology is deactivated. As a result, acks for tuples that are still in flight can no longer be checkpointed, so those tuples will be re-processed at the next startup.

We know this is consistent with "at least once" semantics, but is there any way to keep the ZK connection open until the topology stops? With that approach we could accept all incoming ack events and commit the actual state to ZooKeeper.
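
To make the request concrete, here is a minimal sketch of the lifecycle we have in mind. It does not use the spout's real classes: `FakeStateManager` and `SpoutSketch` are hypothetical stand-ins, and only the method names (`activate`, `deactivate`, `ack`, `close`) mirror Storm's spout lifecycle. The idea is that `deactivate` only stops emitting, while the connection is released in `close`.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredCloseSketch {

    /** Hypothetical stand-in for a ZooKeeper-backed state manager (not the real class). */
    static class FakeStateManager {
        private boolean connected;
        private final List<String> checkpoints = new ArrayList<>();

        void connect() { connected = true; }
        void disconnect() { connected = false; }
        boolean isConnected() { return connected; }

        /** Records a checkpoint; fails if the connection is closed, like the error in the log. */
        void checkpoint(String sequenceNumber) {
            if (!connected) {
                throw new IllegalStateException("Cannot checkpoint without a ZK connection");
            }
            checkpoints.add(sequenceNumber);
        }

        List<String> checkpoints() { return checkpoints; }
    }

    /** Hypothetical spout: same lifecycle method names as Storm, but no Storm dependency. */
    static class SpoutSketch {
        private final FakeStateManager state = new FakeStateManager();
        private boolean emitting;

        void activate() {
            if (!state.isConnected()) {
                state.connect();
            }
            emitting = true;
        }

        /** Stop emitting new tuples, but keep the connection so late acks can be committed. */
        void deactivate() { emitting = false; }

        void ack(String sequenceNumber) { state.checkpoint(sequenceNumber); }

        /** Release the connection only when the spout is shut down. */
        void close() { state.disconnect(); }

        boolean isEmitting() { return emitting; }
        FakeStateManager state() { return state; }
    }

    public static void main(String[] args) {
        SpoutSketch spout = new SpoutSketch();
        spout.activate();
        spout.ack("seq-1");
        spout.deactivate();
        spout.ack("seq-2"); // late ack after deactivation is still checkpointed
        spout.close();
        System.out.println(spout.state().checkpoints()); // prints [seq-1, seq-2]
    }
}
```

One caveat with this design: as far as we understand, Storm does not guarantee that `close` is called when a worker is killed on a cluster, so deferring the disconnect to `close` may leave the session to expire on its own.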