salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

[Question] Does it allow to solve for priority polling across multiple kafka spouts as such #105

Closed s-aravind-flipkart closed 6 years ago

Crim commented 6 years ago

Hey!

If I understand your question correctly, you want to define a priority in which messages are consumed from multiple kafka spouts? I believe that this can be solved by a custom implementation of the MessageBuffer interface.

We have some documentation here of our existing implementations: https://github.com/salesforce/storm-dynamic-spout#messagebuffer-implementations

Essentially this interface defines how messages that have been consumed from the individual virtual kafka spout instances will be sent out into the storm topology. It should allow you to define a priority system.
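
To make the idea concrete, here is a minimal, self-contained sketch of a strict-priority buffer. It is only an illustration: `PriorityBufferSketch`, `addSource`, `put` and `poll` are hypothetical names and do not reproduce the library's actual `MessageBuffer` interface, whose exact methods should be checked in the README linked above. Messages are modelled as plain strings, and priority is assumed to be the order in which sources are registered.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/**
 * Hypothetical strict-priority buffer. NOT the library's MessageBuffer interface;
 * all names here are assumptions made for illustration.
 */
class PriorityBufferSketch {
    // LinkedHashMap keeps registration order, which this sketch treats as priority
    // (first registered source = highest priority).
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    /** Registers a source; earlier registrations are polled first. */
    void addSource(String sourceId) {
        queues.putIfAbsent(sourceId, new ArrayDeque<>());
    }

    /** Buffers a message that was consumed from the given source. */
    void put(String sourceId, String message) {
        Queue<String> queue = queues.get(sourceId);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown source: " + sourceId);
        }
        queue.add(message);
    }

    /** Returns the next message from the highest-priority non-empty source, or null. */
    String poll() {
        for (Queue<String> queue : queues.values()) {
            String message = queue.poll();
            if (message != null) {
                return message;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PriorityBufferSketch buffer = new PriorityBufferSketch();
        buffer.addSource("high");
        buffer.addSource("low");
        buffer.put("low", "l1");
        buffer.put("high", "h1");
        buffer.put("low", "l2");
        System.out.println(buffer.poll()); // h1
        System.out.println(buffer.poll()); // l1
        System.out.println(buffer.poll()); // l2
        System.out.println(buffer.poll()); // null
    }
}
```

Note that a strict ordering like this only emits from a lower-priority source when every higher-priority queue is empty, so a busy high-priority source can starve the others. A weighted or round-robin variant would trade strictness for fairness.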

If you have any other questions please post up! Happy to help!

s-aravind-flipkart commented 6 years ago

@Crim Thanks for the quick response, I really appreciate it.

I have a few other questions. First, regarding the Kafka consumer: will the consumers of starved spouts rebalance if a higher-priority spout keeps them from being read for a long time?

Also, it would be really helpful if the README showed how the spout can be used from Storm's topology builder, with a few simple examples. Currently I'm reading the source code to figure this out.

A few examples of Storm topology code would be super helpful for quickly trying it out.

stanlemon commented 6 years ago

@s-aravind-flipkart I am overdue on updating the README with a clearer example of using this library, apologies for that. You might find this helpful though: https://github.com/stanlemon/sideline-example. It's an end-to-end example using the DynamicSpout and the Sideline implementation. Please feel free to keep posting questions here too!

s-aravind-flipkart commented 6 years ago

@stanlemon Thanks, will definitely take a look.

s-aravind-flipkart commented 6 years ago

@stanlemon / @Crim Hi, I was using the dynamic spout and found that, to add a virtual spout, the spout must already be open. Could you please advise on this? I need to read from 3 topics, with priority-based reads across them.

From the test utils, I could put together the following:

        DynamicSpout spout = new DynamicSpout(config);
        final VirtualSpoutIdentifier virtualSpoutIdentifier = new DefaultVirtualSpoutIdentifier("Main");
        final FactoryManager factoryManager = new FactoryManager(config);
        final VirtualSpout virtualSpout =  new VirtualSpout(
                virtualSpoutIdentifier,
                config,
                new ConsumerPeerContext(1, 0),
                factoryManager,
                new LogRecorder(),
                null,
                null
        );
        spout.addVirtualSpout(virtualSpout);

Could you please help me with using multiple spouts in the builder? Thanks.

stanlemon commented 6 years ago

@s-aravind-flipkart It should look something like this (I didn't compile this, so please keep that in mind).

final DynamicSpout spout = new DynamicSpout(config);
final FactoryManager factoryManager = new FactoryManager(config);

// Configure first kafka topic
final String kafkaTopic1 = "one";
// Values are mixed types (the topic is a String), so the map is keyed to Object
final Map<String, Object> config1 = Maps.newHashMap();
config1.putAll(config);
config1.put(KafkaConsumerConfig.KAFKA_TOPIC, kafkaTopic1);

final VirtualSpout virtualSpout1 =  new VirtualSpout(
    new DefaultVirtualSpoutIdentifier(kafkaTopic1),
    config1,
    new ConsumerPeerContext(1, 0),
    factoryManager,
    new LogRecorder(),
    null,
    null
);
spout.addVirtualSpout(virtualSpout1);

// Configure second kafka topic
final String kafkaTopic2 = "two";
final Map<String, Object> config2 = Maps.newHashMap();
config2.putAll(config);
config2.put(KafkaConsumerConfig.KAFKA_TOPIC, kafkaTopic2);

final VirtualSpout virtualSpout2 =  new VirtualSpout(
    new DefaultVirtualSpoutIdentifier(kafkaTopic2),
    config2,
    new ConsumerPeerContext(1, 0),
    factoryManager,
    new LogRecorder(),
    null,
    null
);
spout.addVirtualSpout(virtualSpout2);

// Configure third kafka topic
final String kafkaTopic3 = "three";
final Map<String, Object> config3 = Maps.newHashMap();
config3.putAll(config);
config3.put(KafkaConsumerConfig.KAFKA_TOPIC, kafkaTopic3);

final VirtualSpout virtualSpout3 =  new VirtualSpout(
    new DefaultVirtualSpoutIdentifier(kafkaTopic3),
    config3,
    new ConsumerPeerContext(1, 0),
    factoryManager,
    new LogRecorder(),
    null,
    null
);
spout.addVirtualSpout(virtualSpout3);

s-aravind-flipkart commented 6 years ago

@stanlemon Thanks!! But as stated earlier, addVirtualSpout only works if the spout is already open. Trying this throws SpoutNotOpenedException.

stanlemon commented 6 years ago

Ah! Sorry @s-aravind-flipkart, the part that's missing here is that a VirtualSpout would normally be added through a SpoutHandler. The SpoutHandler has a method called onSpoutOpen() which allows you to add VirtualSpout instances on the specific Storm node once the spout has been opened there. Check out this example in sidelining: https://github.com/salesforce/storm-dynamic-spout/blob/master/src/main/java/com/salesforce/storm/spout/sideline/handler/SidelineSpoutHandler.java#L177
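
The lifecycle behind this can be sketched with stand-in types. Everything below is hypothetical: `SketchSpout`, `SketchHandler` and `ThreeTopicHandler` are simplified stand-ins, not the library's `DynamicSpout` or `SpoutHandler`, and the real method signatures should be taken from the linked `SidelineSpoutHandler` source. The sketch only shows why adding a virtual spout before open fails, and why doing it from an open callback works. The topic names are the placeholders used earlier in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a handler with an "on open" callback. */
interface SketchHandler {
    void onSpoutOpen(SketchSpout spout);
}

/** Hypothetical stand-in for a spout that rejects virtual spouts until it is opened. */
class SketchSpout {
    private final List<String> virtualSpoutIds = new ArrayList<>();
    private final SketchHandler handler;
    private boolean opened = false;

    SketchSpout(SketchHandler handler) {
        this.handler = handler;
    }

    /** Mirrors the behaviour reported above: adding before open is an error. */
    void addVirtualSpout(String id) {
        if (!opened) {
            throw new IllegalStateException("Spout has not been opened yet");
        }
        virtualSpoutIds.add(id);
    }

    /** In a real topology the framework calls open on the worker, not user code. */
    void open() {
        opened = true;
        handler.onSpoutOpen(this); // the handler can now add virtual spouts safely
    }

    List<String> getVirtualSpoutIds() {
        return virtualSpoutIds;
    }
}

/** Adds one virtual spout per topic once the spout is open. */
class ThreeTopicHandler implements SketchHandler {
    @Override
    public void onSpoutOpen(SketchSpout spout) {
        for (String topic : Arrays.asList("one", "two", "three")) {
            spout.addVirtualSpout(topic);
        }
    }
}
```

With these stand-ins, calling `addVirtualSpout` on a freshly constructed `SketchSpout` throws, while calling `open()` leaves it with the three virtual spouts registered by the handler.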

stanlemon commented 6 years ago

I'm assuming this is resolved and closing accordingly. Please feel free to reopen if needed.