pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0
1.5k stars 218 forks source link

how to send optional parameters to bolts or spouts? #385

Closed aminalaee closed 7 years ago

aminalaee commented 7 years ago

Hi, I have a spout like

class KafkaSpout(Spout):

    """
    Read data from kafka brokers
    """

    outputs = ['data']

    def initialize(self, stormconf, context):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost',
            'group.id': 'example',
            'default.topic.config': {'auto.offset.reset': 'smallest'}
        })
        self.consumer.subscribe([''])

    def next_tuple(self):
        data = self.consumer.poll()
        if data:
            self.emit([data.value()])

and I want to use this spout in multiple topologies with just a minor change in Kafka config, so I don't want to copy/paste the whole class again, is there a way to send a parameter to bolts and spouts when initializing? something added to stormconf or context in initialize method? Thanks,

Darkless012 commented 7 years ago

We are using this while configuring different spouts. Here is the example:

rabbitmq_multi_spout = RabbitMQMultiSpout.spec(
        name='rabbitmq_multi_spout',
        par=1,
        config={
            "stats.configfile": "etc/default-topology.yml",
            "stats.group":          "stats",
            "rabbitmq.configfile":  "etc/default-topology.yml",
            "rabbitmq.group":       "spout",
        }
    )

This can be then found in initialize(self, stormconf, context): method Like:

x = storm_conf.get('stats', None)
# { "configfile": "etc/default-topology.yml",
#   "group":      "stats" }

I would recommend printing stormconf to see what you got there.

aminalaee commented 7 years ago

Thanks man :+1: I'd guessed it would be something like this but hadn't really tried it. I guess it'd be a good idea to add this in the doc for future visitors.

Darkless012 commented 7 years ago

On behalf of issues like this. Do you think it would be possible to create Gitter channel for streamparse? Where we can exchange things like this? I have very nice experience with Gitter while using "Aurelia" framework. :) cc @amontalenti @dan-blanchard