hmsonline / storm-cassandra-cql

Storm Cassandra Bridge built on CQL
Apache License 2.0
43 stars 38 forks source link

CassandraCqlIncrementalState commit only one entry #36

Closed joohnnie closed 9 years ago

joohnnie commented 9 years ago

Hi, I'm using CassandraCqlIncrementalState to update the counter.

https://github.com/hmsonline/storm-cassandra-cql/blob/master/src/main/java/com/hmsonline/trident/cql/incremental/CassandraCqlIncrementalState.java

Here is the commit method:

@Override public void commit(Long txid) { boolean applied = false; DriverException lastException = null; // Read current value. //if we failed to apply the update , maybe the state has change already , we need to calculate the new state and apply it again for (Map.Entry<K, V> entry : aggregateValues.entrySet()) { int attempts = 0; while (!applied && attempts < maxAttempts) { try{ applied = updateState(entry, txid); } catch(QueryExecutionException e) { lastException = e; LOG.warn("Catching {} attempt {}"+txid+"-"+partitionIndex, e.getMessage(), attempts); } attempts++; } if(!applied) { if(lastException != null) { throw new CassandraCqlIncrementalStateException("Ran out of attempts ["+attempts+"] max of ["+maxAttempts+"] "+txid+"-"+ partitionIndex, lastException); } else { throw new CassandraCqlIncrementalStateException("Ran out of attempts ["+attempts+"] max of ["+maxAttempts+"] "+txid+"-"+ partitionIndex); } } } }

When the aggregateValues has multiple entries, it only updated the first entry into cassandra. As the applied variable will be set true if the first invocation of updateState successfully, and the following loop will not enter any more

while (!applied && attempts < maxAttempts)

Do we need to set applied to false after each entry updated successfully?

boneill42 commented 9 years ago

Great catch. Fix on its way.

joohnnie commented 9 years ago

Thanks

boneill42 commented 9 years ago

Fixed merged to master, will be in 0.3.0 release.