AirSage / Petrel

Tools for writing, submitting, debugging, and monitoring Storm topologies in pure Python
BSD 3-Clause "New" or "Revised" License

directGrouping question #86

Closed dmitry-saritasa closed 7 years ago

dmitry-saritasa commented 7 years ago

hi Barry,

create.py

import engagement
import kafka_stream
import viewability

def create(builder):
    builder.setSpout("kafka-stream-spout", kafka_stream.KafkaSpout(), 1)
    builder.setBolt(
        "viewability-persistence-bolt",
        viewability.ViewabilityPersistenceBolt(), 1).\
        directGrouping("kafka-stream-spout")
    builder.setBolt(
        "engagement-persistence-bolt",
        engagement.EngagementPersistenceBolt(), 1).\
        directGrouping("kafka-stream-spout")

KafkaSpout:


# ---------------------------------------------------
# kafka stream spout using kafka-stream library
# ---------------------------------------------------
import time

import config as cfg
from petrel import storm
from petrel.emitter import Spout

# enable loggers
log = cfg.config.loggers.storm

class KafkaSpout(Spout):

    def __init__(self):
        self.stream = cfg.config.kafka
        super().__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['topic', 'record']

    def nextTuple(self):
        try:
            record = next(self.stream)
            log.debug("{}:{}:{}: key={} value={}".format(
                record.topic,
                record.partition,
                record.offset,
                record.key,
                record.value))
            # storm.emit([record.topic, record.value])
            storm.emitDirect("viewability-persistence-bolt",
                             [record.topic, record.value])
        except StopIteration:
            pass

def run():
    KafkaSpout().run()

I'm getting the following exception in petrelXXXXX_kafka_stream.log

[2017-03-26 15:20:43,900][storm][DEBUG]engagement:2:1: key=None value={'user': 100, 'mobile': True, 'game': 1, 'asset': 10, 'action': 'coupon'}
[2017-03-26 15:20:44,326][storm][ERROR]Sent failure message ("E_SPOUTFAILED__kafka_stream__dmitry_npb_saritasa_io__pid__24285__port__-1__taskindex__-1__StormIPCException") to Storm
[2017-03-26 15:20:49,331][storm][ERROR]Caught exception in Spout.run: Read EOF from stdin
Traceback (most recent call last):
  File "/home/dmitry/Projects/trivver/storm/src/.venv/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 438, in run
    msg = readCommand()
  File "/home/dmitry/Projects/trivver/storm/src/.venv/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 70, in readCommand
    msg = readMsg()
  File "/home/dmitry/Projects/trivver/storm/src/.venv/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 39, in readMsg
    raise StormIPCException('Read EOF from stdin')
petrel.storm.StormIPCException: Read EOF from stdin
Worker kafka_stream exiting normally.

So I'm receiving the Kafka message, but emitDirect doesn't seem to work. Or am I doing something wrong?

thanks, Dmitry

barrywhart commented 7 years ago

I have never used direct grouping, so I'm afraid I can't help with your question. I will say that the core of Petrel (i.e. the communication with Storm) is closely based on the standard storm.py file (https://github.com/apache/storm/blob/master/storm-multilang/python/src/main/resources/resources/storm.py#L94), so there's a good chance you'd see similar behavior using that. You may therefore have better luck checking with the Apache Storm community, or you may have to dig deeper into this yourself if direct grouping is a requirement for your topology.
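
For reference, in the standard storm.py the first argument to emitDirect is the target task, and the Storm multilang protocol documents the "task" field of an emit message as a numeric task id, with the handshake context carrying a "task->component" map. The spout above passes a component name there, which may be worth checking. Below is a minimal, self-contained sketch of that message shape. It does not use Petrel; the helper names (tasks_for_component, direct_emit_message) and the sample context values are hypothetical and only illustrate the idea.

```python
import json


def tasks_for_component(context, component):
    """Return the integer task ids that the handshake context maps to `component`.

    `context` is assumed to look like the "context" object of the multilang
    handshake, whose "task->component" keys are task ids encoded as strings.
    """
    mapping = context.get("task->component", {})
    return sorted(int(task) for task, name in mapping.items() if name == component)


def direct_emit_message(task_id, values, stream=None):
    """Build the JSON body of a direct emit: an "emit" command with a "task" id."""
    msg = {"command": "emit", "task": task_id, "tuple": values}
    if stream is not None:
        msg["stream"] = stream
    return json.dumps(msg)


# Hypothetical context; real task ids are assigned by Storm at runtime.
sample_context = {
    "task->component": {
        "1": "kafka-stream-spout",
        "2": "__acker",
        "3": "viewability-persistence-bolt",
        "4": "engagement-persistence-bolt",
    }
}

targets = tasks_for_component(sample_context, "viewability-persistence-bolt")
message = direct_emit_message(targets[0], ["engagement", {"user": 100}])
```

In a real spout, the context would come from whatever the component receives at initialization, and the resolved task id (not the component name) would be what gets passed as the direct-emit target. Whether this resolves the exception above is not confirmed here.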

barrywhart commented 7 years ago

The error you're getting ("Read EOF from stdin") suggests that the Java Storm worker the Petrel process communicates with has exited. You should definitely check the Storm (not Petrel) logs to see if there is any other information. The workers shouldn't just exit without a reason and without saying why.
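
To illustrate why a dead parent process shows up as this particular error, here is a rough sketch of how a multilang reader in the style of storm.py consumes messages: it reads lines until a line containing only "end", and an empty read means the other side has closed the pipe. The names read_msg and IPCError are hypothetical stand-ins (the latter for petrel.storm.StormIPCException), and an in-memory stream replaces stdin so the snippet runs on its own.

```python
import io
import json


class IPCError(Exception):
    """Hypothetical stand-in for petrel.storm.StormIPCException."""


def read_msg(stream):
    """Read one JSON message terminated by a line containing only "end"."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            # readline() returns '' only at EOF, i.e. the writer closed the pipe.
            raise IPCError("Read EOF from stdin")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# A complete message is parsed normally.
ok = read_msg(io.StringIO('{"command": "next"}\nend\n'))

# A stream that ends before "end" behaves like a worker that went away.
try:
    read_msg(io.StringIO('{"command": "next"}\n'))
    eof_seen = False
except IPCError:
    eof_seen = True
```

Seen this way, the traceback is a symptom on the Python side rather than the cause, which is why the Storm worker logs are the place to look.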

barrywhart commented 7 years ago

Closing since I made some suggestions and haven't heard back. Please reopen or create a new issue if needed.

Note that I am not a Storm expert and usually can't help with general Storm questions; you may need to engage with the larger Storm community or learn more about Storm on your own. If you solve a tricky problem, please report back, and I am open to PRs that address issues or add important features.