apache / incubator-heron

Apache Heron (Incubating) is a realtime, distributed, fault-tolerant stream processing engine from Twitter
https://heron.apache.org/
Apache License 2.0

Issue in Storm-kafka with heron #1327

Open khushboo13 opened 8 years ago

khushboo13 commented 8 years ago

Hi,

While using the Kafka spout with Heron, I am getting the following exception:

[2016-09-02 13:08:37 +0530] storm.kafka.DynamicBrokersReader INFO:  Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.14.24.194:9092, 1=10.14.24.192:9092}}
[2016-09-02 13:08:37 +0530] com.twitter.heron.instance.HeronInstance SEVERE:  Exception caught in thread: SlaveThread with id: 12
java.lang.NoClassDefFoundError: storm/trident/spout/ISpoutPartition
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at storm.kafka.trident.GlobalPartitionInformation.getOrderedPartitions(GlobalPartitionInformation.java:54)
        at storm.kafka.KafkaUtils.calculatePartitionsForTask(KafkaUtils.java:215)
        at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80)
        at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135)
        at backtype.storm.topology.IRichSpoutDelegate.nextTuple(IRichSpoutDelegate.java:67)
        at com.twitter.heron.instance.spout.SpoutInstance.produceTuple(SpoutInstance.java:271)
        at com.twitter.heron.instance.spout.SpoutInstance.access$100(SpoutInstance.java:42)
        at com.twitter.heron.instance.spout.SpoutInstance$1.run(SpoutInstance.java:176)
        at com.twitter.heron.common.basics.WakeableLooper.executeTasksOnWakeup(WakeableLooper.java:142)
        at com.twitter.heron.common.basics.WakeableLooper.runOnce(WakeableLooper.java:74)
        at com.twitter.heron.common.basics.WakeableLooper.loop(WakeableLooper.java:64)
        at com.twitter.heron.instance.Slave.run(Slave.java:169)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: storm.trident.spout.ISpoutPartition
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
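A `NoClassDefFoundError` caused by a `ClassNotFoundException` means that a class needed at runtime (here `storm.trident.spout.ISpoutPartition`, which `GlobalPartitionInformation.getOrderedPartitions` depends on) is not on the classpath of the Heron instance. One quick way to check which Storm-era namespaces a classpath actually provides is a small probe like the sketch below. `ClasspathProbe` is a hypothetical diagnostic helper, not part of Heron or storm-kafka. The class names it checks are taken from this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical diagnostic: reports which of the given classes the current classpath can load. */
class ClasspathProbe {

    /** Returns true if the class can be located and linked, without running its static initializers. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class itself is absent.
            // LinkageError (e.g. NoClassDefFoundError): the class was found,
            // but something it depends on (such as a superinterface) is missing.
            return false;
        }
    }

    /** Probes each class name and keeps the results in the order given. */
    static Map<String, Boolean> probe(String... classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isLoadable(name));
        }
        return result;
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace and the discussion in this issue.
        Map<String, Boolean> found = probe(
                "storm.trident.spout.ISpoutPartition",
                "backtype.storm.topology.IRichSpout",
                "org.apache.storm.topology.IRichSpout");
        found.forEach((name, ok) -> System.out.println((ok ? "found   " : "MISSING ") + name));
    }
}
```

Run it against the same classpath the topology JAR uses. A `MISSING` line for `storm.trident.spout.ISpoutPartition` corresponds to the error reported above.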
kramasamy commented 8 years ago

We are working on a Kafka spout for Heron; check the pull request:

https://github.com/twitter/heron/pull/1317

Once it is merged, we will publish a JAR to Maven Central that will work seamlessly with Heron.


khushboo13 commented 8 years ago

@kramasamy When can I expect the new JAR?

kramasamy commented 8 years ago

We are very close to merging it into master. The JAR should be available next week. @nlu90 - can you publish this JAR to Maven Central early next week?

nlu90 commented 8 years ago

@khushboo13 The code has been merged in #1317.

khushboo13 commented 8 years ago

@nlu90 Will you be updating storm-kafka to work with Heron, or adding a new heron-kafka?

nlu90 commented 8 years ago

@khushboo13 We updated the existing storm-kafka to work with Heron.

khushboo13 commented 8 years ago

@nlu90 I cannot see any new storm-kafka releases in Maven. The latest version shown is 1.0.2. Has it not been published yet?

kramasamy commented 8 years ago

@khushboo13 - we are almost ready to get the release out and push the heron-kafka spout to Maven. I will try to get this done tomorrow and keep you posted.

khushboo13 commented 8 years ago

@kramasamy - Hi, has the new JAR been published?

mycFelix commented 8 years ago

@khushboo13 - Hi, #1317 has been merged and Heron version 0.14.3 has been released.

I don't know whether kafka-spout has been pushed to Maven yet, but the kafka-spout source code is in the heron/contrib folder.

I packaged the JAR and installed it into my local Maven repository myself. Fortunately it works, which means kafka-spout can be used with Heron.

> cd ${HERON_PATH}/contrib/kafka-spout/
> mvn clean package

I hope that will help you.

khushboo13 commented 8 years ago

@mycFelix - Hi, thanks for the response. I upgraded my version and compiled the new JAR, but I still cannot make it work with Heron.

Can you please post the updated code along with the pom.xml?

mycFelix commented 8 years ago

@khushboo13 - Hi, let's make it work.

> $path/heron-api-install-0.14.3-darwin.sh --user --maven
> cd ${HERON_PATH}/contrib/kafka-spout/
> mvn clean package
> mvn install:install-file -q -Dfile="${path}/kafka-spout.jar" -DgroupId="org.apache.storm" \
    -DartifactId="kafka-spout" -Dversion="SNAPSHOT" -Dpackaging="jar"

Then add these dependencies to your pom.xml:

<dependency>
    <groupId>com.twitter.heron</groupId>
    <artifactId>heron-api</artifactId>
    <version>SNAPSHOT</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.twitter.heron</groupId>
    <artifactId>heron-storm</artifactId>
    <version>SNAPSHOT</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>kafka-spout</artifactId>
    <version>SNAPSHOT</version>
</dependency>

I hope that will help you.

khushboo13 commented 8 years ago

Hi,

I had already followed these steps. In the code I am getting errors because KafkaSpout is based on BaseRichSpout, which is not supported by the Heron topology builder.

Here is the code I am using:

package rtm.rtm;

import java.util.Arrays;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

import com.twitter.heron.api.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class Spout {
    public static void main(String[] args)
            throws backtype.storm.generated.AlreadyAliveException,
                   backtype.storm.generated.InvalidTopologyException {
        String zkConnString = "ip";
        String topicName = "kafkastorm";
        String zkRoot = "/brokers";

        BrokerHosts hosts = new ZkHosts(zkConnString, zkRoot);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/kafka/" + topicName, "felix");
        //spoutConfig.scheme = new MultiScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkServers = Arrays.asList("ip");
        spoutConfig.zkPort = 2181;

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka", kafkaSpout);
        builder.setBolt("reciever", new BoltTest(), 3).shuffleGrouping("kafka");
        Config conf = new Config();
        // must set, or it will cause null exceptions
        conf.put("storm.zookeeper.session.timeout", 20000);
        conf.put("storm.zookeeper.connection.timeout", 15000);
        conf.put("storm.zookeeper.retry.times", 5);
        conf.put("storm.zookeeper.retry.interval", 1000);

        try {
            StormSubmitter.submitTopology("Kafka-Base-Test", conf, builder.createTopology());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

mycFelix commented 8 years ago

@khushboo13

Hi, could you import org.apache.storm instead of backtype.storm and try again?

khushboo13 commented 8 years ago

@mycFelix - I am using it with Heron; if I change to org.apache.storm, my other topologies will not work.

mycFelix commented 8 years ago

@khushboo13 - You don't need to change your pom.xml, just the code you showed me. Heron already includes org.apache.storm.

khushboo13 commented 8 years ago

@mycFelix - I don't need to change pom.xml. The problem is that in the same topology builder I have several other spouts (like a RabbitMQ spout) which will not work with org.apache.storm.

mycFelix commented 8 years ago

@khushboo13

I see.

You are using backtype.storm.topology.TopologyBuilder in your code, which requires a backtype.storm.topology.IRichSpout in its setSpout method.

KafkaSpout comes from org.apache.storm.kafka and implements org.apache.storm.topology.IRichSpout.

I think you may get the same error if you switch from heron-kafka to storm-kafka.
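The type mismatch described here can be reproduced without any Storm or Heron JARs. In the simplified model below, `BacktypeApi.RichSpout` and `ApacheApi.RichSpout` are hypothetical stand-ins for `backtype.storm.topology.IRichSpout` and `org.apache.storm.topology.IRichSpout`. They declare identical methods, but Java treats them as unrelated types, so a spout written against one cannot be passed to a builder that expects the other. A thin adapter that implements one interface and delegates to the other bridges the gap. This is only a sketch of the idea under heavy simplification: the real Storm interfaces have more methods (and `nextTuple` returns `void`), and they also take collector, context, and declarer objects from their own namespace, which a real adapter would have to translate as well.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the backtype.storm API. */
final class BacktypeApi {
    private BacktypeApi() {}

    /** Stand-in for backtype.storm.topology.IRichSpout (heavily simplified). */
    interface RichSpout {
        String nextTuple();
    }

    /** Stand-in for backtype.storm.topology.TopologyBuilder: only accepts backtype-style spouts. */
    static final class TopologyBuilder {
        private final List<RichSpout> spouts = new ArrayList<>();

        void setSpout(String id, RichSpout spout) {
            spouts.add(spout);
        }

        List<RichSpout> spouts() {
            return spouts;
        }
    }
}

/** Hypothetical stand-in for the org.apache.storm API. */
final class ApacheApi {
    private ApacheApi() {}

    /** Stand-in for org.apache.storm.topology.IRichSpout: same shape, different type. */
    interface RichSpout {
        String nextTuple();
    }
}

/** Stand-in for the Kafka spout, which is written against the org.apache.storm-style API. */
final class FakeKafkaSpout implements ApacheApi.RichSpout {
    @Override
    public String nextTuple() {
        return "kafka-message";
    }
}

/** Adapter: exposes an org.apache.storm-style spout through the backtype.storm-style interface. */
final class BacktypeSpoutAdapter implements BacktypeApi.RichSpout {
    private final ApacheApi.RichSpout delegate;

    BacktypeSpoutAdapter(ApacheApi.RichSpout delegate) {
        this.delegate = delegate;
    }

    @Override
    public String nextTuple() {
        // Forward the call unchanged; a real adapter would also translate
        // collectors, contexts and field declarers between the two namespaces.
        return delegate.nextTuple();
    }
}

class AdapterDemo {
    public static void main(String[] args) {
        BacktypeApi.TopologyBuilder builder = new BacktypeApi.TopologyBuilder();
        // builder.setSpout("kafka", new FakeKafkaSpout()); // does not compile: wrong RichSpout type
        builder.setSpout("kafka", new BacktypeSpoutAdapter(new FakeKafkaSpout()));
        System.out.println(builder.spouts().get(0).nextTuple()); // prints kafka-message
    }
}
```

The same delegation idea shows up in the stack trace at the top of this issue, where Heron runs a backtype.storm spout through `backtype.storm.topology.IRichSpoutDelegate`.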

kramasamy commented 8 years ago

@khushboo13 - based on my understanding, you need a Kafka spout in the backtype.storm namespace, is that correct? Our rationale for supporting org.apache.storm is that it is going to be the API going forward. Hence we decided to support spouts for org.apache.storm.