hmsonline / storm-cassandra-cql

Storm Cassandra Bridge built on CQL
Apache License 2.0

[Help needed] partitionPersist().newValuesStream() emits nothing #23

Closed huylv closed 9 years ago

huylv commented 9 years ago

Hi,

I'm using storm-cassandra-cql 0.1.8 from the Maven Central repository. I'm having an issue with partitionPersist() followed by .newValuesStream(): I want to process the key-value pairs further, but nothing is emitted after that call. My topology is as simple as this:

myTopo.newStream(...)
      .each(new Fields("key", "value"), new Debug()) // It's ok right here.
      .partitionPersist(new CassandraCqlStateFactory(), new Fields("key", "value"), new CassandraCqlStateUpdater<>(new MyTableMapper()), new Fields("key", "value")) // Tuples are persisted to Cassandra.
      .newValuesStream()
      .each(new Fields("key", "value"), new Debug()) // But nothing seems to be emitted here

Am I doing something wrong?

boneill42 commented 9 years ago

No, you are doing everything right.
Presently though, we don't emit from the StateUpdater, which means all the tuples get filtered. I'll update that behavior to echo the tuple back out.

huylv commented 9 years ago

Thank you, Brian. I'll leave this issue open until you update the code. Cheers, Huy

boneill42 commented 9 years ago

I finally got to take a look at this, and I'm not sure it makes sense to emit from the base CassandraCqlState implementation, since it does not produce anything new (unless downstream components are interested in the CQL statements that were executed).

See: "You can also see that StateUpdaters are given a TridentCollector. Tuples emitted to this collector go to the "new values stream". In this case, there's nothing interesting to emit to that stream, but if you were doing something like updating counts in a database, you could emit the updated counts to that stream. You can then get access to the new values stream for further processing via the TridentState#newValuesStream method." https://storm.apache.org/documentation/Trident-state

Based on this, I don't think the updaters should emit. Instead, I think you just want to remove the newValuesStream() call in your topology, which will give you access to the original tuples.
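
To make the mechanism in the quoted documentation concrete, here is a minimal sketch in plain Java. It does not use Storm: `Collector`, `Updater`, `PersistOnlyUpdater`, `EchoingUpdater`, and `NewValuesStreamSketch` are hypothetical stand-ins invented for illustration, loosely modeled on Trident's TridentCollector and StateUpdater. The sketch only shows that the "new values stream" contains exactly what the updater emits, so an updater that persists without emitting leaves that stream empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Trident's TridentCollector: whatever is emitted
// here is what a downstream "new values stream" would see.
interface Collector {
    void emit(List<Object> values);
}

// Hypothetical stand-in for a Trident StateUpdater that handles one batch.
interface Updater {
    void updateState(List<List<Object>> tuples, Collector collector);
}

// Models the behavior discussed in this thread: persist, but emit nothing.
class PersistOnlyUpdater implements Updater {
    final List<List<Object>> persisted = new ArrayList<>();

    @Override
    public void updateState(List<List<Object>> tuples, Collector collector) {
        persisted.addAll(tuples); // stands in for the writes to Cassandra
    }
}

// Decorator: delegates the write, then echoes each tuple to the collector.
class EchoingUpdater implements Updater {
    private final Updater delegate;

    EchoingUpdater(Updater delegate) {
        this.delegate = delegate;
    }

    @Override
    public void updateState(List<List<Object>> tuples, Collector collector) {
        delegate.updateState(tuples, collector);
        for (List<Object> tuple : tuples) {
            collector.emit(tuple);
        }
    }
}

class NewValuesStreamSketch {
    // Runs one batch through an updater and returns what reached the collector.
    static List<List<Object>> newValues(Updater updater, List<List<Object>> batch) {
        List<List<Object>> emitted = new ArrayList<>();
        updater.updateState(batch, emitted::add);
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList("k1", "v1"),
                Arrays.<Object>asList("k2", "v2"));

        // Persist-only updater: nothing reaches the new values stream.
        System.out.println(newValues(new PersistOnlyUpdater(), batch).size()); // prints 0

        // Echoing wrapper: every persisted tuple is also emitted.
        System.out.println(
                newValues(new EchoingUpdater(new PersistOnlyUpdater()), batch).size()); // prints 2
    }
}
```

In a real topology, the equivalent of `EchoingUpdater` would be a custom StateUpdater wrapping the library's updater; whether that fits storm-cassandra-cql's types is an assumption that has not been checked here. Another option for working with the original tuples, assuming the usual Trident wiring, is to keep a reference to the stream before partitionPersist() and apply further each() calls to that reference.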

boneill42 commented 9 years ago

It may, however, make sense to emit the aggregate values elsewhere, but we'll have to take a look at it, because some of those values aren't available during the update call (only after commit).

We'll wait to see if people want that. I'm going to close this issue for now.

ghost commented 9 years ago

Hi Brian,

This point could be interesting for another reason: coordinating persistence between two different destinations (i.e., to Cassandra and then to Kafka). A pull request will follow shortly.

Thanks, Lucas