airavata-courses / autobots

Team Autobots Project Repo
0 stars 2 forks source link

Error after kafka is started #37

Open DGRamya opened 5 years ago

DGRamya commented 5 years ago

After following the updated document, kafka was setup and I opened the browser ( Google Chrome), there was an empty chart displayed on the screen:

But, When I clicked twice on the screen, the kafka server broke. Node server is functional

Traceback (most recent call last): File "twitter_data_producer.py", line 37, in stream.filter(track="trump") File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 450, in filter self._start(async) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 364, in _start self._run() File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 297, in _run six.reraise(*exc_info) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 266, in _run self._read_loop(resp) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 325, in _read_loop next_status_obj = buf.read_len(length) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/tweepy/streaming.py", line 164, in read_len self._buffer += self._stream.read(read_len) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/urllib3/response.py", line 430, in read raise IncompleteRead(self._fp_bytes_read, self.length_remaining) File "/usr/local/Cellar/python@2/2.7.15_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in exit self.gen.throw(type, value, traceback) File "/Users/DG/Documents/SGA_AUTOBOTS_TEST/autobots/backend/twitter_feed_microservice/env/lib/python2.7/site-packages/urllib3/response.py", line 349, in _error_catcher raise ProtocolError('Connection broken: %r' % e, e) urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 1673 more expected)', IncompleteRead(0 bytes read, 1673 more expected))

naveenkumarmarri commented 5 years ago

@gowtham06 can you remove the sleep call code https://github.com/airavata-courses/autobots/blob/develop/backend/twitter_feed_microservice/twitter_data_producer.py#L25

DGRamya commented 5 years ago

@naveenkumarmarri I commented the line and tried again, but no updates, I restarted the services as well

gowtham06 commented 5 years ago

@naveenkumarmarri I have removed it. Now it should work fine.

naveenkumarmarri commented 5 years ago

@DGRamya Were you not able to see the stream updates in your http://localhost:5000

You should see something similar to http://129.114.16.27:5000/

DGRamya commented 5 years ago

@DGRamya Were you not able to see the stream updates in your http://localhost:5000

You should see something similar to http://129.114.16.27:5000/

I have seen the jetstream output, but I could see only empty page in browser

DGRamya commented 5 years ago

@naveenkumarmarri I have removed it. Now it should work fine.

I commented the code and tried, Its not wokring

DGRamya commented 5 years ago

for your reference: from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream from kafka import SimpleProducer, KafkaClient import configparser import time configParser = configparser.RawConfigParser() configFilePath = r'config.txt' configParser.read(configFilePath) access_token = configParser.get('twitter-token-config', 'access_token') access_token_secret = configParser.get('twitter-token-config', 'access_token_secret') consumer_key = configParser.get('twitter-token-config', 'consumer_key') consumer_secret = configParser.get('twitter-token-config', 'consumer_secret')

print(access_token, access_token_secret, consumer_key, consumer_secret)

print("Starting the kafka producer") class StdOutListener(StreamListener): def init(self): self.counter=1; def on_data(self, data):

print(len(data))

    jsonData={"x":self.counter,"y":len(data)}
    producer.send_messages("trump", str(jsonData).encode('utf-8'))
    #print (data)
    #time.sleep(1)
    self.counter+=1
    return True
def on_error(self, status):
    print (status)

kafka = KafkaClient("localhost:9092") producer = SimpleProducer(kafka) l = StdOutListener() auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) stream = Stream(auth, l) stream.filter(track="trump")

naveenkumarmarri commented 5 years ago

@DGRamya Are you getting the same error mentioned in beginning of the this issue?

DGRamya commented 5 years ago

yes