twitter / hbc

A Java HTTP client for consuming Twitter's realtime Streaming API
https://developer.twitter.com/en/docs/tweets/filter-realtime/overview
Apache License 2.0
962 stars 373 forks source link

Trying to use hbc with apache storm on Eclipse on Windows #160

Closed mhelal closed 2 years ago

mhelal commented 8 years ago

Hello,

I need support trying to use your hpc APIs, to connect to twitter streams as a spout in apache storm. The java file below is what I am trying, using the pom.xml below.

I am getting a runtime exception from line 106:

cluster.submitTopology("test", conf, builder.createTopology());

says:

java.lang.RuntimeException: java.io.NotSerializableException: com.twitter.hbc.core.event.ConnectionEvent

with traceback:

java.lang.RuntimeException: java.io.NotSerializableException: com.twitter.hbc.core.event.ConnectionEvent at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0] at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:112) ~[storm-core-0.10.0.jar:0.10.0] at storm.starter.TwitterTopology.main(TwitterTopology.java:106) ~[classes/:?] Caused by: java.io.NotSerializableException: com.twitter.hbc.core.event.ConnectionEvent at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:1.7.0_76] at java.util.concurrent.LinkedBlockingQueue.writeObject(Unknown Source) ~[?:1.7.0_76] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_76] at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.7.0_76] at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.7.0_76] at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectStreamClass.invokeWriteObject(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:1.7.0_76] at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:1.7.0_76] at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0] ... 2 more


As far as I see, I have made the spout and bolt themselves serializable. Not sure how to make the ConnectionEvent serializable, or how to get it to accept it as is.

I appreciate your support very much


The Java File:


/**

import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;

import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.topology.base.BaseRichSpout; import java.util.HashMap; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.; import com.twitter.hbc.core.endpoint.; import com.twitter.hbc.core.event.*; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1;

/**

class TwitterSpout extends BaseRichSpout {

/**
 * 
 */
private static final long serialVersionUID = -5695079926219121864L;
public static Logger LOG = LoggerFactory.getLogger(TwitterSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
BlockingQueue<String> msgQueue;
BlockingQueue<Event> eventQueue;

public TwitterSpout() {
    this(true);

    /** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
    msgQueue = new LinkedBlockingQueue<String>(100000);
    eventQueue = new LinkedBlockingQueue<Event>(1000);

    /** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
    Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
    // Optional: set up some followings and track terms
    List<Long> followings = Lists.newArrayList(1234L, 566788L);
    List<String> terms = Lists.newArrayList("twitter", "api");
    hosebirdEndpoint.followings(followings);
    hosebirdEndpoint.trackTerms(terms);

    // These secrets should be read from a config file
    Authentication hosebirdAuth = new OAuth1("fffff", "eeee", "eee", "cccc");
    //Authentication hosebirdAuth = new BasicAuth("username", "password");

    ClientBuilder builder = new ClientBuilder()
              .name("Hosebird-Client-01")                              // optional: mainly for the logs
              .hosts(hosebirdHosts)
              .authentication(hosebirdAuth)
              .endpoint(hosebirdEndpoint)
              .processor(new StringDelimitedProcessor(msgQueue))
              .eventMessageQueue(eventQueue);                          // optional: use this if you want to process client events

            Client hosebirdClient = builder.build();
            // Attempts to establish a connection.
            hosebirdClient.connect();

}

public TwitterSpout(boolean isDistributed) {
    _isDistributed = isDistributed;
}

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
}

public void close() {

}

public void nextTuple() {
    Utils.sleep(100);

// final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; // final Random rand = new Random(); // final String word = words[rand.nextInt(words.length)];

    String msg;
    try {
        msg = msgQueue.take();
        _collector.emit(new Values(msg));
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

public void ack(Object msgId) {

}

public void fail(Object msgId) {

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweet"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
    if(!_isDistributed) {
        Map<String, Object> ret = new HashMap<String, Object>();
        ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
        return ret;
    } else {
        return null;
    }
}    

}


The pom file:


<?xml version="1.0" encoding="UTF-8"?>

4.0.0 storm org.apache.storm 0.10.0 ../../pom.xml org.apache.storm storm-starter jar storm-starter junit junit test org.testng testng 6.8.5 test org.mockito mockito-all test org.easytesting fest-assert-core 2.0M8 test org.jmock jmock 2.6.0 test org.twitter4j twitter4j-stream 3.0.3 org.apache.storm storm-core 0.10.0 compile org.apache.storm multilang-javascript ${project.version} org.apache.storm multilang-ruby ${project.version} org.apache.storm multilang-python ${project.version} commons-collections commons-collections 3.2.1 com.google.guava guava ``` com.twitter hbc-core 2.2.0 ``` install src/jvm test/jvm ${basedir}/multilang ``` org.apache.maven.plugins maven-shade-plugin 1.4 true package shade com.theoryinpractise clojure-maven-plugin true src/clj build build build org.codehaus.mojo exec-maven-plugin 1.2.1 exec java true false compile ${storm.topology} ```