AirSage / Petrel

Tools for writing, submitting, debugging, and monitoring Storm topologies in pure Python
BSD 3-Clause "New" or "Revised" License

ack() is not happening with BasicBolt / Apache Storm 1.0.3? #88

Open dmitry-saritasa opened 7 years ago

dmitry-saritasa commented 7 years ago

Hi Barry,

Below is an example of the bolt on my side. When I run the topology, everything works normally and my messages are persisted to the database, but the Storm UI shows no acknowledged tuples. Zero. This is despite my bolt inheriting from BasicBolt, which is supposed to call ack() automatically.

Thoughts?

import pprint
import traceback

import config as cfg
import inflection
from db.bigtable import records
from db.bigtable.connections import connect
from petrel.emitter import BasicBolt

# enable loggers
log = cfg.config.loggers.storm

# establish bigtable connection
connect(*cfg.config.db_connection_params, admin=True)

class PersistenceBolt(BasicBolt):
    def __init__(self):
        self.producer = cfg.config.kafka_producer
        super().__init__(script=__file__)

    def declareOutputFields(self):
        """Final bolt should not define any output fields
        you should always return empty array for the final bolt
        """
        return []

    def process(self, tup):
        # record is supposed to be dict with data we need to persist
        topic, record = tup.values
        log.debug("{}:{}".format(topic, record))

        # if record is not dictionary - bypass the record
        if isinstance(record, dict):
            try:
                persist = getattr(
                    records,
                    inflection.camelize("{}_record".format(topic)))
                persist(**record).save()
            except Exception:
                problem = {
                    "topic": topic,
                    "record": record,
                    "error": traceback.format_exc()
                }
                # send to *_failed kafka topic so we can reprocess later
                self.producer.send("{}_failed".format(topic), problem)
                pp = pprint.PrettyPrinter(indent=4)
                log.debug(pp.pformat(problem))

def run():
    """Default function needed to run bolt as task
    inside Apache Storm Topology
    """
    PersistenceBolt().run()

barrywhart commented 7 years ago

The ack() happens here. You might try adding logging to that function to make sure it's being executed. Alternatively, temporarily switch to Bolt and call ack() explicitly to see whether that helps.
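
One way to try the logging suggestion without editing Petrel's source is to wrap the ack function in a small logging decorator. The following is only a minimal sketch: `with_ack_logging` and the logger name are hypothetical helpers, and the commented-out usage assumes that Petrel exposes a module-level `storm.ack(tup)` that `BasicBolt` looks up at call time, which should be checked against the installed version.

```python
import functools
import logging

# Hypothetical logger name. In a real bolt, make sure its handlers write to a
# file or stderr: Storm's multilang protocol uses stdout for its own messages.
log = logging.getLogger("storm.ack_debug")


def with_ack_logging(ack_fn, logger=log):
    """Wrap an ack-style callable so every call is logged before delegating."""
    @functools.wraps(ack_fn)
    def wrapper(tup):
        # getattr keeps this safe for stand-in objects that have no `id`.
        logger.info("ack() called for tuple id=%s", getattr(tup, "id", None))
        return ack_fn(tup)
    return wrapper


# Assumed usage in the bolt module, before PersistenceBolt().run() is called
# (assumption: petrel.storm.ack exists and is resolved by name at call time):
#
#   from petrel import storm
#   storm.ack = with_ack_logging(storm.ack)
```

If the log line never appears while tuples are clearly being processed, the automatic ack is not being reached. If it does appear, the ack is being sent and the missing counts in the Storm UI are more likely to have another cause.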