Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Add broker_path option for kafka spout #124

Closed dearli closed 9 years ago

dearli commented 9 years ago

Hi, I'm using the Kafka spout and have run into a problem: whatever I set zk_root to, it doesn't seem to have any effect. My YAML file is below:

name: kafka_seclog

topology:

- spout:
    name: kafka_access-spout
    type: kafka
    options:
        # The Kafka topic to stream from.
        # Required.
        topic: biztech_apache_accesslog

        # ZooKeeper connection string. Comma-separated list of ZooKeeper
        # servers.
        # Required.
        zk_hosts: kafkamirror.zk1:2181,kafkamirror.zk2:2181,kafkamirror.zk3:2181
        #zk_hosts: 10.11.207.97:2181
        # Root path in ZooKeeper to store consumer offsets.
        # Defaults to: /pyleus-kafka-offsets/<topology name>
        zk_root: /kafka/0.8.1/brokers
        # Kafka consumer ID.
        # Defaults to: pyleus-<topology name>
        consumer_id: pyleus-kafka_spout_lizheyi

        # Whether the initial offset should be that specified by
        # start_offset_time (true) or the head of the stream (false).
        # Defaults to false.
        from_start: false

        # The offset time to start with if from_start is true.
        # Defaults to the earliest offset time in the stream.
        # start_offset_time: 139897106000
- bolt:
    name: kafka_msg_print
    module: kafka_seclog.msg_print
    parallelism_hint: 1
    groupings:
        - shuffle_grouping: kafka_access-spout

However I change zk_root, the error is always the same.

The error is: [image]

My colleagues wrote a topology in Java that runs against the same Kafka cluster without problems. Thanks.

dearli commented 9 years ago

Hi, I know the reason now. After reading the source code of com.yelp.pyleus.PyleusTopologyBuilder, I found that Pyleus always uses the default brokerPath (/brokers), but my Kafka cluster uses a non-default path. So I changed line 144 of com.yelp.pyleus.PyleusTopologyBuilder to:

    SpoutConfig config = new SpoutConfig(
        // Pass the cluster's broker path explicitly instead of the default /brokers.
        new ZkHosts(zkHosts, "/kafka/0.8.1/brokers"),
        topic,
        zkRoot,
        consumerId
    );

"/kafka/0.8.1/brokers" is my broker path, and with this change my program runs well. So I think Pyleus could add a new "broker_path" option to the YAML file.
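A minimal sketch of how the spout section might look with such an option. It assumes the option is named broker_path as proposed above and falls back to /brokers when omitted. The option name is hypothetical here; nothing in this thread confirms what was eventually implemented.

```yaml
- spout:
    name: kafka_access-spout
    type: kafka
    options:
        topic: biztech_apache_accesslog
        zk_hosts: kafkamirror.zk1:2181,kafkamirror.zk2:2181,kafkamirror.zk3:2181

        # Hypothetical option: ZooKeeper path under which the Kafka brokers
        # are registered. Assumed to default to /brokers when omitted.
        broker_path: /kafka/0.8.1/brokers

        # zk_root only controls where the spout stores consumer offsets,
        # so changing it does not affect where the brokers are looked up.
        zk_root: /pyleus-kafka-offsets/kafka_seclog

        consumer_id: pyleus-kafka_spout_lizheyi
```

Keeping the two paths separate is the point of the proposal: zk_root stays on its documented default for offsets, while the broker lookup path becomes configurable.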

poros commented 9 years ago

Thank you for letting us know. I'll change the name of the issue to turn it into a feature request.

jwestboston commented 9 years ago

Any chance this can get fixed for the next release? It is a one-line patch. That way I don't have to maintain a forked, patched Pyleus :-)

poros commented 9 years ago

Have you already submitted a pull request? I assume you have some working code if you have, or are considering, a patched Pyleus ;)

jwestboston commented 9 years ago

@poros Rrraaaahhhhhh waaahh wahhh making me do work.....! ;-)

OK, pull request created and submitted :-D

jwestboston commented 9 years ago

This is now working for me with PR #137

poros commented 9 years ago

Closed by #137

howardx commented 8 years ago

Hi, thanks for the YAML example for the Kafka Storm spout. One question though: where are you specifying the Kafka host and port?

I see you are specifying the ZooKeeper host and port in your YAML file, under the "spout" section. But in my case Kafka is installed on another host, on port 9092.

In Pyleus, if we don't need to write a Python script to implement the Kafka spout, how do we let Storm know where Kafka is?

Thanks in advance.
Howard