Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

a complete kafka spout example needed #40

Open mazhechao opened 9 years ago

mazhechao commented 9 years ago

The README mentions that you can

use the Kafka spout built into Storm with only a YAML change

What does this mean? How do I use it? Can you show me a complete Kafka spout example using pyleus?

Thanks!

poros commented 9 years ago

Hello,

Unfortunately, we can't provide a complete functioning example of a topology featuring a kafka spout, since it is very dependent on your configuration and use case. However, the examples folder contains an example of a YAML file including a kafka spout: https://github.com/Yelp/pyleus/tree/master/examples/kafka_spout

The sentence in the README just means that you don't need to write any Python spout to use the Kafka spout; you only have to include and configure it in your YAML file.
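For instance, a bare-bones spout entry might look like this (a sketch; the component name, topic, and ZooKeeper address are placeholders for your own setup):

```yaml
topology:
    - spout:
        name: kafka-events
        type: kafka                      # use the Kafka spout built into Storm
        options:
            topic: my-topic              # required: Kafka topic to consume
            zk_hosts: localhost:2181     # required: ZooKeeper connection string
```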

I hope it helps.

mazhechao commented 9 years ago

I'm still confused. Do you mean that I have to write a Java Kafka spout, and include and configure it? Could you explain in detail?

poros commented 9 years ago

Ah, got it. The Java Kafka Spout we are talking about is something already shipped with Apache Storm (this should be the link: https://github.com/apache/storm/tree/master/external/storm-kafka).

Pyleus simply provides an integration with that specific spout. All you need to do to use it is include and configure it in your YAML file, without writing a single line of code, in either Java or Python.

Was that more helpful?

mazhechao commented 9 years ago

Hmm... that's a little clearer. So the topology will be a combination of a Java Kafka spout and Python bolts? Let me have a try.

moandcompany commented 9 years ago

Hi,

I'm trying to use Pyleus to consume data from Kafka in a Storm topology.

In the example topology yaml file there is an option value called 'start_offset_time':

        start_offset_time: 1398971060

Pyleus builds this topology correctly but I am getting a type casting error when submitting the topology for local or nimbus execution.

I believe the issue I'm running into is figuring out how to explicitly specify a 'Long' in YAML rather than having it read as an 'int' or 'String'.

A start_offset_time of '0L', for example, builds but is read as a String during execution.

Cheers, Andrew

        Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
            at com.yelp.pyleus.PyleusTopologyBuilder.handleKafkaSpout(PyleusTopologyBuilder.java:155)
            at com.yelp.pyleus.PyleusTopologyBuilder.handleSpout(PyleusTopologyBuilder.java:103)
            at com.yelp.pyleus.PyleusTopologyBuilder.buildTopology(PyleusTopologyBuilder.java:196)
            at com.yelp.pyleus.PyleusTopologyBuilder.main(PyleusTopologyBuilder.java:292)
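The workaround that surfaces later in this thread (and in issue #93) relies on how SnakeYAML resolves integers on the Java side: a value that fits in 32 bits is deserialized as java.lang.Integer, while a larger value becomes java.lang.Long. A sketch:

```yaml
# 2147483648 is Integer.MAX_VALUE + 1, so SnakeYAML deserializes it as a
# java.lang.Long and the cast in PyleusTopologyBuilder succeeds. A value
# like '0L' is just a string to YAML, which triggers the
# ClassCastException shown above.
start_offset_time: 2147483648
```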

w7374520 commented 9 years ago

I am very confused and would very much like to have a full Kafka example. +1

What does this mean?

        pyleus build: error: [InvalidTopologyError] [kafka-my_test] Python class should specify attributes 'output_fields' and 'options'. Found: None. Are you inheriting from Bolt or Spout?

poros commented 9 years ago

We'll start thinking about how to provide a runnable example of a topology using kafka, even though kafka is not included in the project.

For the time being, @w7374520, let's try to dig into your error. What kind of component is kafka-my_test? Is it a Python Bolt consuming from a Kafka Spout?
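For reference, that InvalidTopologyError usually means the module a YAML component points at does not expose a proper pyleus component class. A minimal bolt that satisfies the check might look like this (a sketch assuming pyleus's `SimpleBolt` API; the component name and output field are placeholders, and the stand-in base class only lets the sketch run where pyleus isn't installed):

```python
try:
    from pyleus.storm import SimpleBolt
except ImportError:  # pyleus not installed; minimal stand-in so the sketch runs
    class SimpleBolt(object):
        OUTPUT_FIELDS = None
        OPTIONS = None

        def emit(self, values, anchors=None):
            pass

class MyTestBolt(SimpleBolt):
    # These class attributes are what the pyleus validator looks for.
    OUTPUT_FIELDS = ["word"]
    OPTIONS = None

    def process_tuple(self, tup):
        # Re-emit the first value of each incoming tuple.
        self.emit((str(tup.values[0]),), anchors=[tup])

# A real pyleus module would end with the usual entry point:
#     if __name__ == '__main__':
#         MyTestBolt().run()
```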

w7374520 commented 9 years ago

Great, very much looking forward to it, thank you!

moandcompany commented 9 years ago

Here's a very basic example Pyleus / Storm topology, using a Kafka spout, based on a POC I recently put together.

With this topology, all tuples coming from the spout / Kafka topic are sent to a processing bolt called 'parse-event' with parallelism 3; tuple assignment from the spout to the 'parse-event' bolts is handled randomly due to the 'shuffle_grouping' topology specification.

All output emitted from the 'parse-event' bolts is funneled directly to a single 'count-events' bolt, which has one method for processing incoming tuples and another for handling a special 'tick tuple,' which is triggered every thirty (30) seconds.

The topology definition also takes advantage of the 'options' feature to pass parameters from the topology to a bolt -- in this case a database name, etc.

# NOTE: THIS MUST BE UNIQUE
name: kafka-spout-test

topology:

    - spout:
        name: kafka-en-wikipedia
        type: kafka
        options:
            # The Kafka topic to stream from.
            # Required.
            topic: en.wikipedia

            # ZooKeeper connection string. Comma-separated list of ZooKeeper
            # servers.
            # Required.
            zk_hosts: 0.0.0.0:2181

            # Root path in ZooKeeper to store consumer offsets.
            # Defaults to: /pyleus-kafka-offsets/<topology name>
            zk_root: /pyleus-kafka-offset/kafka-spout/test

            # Kafka consumer ID.
            # Defaults to: pyleus-<topology name>

            # IMPORTANT - ALWAYS CONFIRM KAFKA CONSUMER GROUP/ID
            #
            consumer_id: pyleus-kafka-spout-test

            # Whether the initial offset should be that specified by
            # start_offset_time (true) or the head of the stream (false).
            # Defaults to false.
            from_start: false

            # The offset time to start with if from_start is true.
            # Defaults to the earliest offset time in the stream.

            # We need to set a very large start_offset_time, which will be unused due to this issue:
            # See: https://github.com/Yelp/pyleus/issues/93
            start_offset_time: 2147483648
            # start_offset_time: 1398971060

    # the initial processing step from the Kafka spout
    - bolt:
        name: parse-event
        module: en-wikipedia-streaming.parse_event
        parallelism_hint: 3
        groupings:
            - shuffle_grouping: kafka-en-wikipedia

    # an interval accumulation counter for receiving parsed tuples
    - bolt:
        name: count-events
        module: en-wikipedia-streaming.count_events

        tick_freq_secs: 30    # emit a tick tuple every thirty seconds
        parallelism_hint: 1
        options:
            # YES, THAT IS A CLEARTEXT PASSWORD BELOW - HAVE FUN REACHING THIS HOST
            db_host: "0.0.0.0"
            db_user: "devel_user"
            db_pass: "devel_pass"
            db_name: "devel_db"
            db_table: "devel_table"

        groupings:
            - global_grouping: parse-event
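The two bolt modules referenced above (en-wikipedia-streaming.parse_event and en-wikipedia-streaming.count_events) are not shown in this thread. Roughly, they might look like the sketch below (assuming pyleus's `SimpleBolt` API; the field names and the database flush are placeholders, and the stand-in base class only lets the sketch run where pyleus isn't installed):

```python
try:
    from pyleus.storm import SimpleBolt
except ImportError:  # pyleus not installed; minimal stand-in so the sketch runs
    class SimpleBolt(object):
        def emit(self, values, anchors=None):
            pass

# en-wikipedia-streaming/parse_event.py
class ParseEventBolt(SimpleBolt):
    OUTPUT_FIELDS = ["event"]

    def process_tuple(self, tup):
        # The Kafka spout emits the raw message as the tuple's first value.
        raw = tup.values[0]
        self.emit((raw.strip(),), anchors=[tup])

# en-wikipedia-streaming/count_events.py
class CountEventsBolt(SimpleBolt):
    OPTIONS = ["db_host", "db_user", "db_pass", "db_name", "db_table"]

    def initialize(self):
        self.count = 0

    def process_tuple(self, tup):
        self.count += 1

    def process_tick(self):
        # Invoked every tick_freq_secs (30 s in the YAML above); a real bolt
        # would flush self.count to the database named in 'options' here.
        self.count = 0
```

Because count-events uses global_grouping over parse-event with parallelism 1, a single instance sees every parsed tuple, which is what makes the interval counter correct.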

howardx commented 8 years ago

Hi moandcompany, thanks for your example. One question though - where are you specifying the Kafka host and port?

I see you are specifying the ZooKeeper host and port in your YAML file, under the "spout" section. But in my case Kafka is installed on another host, on port 9092.

In Pyleus, if we don't need to write a Python script to implement the Kafka spout, then how do we let Storm know where Kafka is?

Thanks in advance. Howard
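For what it's worth, the storm-kafka spout does not take a broker list directly: it discovers the Kafka brokers through ZooKeeper, where Kafka registers them (under /brokers by default). So zk_hosts should point at the ZooKeeper ensemble your Kafka cluster uses, even when the brokers live on other hosts; the broker host:port pairs (e.g. port 9092) are then read from ZooKeeper. Roughly (hostnames are placeholders):

```yaml
options:
    topic: my-topic
    # Point at the ZooKeeper ensemble Kafka registers with, not at the
    # brokers themselves; broker host:port pairs are read from ZooKeeper.
    zk_hosts: zk1.example.com:2181,zk2.example.com:2181
```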

howardx commented 8 years ago

A follow-up question: if we use Avro serialization in Kafka, how will a Pyleus spout/bolt deserialize the messages?

Thanks again.

Chisha14 commented 7 years ago

Hey folks, I need help, but first I want to be sure my thinking is right. My thought: the Kafka spout written in the YAML file represents the Kafka consumer, doesn't it? I want to write a Kafka producer connected to it, and a bolt to process the incoming data from the Kafka consumer. Can this be done in pyleus/Python? @mazhechao @poros @moandcompany @w7374520 @howardx please help, and thanks in advance.

westover commented 7 years ago

Chisha, writing a Kafka producer in Python is pretty simple. There are lots of good libraries out there for handling Kafka; I happen to like kafka-python.
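A minimal producer along those lines might look like this (a sketch using kafka-python; the broker address, topic name, and event payload are placeholders for your own setup):

```python
import json

def produce_events(bootstrap_servers="localhost:9092", topic="my-topic"):
    """Send a few JSON events to Kafka for the topology's spout to consume."""
    from kafka import KafkaProducer  # pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        # Serialize each dict to JSON bytes; a parsing bolt would decode them.
        value_serializer=lambda event: json.dumps(event).encode("utf-8"),
    )
    for i in range(3):
        producer.send(topic, {"event_id": i, "source": "example"})
    producer.flush()  # block until the messages are actually delivered
    producer.close()
```

Calling `produce_events()` against a running broker would publish three messages to the topic the Kafka spout is configured to read.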

banjin commented 6 years ago

@moandcompany can you show your bolt files? I use one bolt to receive the data from Kafka, but I can't get that bolt to send data to another bolt. Help, thanks!