Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Problems with the Kafka spout in YAML and with my own spout implementation #120

Closed: mendynew closed this issue 9 years ago

mendynew commented 9 years ago

I'm new to Storm and Pyleus, and I ran into some problems when reading data from Kafka.

  1. Using the kafka_spout example: the example runs successfully. I set up KafkaOffsetMonitor to view the Kafka status, but in its UI I cannot see the consumer name that I set in the YAML.
  2. I wrote my own Kafka spout; the code looks like this:
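import logging
from pyleus.storm import Spout
from util.settings import KAFKA_RAW_SIMBA
from util.kafka_handler import ConsumerHandler

log = logging.getLogger('simba_logging.data_spout')


class DataSpout(Spout):
    OUTPUT_FIELDS = ['data-spout']

    def initialize(self):
        # pyleus calls initialize() once after launch; set up resources here
        # instead of overriding __init__ (whose super() call must name the
        # subclass, e.g. super(DataSpout, self), not super(Spout, self)).
        consumer_name = 'pyleus-kafka-spout-content-bdp-2'
        self.consumer = ConsumerHandler(consumer_name, KAFKA_RAW_SIMBA)

    def next_tuple(self):
        # Read at most one message per call; read() returns '' when none is available.
        msg = self.consumer.read()
        if msg:
            log.info(msg)
            self.emit((msg,))  # one value, matching OUTPUT_FIELDS


if __name__ == '__main__':
    DataSpout().run()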

The ConsumerHandler class is a wrapper for reading from Kafka, and it has been tested successfully. The topology compiles successfully, but an error occurs when it runs in Storm:

7623 [Thread-11-kafka-content-bdp] INFO  backtype.storm.daemon.executor - Activating spout kafka-content-bdp:(3)
7736 [Thread-11-kafka-content-bdp] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Unknown command received: error
        at backtype.storm.spout.ShellSpout.querySubprocess(ShellSpout.java:115) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.spout.ShellSpout.nextTuple(ShellSpout.java:68) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_72]
7736 [Thread-11-kafka-content-bdp] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: Unknown command received: error
        at backtype.storm.spout.ShellSpout.querySubprocess(ShellSpout.java:115) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.spout.ShellSpout.nextTuple(ShellSpout.java:68) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_72]
7759 [Thread-9-classification-bolt] INFO  backtype.storm.task.ShellBolt - Launched subprocess with pid 13895
7759 [Thread-11-kafka-content-bdp] INFO  backtype.storm.util - Halting process: ("Worker died")
7760 [Thread-9-classification-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt classification-bolt:(2)
pyleus local: error: [StormError] Storm command failed. Run with --verbose for more info.
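For context on the `Unknown command received: error` line: Storm talks to Python components through its multilang protocol, where each message is a JSON object followed by a line containing only `end`, and a spout answers every `next` request with `sync`. When `next_tuple` raises, the Python side presumably reports the failure as an `error` command, which the ShellSpout in storm-core 0.9.2 does not appear to recognize, so the worker halts instead of surfacing the Python traceback. The stdlib-only sketch below illustrates that framing under those assumptions; `encode_message`, `run_next_tuple` and `broken_next_tuple` are hypothetical helpers, not pyleus or Storm APIs.

```python
import json
import traceback


def encode_message(payload):
    """Frame one multilang message: a JSON object, then a line with 'end'."""
    return json.dumps(payload) + "\nend\n"


def run_next_tuple(next_tuple):
    """Call next_tuple once and return the message a shell spout would send.

    Hypothetical helper: on success the spout answers with 'sync'; on an
    uncaught exception it sends an 'error' command carrying the traceback,
    which a Storm version without 'error' support rejects as unknown.
    """
    try:
        next_tuple()
    except Exception:
        return encode_message({"command": "error", "msg": traceback.format_exc()})
    return encode_message({"command": "sync"})


def broken_next_tuple():
    # Stands in for any next_tuple implementation that raises.
    raise ValueError("simulated failure in next_tuple")
```

Calling `run_next_tuple(broken_next_tuple)` yields an `error` message whose `msg` field holds the Python traceback, which is the information that never reached the log here.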

I don't know how to solve this problem. All I want is to check the topic's consumer offset and the number of unread messages in the Kafka queue. Any help would be greatly appreciated.
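On point 1: the pyleus `kafka` spout is built on storm-kafka's KafkaSpout, which commits offsets to ZooKeeper under its own root, in a node of the form `<zk_root>/<consumer_id>/partition_<N>`. Kafka 0.8 high-level consumer groups use `/consumers/<group>/offsets/<topic>/<partition>` instead, which is presumably where KafkaOffsetMonitor looks, so the consumer name from the YAML would not show up as a group there. The two layouts are sketched below as path builders; the function names are hypothetical, and `zk_root`/`consumer_id` are assumed to be the YAML option names.

```python
def storm_kafka_offset_path(zk_root, consumer_id, partition):
    """ZooKeeper node where storm-kafka's KafkaSpout stores a partition's offset."""
    return "{0}/{1}/partition_{2}".format(zk_root.rstrip("/"), consumer_id, partition)


def high_level_consumer_offset_path(group, topic, partition):
    """ZooKeeper node used by Kafka 0.8 high-level consumer groups."""
    return "/consumers/{0}/offsets/{1}/{2}".format(group, topic, partition)
```

With illustrative values, `storm_kafka_offset_path("/pyleus-kafka-offsets/content", "pyleus-content", 0)` points outside `/consumers`, so a tool that only scans `/consumers` would not list it.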

poros commented 9 years ago

Is there any traceback in your Python log file?

mendynew commented 9 years ago

I didn't see any messages in the log file.
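An empty log file is consistent with logging never being configured in the spout's own process: a named logger such as `simba_logging.data_spout` writes nothing to disk until a handler is attached, and an exception in `next_tuple` only reaches that file if it is logged there. A minimal stdlib sketch, assuming you configure logging yourself before running the component; `configure_file_logging`, `safe_next_tuple` and the log path are hypothetical.

```python
import logging
import os
import tempfile

log = logging.getLogger("simba_logging.data_spout")


def configure_file_logging(path):
    """Attach a file handler so records from this logger end up on disk."""
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)
    return handler


def safe_next_tuple(next_tuple):
    """Run next_tuple, writing any traceback to the log before re-raising."""
    try:
        next_tuple()
    except Exception:
        log.exception("next_tuple failed")  # records the full traceback
        raise


if __name__ == "__main__":
    # Example only: the log location is an arbitrary choice for this sketch.
    configure_file_logging(os.path.join(tempfile.gettempdir(), "data_spout.log"))
    safe_next_tuple(lambda: log.info("next_tuple called"))
```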

mendynew commented 9 years ago

Here is the implementation of ConsumerHandler:

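from kafka import KafkaClient, SimpleConsumer


class ConsumerHandler(object):
    """Thin wrapper around kafka-python's SimpleConsumer."""

    def __init__(self, consumerName, topic):
        self.kafka = KafkaClient("127.0.0.1:9092")
        self.topic = topic
        self.consumer = SimpleConsumer(self.kafka, consumerName, self.topic,
                                       max_buffer_size=10000000)

    def read(self):
        # Return the next message payload, or '' if none is available.
        msg = self.consumer.get_message()
        if not msg:
            return ''
        return msg.message.value

    def close(self):
        # Commit this consumer group's offsets, then disconnect.
        self.consumer.commit()
        self.kafka.close()

    def count(self):
        # Messages remaining between the consumer's offsets and the end of the topic.
        return self.consumer.pending()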
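`count()` delegates to kafka-python's `SimpleConsumer.pending()`, which, in the kafka-python versions of that era, appears to ask each partition for its latest offset and subtract the consumer's current offset. The arithmetic behind that "unread messages" number can be sketched without Kafka; `pending_messages` and the offsets in the usage line are hypothetical.

```python
def pending_messages(consumer_offsets, latest_offsets):
    """Per-partition and total count of messages not yet consumed.

    consumer_offsets: partition -> next offset the consumer will read
    latest_offsets:   partition -> log-end offset reported by the broker
    """
    per_partition = {}
    for partition, latest in latest_offsets.items():
        # This sketch treats a partition the consumer has never read as starting
        # at offset 0; a real consumer may start from the earliest retained offset.
        current = consumer_offsets.get(partition, 0)
        per_partition[partition] = max(latest - current, 0)
    return per_partition, sum(per_partition.values())
```

For example, a consumer at offsets `{0: 90, 1: 40}` on a topic whose partitions end at `{0: 100, 1: 40, 2: 5}` has 10, 0 and 5 messages pending, 15 in total.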
mendynew commented 9 years ago

It was my fault: I had called time.sleep() inside next_tuple. Thanks a lot, @poros!
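Storm runs a spout's `next_tuple`, `ack` and `fail` on the same thread, so `next_tuple` should return promptly rather than block. One pattern is to emit at most one message per call and return immediately when nothing is available. The stdlib sketch below shows that pattern with a `queue.Queue` standing in for the Kafka consumer and a list standing in for `emit`; `NonBlockingSpoutSketch` is hypothetical and is not the pyleus `Spout` class.

```python
import queue


class NonBlockingSpoutSketch(object):
    """Hypothetical stand-in for a spout; not the pyleus Spout class."""

    def __init__(self, source):
        self.source = source  # queue.Queue standing in for a Kafka consumer
        self.emitted = []     # list standing in for Spout.emit

    def emit(self, values):
        self.emitted.append(values)

    def next_tuple(self):
        # Return immediately when nothing is available instead of sleeping,
        # so the caller can get back to processing acks and fails.
        try:
            msg = self.source.get_nowait()
        except queue.Empty:
            return
        self.emit((msg,))
```

Each call does a constant amount of work whether or not a message is waiting, which keeps the shared spout thread responsive.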