pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

kafka-python consumer used in streamparse spout does not work and throws a timeout #447

Open lusonzeng opened 6 years ago

lusonzeng commented 6 years ago

pyspout.py:

```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'])

    def next_tuple(self):
        for message in self.consumer:
            self.emit([message.value])
```

wordcount.py (bolt):

```python
import os

from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted [{:,}] words [pid={}]".format(self.total, self.pid))
        self.emit([word, self.counter[word]])
```

wordcount.py (topology):

```python
from streamparse import Grouping, Topology

# Assuming the standard streamparse project layout (spouts/ and bolts/ packages).
from spouts.pyspout import PySpout
from bolts.wordcount import WordCountBolt


class WordCount(Topology):
    new_spout = PySpout.spec()
    new_bolt = WordCountBolt.spec(inputs={new_spout: Grouping.fields('word')}, par=2)
```

After executing `sparse run`, I get no data, and after a while it throws a timeout:
```bash
5669 [refresh-active-timer] INFO  o.a.s.d.worker - All connections are ready for worker 29a4867e-ce62-4a92-bdae-32a8d28d4b60:1024 with id 85db4c94-fa26-41bc-9ffe-7fc2a15ff0f0
5686 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Preparing bolt __acker:(1)
5688 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
5693 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
5696 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Prepared bolt __acker:(1)
5700 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opening spout new_spout:(4)
5703 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5760 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(2)
5761 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5772 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(3)
5773 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5947 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Launched subprocess with pid 87117
5951 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87124
5952 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5953 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opened spout new_spout:(4)
5953 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(2)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Activating spout new_spout:(4)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Start checking heartbeat...
5964 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87132
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(3)

52956 [pool-51-thread-1] ERROR o.a.s.s.ShellSpout - Halting process: ShellSpout died. Command: [streamparse_run, -s json spouts.pyspout.PySpout], ProcessInfo pid:87117, name:new_spout exitCode:-1, errorString: 
java.lang.RuntimeException: subprocess heartbeat timeout
    at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
52960 [pool-51-thread-1] ERROR o.a.s.d.executor - 
java.lang.RuntimeException: subprocess heartbeat timeout
    at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]

Fatal error: local() encountered an error (return code 11) while executing 'storm jar /data/newqspace/lusonzeng/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local --no-splash --sleep 9223372036854775807 /tmp/tmpx6Q9ki.yaml'
```

amoyiki commented 6 years ago

You can use pykafka instead of kafka-python. spouts/words.py:

```python
from streamparse import Spout
from pykafka import KafkaClient


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts="c1:9092,c1_1:9092,c1_2:9092")
        topic = client.topics['test'.encode('utf-8')]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="c1:2181,c1_1:2181,c1_2:2181"
        )

    def next_tuple(self):
        message = self.balanced_consumer.consume()
        info = message.value.decode('utf-8')
        self.logger.info('==================={}'.format(info))
        self.emit([info])
```
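
(Note: `balanced_consumer.consume()` also blocks while the topic is idle, so it can run into the same heartbeat timeout; as far as I can tell, passing `consumer_timeout_ms` to `get_balanced_consumer()` makes `consume()` return `None` when nothing arrives in time, which `next_tuple()` would then need to check for.)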
lusonzeng commented 6 years ago

I have found that the exception is only thrown when the Kafka producer does not push data at regular intervals.

It works without the exception when the producer pushes data every second.
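
A non-blocking `next_tuple()` should avoid starving the ShellSpout heartbeat while the topic is idle. A minimal sketch using kafka-python's `poll()` (the 100 ms timeout is an arbitrary choice, not something from this thread):

```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'])

    def next_tuple(self):
        # poll() returns whatever records are available within timeout_ms and
        # then hands control back to Storm, so the multilang heartbeat can be
        # answered even when the producer pushes nothing for a while.
        records = self.consumer.poll(timeout_ms=100)
        for messages in records.values():
            for message in messages:
                self.emit([message.value])
```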

jhhnjhhn commented 5 years ago

> I have found that the exception is only thrown when the Kafka producer does not push data at regular intervals.
>
> It works without the exception when the producer pushes data every second.

How did you solve it?

tianser commented 5 years ago

> I have found that the exception is only thrown when the Kafka producer does not push data at regular intervals.
>
> It works without the exception when the producer pushes data every second.

How did you solve it?

tianser commented 5 years ago

@jhhnjhhn how did you solve it?

Abdelhadi92 commented 4 years ago

This code will fix the issue:

```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        # consumer_timeout_ms makes iteration raise StopIteration when no
        # record arrives within the window, so next_tuple() never blocks for long.
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'],
                                      consumer_timeout_ms=10)

    def next_tuple(self):
        try:
            message = self.consumer.next_v1()
            self.emit([message.value])
        except StopIteration:
            # No message available right now; return so Storm can call
            # next_tuple() again and the heartbeat keeps flowing.
            pass
```
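
With `consumer_timeout_ms` set, the consumer gives up quickly when no record arrives instead of blocking inside `next_tuple()`, which appears to be what was starving the ShellSpout heartbeat in the original spout.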