Created a topology with two Kafka sources reading from the yelp_review topic and feeding a Join processor with a 10-second window. Joined on user_id and chose "select all" for the output fields. In test mode, filled in the test data (pasted in the issue) for both sources, set repeat=3 and sleepTime=1000, and ran the test.
The following exception is printed in the Streamline log:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.util.Map
at org.apache.storm.bolt.JoinBolt.lookupField(JoinBolt.java:521) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at com.hortonworks.streamline.streams.runtime.storm.bolt.query.WindowedQueryBolt.doProjectionStreamLine(WindowedQueryBolt.java:126) ~[streamline-runtime-storm-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
at com.hortonworks.streamline.streams.runtime.storm.bolt.query.WindowedQueryBolt.doProjection(WindowedQueryBolt.java:112) ~[streamline-runtime-storm-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
at org.apache.storm.bolt.JoinBolt$ResultRecord.<init>(JoinBolt.java:389) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.bolt.JoinBolt.doInnerJoin(JoinBolt.java:273) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.bolt.JoinBolt.doJoin(JoinBolt.java:252) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.bolt.JoinBolt.hashJoin(JoinBolt.java:238) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.bolt.JoinBolt.execute(JoinBolt.java:188) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at com.hortonworks.streamline.streams.runtime.storm.testing.TestRunWindowProcessorBolt.execute(TestRunWindowProcessorBolt.java:39) ~[streamline-runtime-storm-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
at org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:332) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:145) ~[storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at org.apache.storm.windowing.TimeTriggerPolicy$1.run(TimeTriggerPolicy.java:120) [storm-core-1.1.0.2.6.0.2-SNAPSHOT.jar:1.1.0.2.6.0.2-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_144]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
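For anyone triaging this, the top frame suggests a nested-field lookup that casts an intermediate value to a Map. The sketch below is not the Storm source; it is a minimal, hypothetical reproduction of that kind of failure, assuming a dotted field path is walked through nested maps and one segment resolves to an Integer instead of a Map. The class name, the `lookup` helper, and the sample path `user_id.name` are made up for illustration; the trace does not show which field selector or tuple value was actually involved.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedLookupSketch {

    // Hypothetical helper: walks a dotted path through nested maps.
    // Every segment assumes the current value is a Map, which is the
    // assumption that breaks when the value is a scalar.
    @SuppressWarnings("unchecked")
    static Object lookup(Map<String, Object> root, String dottedPath) {
        Object curr = root;
        for (String part : dottedPath.split("\\.")) {
            // Unchecked cast: throws ClassCastException if curr is not a Map.
            curr = ((Map<String, Object>) curr).get(part);
        }
        return curr;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("user_id", 42); // scalar value, as in a flat review record

        // A single-segment path resolves normally.
        System.out.println(lookup(event, "user_id"));

        // A two-segment path treats the Integer as a Map and fails.
        try {
            lookup(event, "user_id.name");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If the real cause is similar, checking how the "select all" output fields are turned into field selectors for the join would be a reasonable place to start.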
Use the following service pool: http://ctr-e134-1499953498516-197834-01-000002:8080/api/v1/clusters/test
/etc/hosts entry:
test data: