soniclavier / bigdata-notebook

http://www.vishnuviswanath.com

Using cassandra #1

Open renderit opened 8 years ago

renderit commented 8 years ago

@soniclavier Hey, great work, and great blog! I was trying the Storm-Kafka code. I'm interested in trying Kafka -> Storm -> Cassandra or Hadoop. Sorry, can you tell me which code blocks need to be commented out if I don't want to use MongoDB or Solr? All the Java files in the bolt folder and the Topology file? I have Storm installed and on the path, but when I run:

ubuntu@ip-172-31-1-172:~/Downloads/hadoop_datascience/stormkafka$ storm jar target/stormkafka-0.0.1-SNAPSHOT.jar com.vishnu.storm.Topology vis remote
Running: /usr/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/storm/lib/clojure-1.7.0.jar:/usr/local/storm/lib/log4j-core-2.1.jar:/usr/local/storm/lib/servlet-api-2.5.jar:/usr/local/storm/lib/log4j-api-2.1.jar:/usr/local/storm/lib/log4j-slf4j-impl-2.1.jar:/usr/local/storm/lib/slf4j-api-1.7.7.jar:/usr/local/storm/lib/minlog-1.3.0.jar:/usr/local/storm/lib/reflectasm-1.10.1.jar:/usr/local/storm/lib/asm-5.0.3.jar:/usr/local/storm/lib/objenesis-2.1.jar:/usr/local/storm/lib/storm-core-1.0.1.jar:/usr/local/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/storm/lib/storm-rename-hack-1.0.1.jar:/usr/local/storm/lib/kryo-3.0.3.jar:/usr/local/storm/lib/disruptor-3.3.2.jar:target/stormkafka-0.0.1-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/storm/bin -Dstorm.jar=target/stormkafka-0.0.1-SNAPSHOT.jar com.vishnu.storm.Topology vis remote
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: backtype/storm/topology/IRichSpout
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: backtype.storm.topology.IRichSpout
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
[screenshot attached: 2016-06-15, 3:55 PM]

Also where do I define the 'type', like if I want to use only mongo?

soniclavier commented 8 years ago

If you don't need MongoDB and Solr, you can remove the SolrBolt.java and MongodbBolt.java files; you might also have to change a few lines in BoltBuilder, since it depends on these classes.

Or you can keep those files as is and remove the binding in the Topology.java file where these bolts are added to the topology.
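Concretely, the binding lives in submitTopology() in Topology.java: the pieces to remove (or comment out) are the Solr/Mongo bolt construction and the setBolt calls that attach them, roughly like this (identifiers as they appear in this repo's Topology.java):

    SolrBolt solrBolt = boltBuilder.buildSolrBolt();
    MongodbBolt mongoBolt = boltBuilder.buildMongodbBolt();
    ...
    builder.setBolt(configs.getProperty(Keys.SOLR_BOLT_ID), solrBolt, solrBoltCount)
           .shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), SOLR_STREAM);
    builder.setBolt(configs.getProperty(Keys.MONGO_BOLT_ID), mongoBolt, mongoBoltCount)
           .shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), MONGODB_STREAM);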

I haven't tried the Cassandra integration, but you can check this: http://storm.apache.org/releases/1.0.0/storm-cassandra.html
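From that page, the wiring would look roughly like the snippet below. Treat it as a sketch I have not run: the cassandra.* config keys and the CassandraWriterBolt / DynamicStatementBuilder API come from that documentation, while the keyspace, table, stream name, and bolt id here are made-up placeholders.

    import org.apache.storm.cassandra.bolt.CassandraWriterBolt;
    import static org.apache.storm.cassandra.DynamicStatementBuilder.*;

    // connection settings go into the topology Config (placeholder values)
    conf.put("cassandra.nodes", "localhost");
    conf.put("cassandra.port", 9042);
    conf.put("cassandra.keyspace", "mykeyspace");

    // writes each incoming tuple's fields into a (placeholder) table;
    // the ? placeholders are bound from the tuple fields, in order
    CassandraWriterBolt cassandraBolt = new CassandraWriterBolt(
            async(
                simpleQuery("INSERT INTO events (id, content) VALUES (?, ?);")
                    .with(all())
            )
    );

    // subscribe it to the SinkTypeBolt's stream, the same way the Solr/Mongo bolts are wired
    builder.setBolt("cassandra-bolt", cassandraBolt, 1)
           .shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), "cassandra-stream");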

soniclavier commented 8 years ago

Also the code I have was written for Storm 0.10.0 and the current version is 1.0.1.

To make it work for 1.0.1 you have to do the following:

  1. change the versions of all Storm dependencies in pom.xml to 1.0.1
  2. change the package imports in most of the Java files from backtype. to org.apache.storm. (see the example below)
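For example, the class from your stack trace above moves like this; the same rename applies to Config, StormSubmitter, TopologyBuilder, and so on:

    // Storm 0.10.x
    import backtype.storm.topology.IRichSpout;

    // Storm 1.0.x
    import org.apache.storm.topology.IRichSpout;
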
renderit commented 8 years ago

@soniclavier Excellent, thanks! If I want to use HDFS but not Mongo, do I still need to remove the Mongo and Solr files? Here are the parts of pom.xml I updated:

<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
<exclusions>

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>

<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.10.0-beta1</version>

<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.10.0</version>

I changed all backtype imports to org.apache.storm. My Topology looks like this (trying only the sink bolt for now):

package com.vishnu.storm;

import java.util.Properties;

//import org.apache.storm.hdfs.bolt.HdfsBolt;

import com.vishnu.storm.bolt.BoltBuilder;
//import com.vishnu.storm.bolt.MongodbBolt;
import com.vishnu.storm.bolt.SinkTypeBolt;
//import com.vishnu.storm.bolt.SolrBolt;
import com.vishnu.storm.spout.SpoutBuilder;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;

public class Topology {

    public Properties configs;
    public BoltBuilder boltBuilder;
    public SpoutBuilder spoutBuilder;
    public static final String SOLR_STREAM = "solr-stream";
    public static final String HDFS_STREAM = "hdfs-stream";
    public static final String MONGODB_STREAM = "mongodb-stream";

    public Topology(String configFile) throws Exception {
        configs = new Properties();
        try {
            configs.load(Topology.class.getResourceAsStream("/default_config.properties"));
            boltBuilder = new BoltBuilder(configs);
            spoutBuilder = new SpoutBuilder(configs);
        } catch (Exception ex) {
            ex.printStackTrace();
            System.exit(0);
        }
    }

    private void submitTopology() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout kafkaSpout = spoutBuilder.buildKafkaSpout();
        SinkTypeBolt sinkTypeBolt = boltBuilder.buildSinkTypeBolt();
        //SolrBolt solrBolt = boltBuilder.buildSolrBolt();
        //HdfsBolt hdfsBolt = boltBuilder.buildHdfsBolt();
        //MongodbBolt mongoBolt = boltBuilder.buildMongodbBolt();

        //set the kafkaSpout to topology
        //parallelism-hint for kafkaSpout - defines number of executors/threads to be spawn per container
        int kafkaSpoutCount = Integer.parseInt(configs.getProperty(Keys.KAFKA_SPOUT_COUNT));
        builder.setSpout(configs.getProperty(Keys.KAFKA_SPOUT_ID), kafkaSpout, kafkaSpoutCount);

        //set the sinktype bolt
        int sinkBoltCount = Integer.parseInt(configs.getProperty(Keys.SINK_BOLT_COUNT));
        builder.setBolt(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), sinkTypeBolt, sinkBoltCount).shuffleGrouping(configs.getProperty(Keys.KAFKA_SPOUT_ID));

        //set the solr bolt
        //int solrBoltCount = Integer.parseInt(configs.getProperty(Keys.SOLR_BOLT_COUNT));
        //builder.setBolt(configs.getProperty(Keys.SOLR_BOLT_ID), solrBolt, solrBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), SOLR_STREAM);

        //set the hdfs bolt
        //int hdfsBoltCount = Integer.parseInt(configs.getProperty(Keys.HDFS_BOLT_COUNT));
        //builder.setBolt(configs.getProperty(Keys.HDFS_BOLT_ID), hdfsBolt, hdfsBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), HDFS_STREAM);

        //set the mongodb bolt
        //int mongoBoltCount = Integer.parseInt(configs.getProperty(Keys.MONGO_BOLT_COUNT));
        //builder.setBolt(configs.getProperty(Keys.MONGO_BOLT_ID), mongoBolt, mongoBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), MONGODB_STREAM);

        Config conf = new Config();
        //conf.put("solr.zookeeper.hosts", configs.getProperty(Keys.SOLR_ZOOKEEPER_HOSTS));

        String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);
        //Defines how many worker processes have to be created for the topology in the cluster.
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
    }

    public static void main(String[] args) throws Exception {
        String configFile;
        if (args.length == 0) {
            System.out.println("Missing input : config file location, using default");
            configFile = "default_config.properties";
        } else {
            configFile = args[0];
        }

        Topology ingestionTopology = new Topology(configFile);
        ingestionTopology.submitTopology();
    }

}

The error I am getting:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project stormkafka: Compilation failure: Compilation failure:
[ERROR] /home/ubuntu/Downloads/hadoop_datascience/stormkafka/src/main/java/com/vishnu/storm/spout/SpoutBuilder.java:[30,36] cannot access backtype.storm.spout.MultiScheme
[ERROR] class file for backtype.storm.spout.MultiScheme not found
[ERROR] /home/ubuntu/Downloads/hadoop_datascience/stormkafka/src/main/java/com/vishnu/storm/Topology.java:[52,24] cannot access backtype.storm.topology.base.BaseRichSpout
[ERROR] class file for backtype.storm.topology.base.BaseRichSpout not found

My SpoutBuilder:

package com.vishnu.storm.spout;

import java.util.Properties;

import org.apache.storm.spout.RawScheme;
import org.apache.storm.spout.SchemeAsMultiScheme;
// imports not shown in the original excerpt (presumed): the Kafka spout classes still come
// from storm-kafka 0.10.0-beta1, whose storm.kafka package references the old backtype.storm
// types -- which is what triggers the "cannot access backtype.storm.spout.MultiScheme" error
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import com.vishnu.storm.Keys;

public class SpoutBuilder {

    public Properties configs = null;

    public SpoutBuilder(Properties configs) {
        this.configs = configs;
    }

    public KafkaSpout buildKafkaSpout() {
        BrokerHosts hosts = new ZkHosts(configs.getProperty(Keys.KAFKA_ZOOKEEPER));
        String topic = configs.getProperty(Keys.KAFKA_TOPIC);
        String zkRoot = configs.getProperty(Keys.KAFKA_ZKROOT);
        String groupId = configs.getProperty(Keys.KAFKA_CONSUMERGROUP);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, groupId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        return kafkaSpout;
    }
}

soniclavier commented 8 years ago

pom.xml should have the following.

         <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.0.1</version>
        </dependency>
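
One thing to watch: with the 1.0.1 storm-kafka artifact, the Kafka classes also move out of the old storm.kafka package, so the imports in SpoutBuilder need to change as well. Roughly (using the 1.0.x package names):

    // Storm 1.0.x locations of the classes SpoutBuilder uses
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.StringScheme;
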
renderit commented 8 years ago

@soniclavier So I was having too many issues that I downgraded to 0.10.1, so I don't have to change anything, right? It compiles fine and creates the jar (I think it works now), but the sink bolt should be printing the Kafka output in the terminal, correct? (Or maybe not, since Storm only says 'topology submitted'.) Anyway, one last thing I wanted to ask: since I am passing JSON from Kafka, where should I parse it? In the spout? I saw this; is it passing the messages as-is?

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

P.S. Why is the spout showing up in the UI as 'kafka-spout' instead of 'SpoutBuilder'? I don't see 'kafka-spout' anywhere in the code.

sudcha commented 8 years ago

Hi, when I tried to execute the code below on Spark 2.0.0 I got the error below. Can you please help me out with this? Is it an issue with the data types from the CSV/DataFrame?

I executed the same code you have written.


Original Code -

val predictionAndLabels = result.map { row => (row.get(0).asInstanceOf[Double], row.get(1).asInstanceOf[Double]) }
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())

Modified Code -

val predictionAndLabels = model.transform(test).rdd.map { row => (row.get(0).asInstanceOf[Double], row.get(1).asInstanceOf[Double]) }

val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())

I get the error when I call metrics.areaUnderROC():

16/10/14 19:19:09 ERROR TaskSetManager: Task 0 in stage 130.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 130.0 failed 1 times, most recent failure: Lost task 0.0 in stage 130.0 (TID 132, localhost): java.lang.ClassCastException

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:190)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:144)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:146)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:146)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:221)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:85)
  at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:96)
  ... 54 elided
Caused by: java.lang.ClassCastException