hmsonline / storm-cassandra-cql

Storm Cassandra Bridge built on CQL
Apache License 2.0
43 stars 38 forks source link

Error while running example WordCountTopology #58

Open harsha273 opened 8 years ago

harsha273 commented 8 years ago

Guys,

I am quite new to trident and cassandra. While trying to run the wordcount example , am getting this exception midway through the run. Any ideas please?

DEBUG (com.hmsonline.trident.cql.CassandraCqlMapState:165) - Putting the following keys: [[went, spout1], [and, spout2]] with values: [73, 73]
ERROR (clojure.tools.logging$eval37$fn__43:16) - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745)
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.hmsonline.trident.cql.CassandraCqlMapState.checkCassandraException(CassandraCqlMapState.java:227)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:157)
    at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:52)
    at storm.trident.state.map.NonTransactionalMap.multiGet(NonTransactionalMap.java:39)
    at storm.trident.state.map.SnapshottableMap.multiGet(SnapshottableMap.java:37)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:31)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:28)
    at storm.trident.planner.processor.StateQueryProcessor.finishBatch(StateQueryProcessor.java:84)
    at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152)
    at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252)
    at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285)
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:364)
    at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398)
    at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
    ... 6 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at storm.trident.tuple.TridentTupleView.getValue(TridentTupleView.java:227)
    at storm.trident.tuple.TridentTupleView.get(TridentTupleView.java:222)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:41)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:15)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:137)
    ... 20 more
gaurchandan commented 8 years ago

Actually problem is in WordCountTopology we were not passing source(spout) so what I do to overcome this...

  1. Change the parameters of drpc client client.execute("words", "cat:spout1 dog:spout2 the:spout1 man:spout1")); with each word , i m sending spout source seprated by double colon.
  2. Declare my own MultiSplit class which will emit both word and source @Override public class MultiSplit extends Split{
public void execute(TridentTuple tuple, TridentCollector collector) {
    // TODO Auto-generated method stub
    for(String word: tuple.getString(0).split(" ")) {
        if(word.length() > 0) {
            String[] sourceAndWord=word.split(":");
            collector.emit(new Values(sourceAndWord[0],sourceAndWord[1]));
        }
    }
}
  1. Then use it in TopologyBuilder

TridentState wordCounts = topology.newStream("spout1", spout1) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word", "source")) .persistentAggregate(CassandraCqlMapState.nonTransactional(new WordCountAndSourceMapper()), new IntegerCount(), new Fields("count")) .parallelismHint(6); topology.newDRPCStream("words", drpc).each(new Fields("args"), new MultiSplit(), new Fields("word", "source")) .groupBy(new Fields("word", "source")) .stateQuery(wordCounts, new Fields("word", "source"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

4,. And last change in WordCountAndSourceMapper as i change the indexes of where clause @Override public Statement retrieve(List keys) { // Retrieve all the columns associated with the keys Select statement = QueryBuilder.select().column(SOURCE_KEY_NAME) .column(WORD_KEY_NAME).column(VALUE_NAME) .from(KEYSPACE_NAME, TABLE_NAME); statement.where(QueryBuilder.eq(WORD_KEY_NAME, keys.get(0))); statement.where(QueryBuilder.eq(SOURCE_KEY_NAME, keys.get(1))); return statement; }

so this will definately resolves the issue. All the best