salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

[KAFKA] Add support to override consumer position and reset to head/tail #29

Open stanlemon opened 6 years ago

Crim commented 6 years ago

I'm wondering about the best way to do this, as our startup logic is already fairly complicated.

Here are the 4 scenarios that exist today:

  1. Consumer started with no explicit start position, and has no previous consumer state
    • starts from HEAD
  2. Consumer started with no explicit start position, and has previous consumer state
    • starts from previous state
  3. Consumer started with explicit start position, and has no previous consumer state
    • starts from explicit start position
  4. Consumer started with explicit start position, and has previous consumer state
    • starts from previous state

I believe conditions 3 and 4 only exist if a consumer is being started as a sideline.
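The four scenarios above reduce to a simple precedence rule: previous state wins, then the explicit start position, then HEAD. A minimal Java sketch of that rule follows; `StartPositionResolver`, `Source`, and `resolve` are hypothetical names used only for illustration and are not taken from the project.

```java
import java.util.Optional;

/** Hypothetical sketch of the four startup scenarios; not the project's actual code. */
final class StartPositionResolver {
    /** Where the consumer ends up starting from. */
    enum Source { PERSISTED_STATE, EXPLICIT_START, HEAD }

    /**
     * @param explicitStart  start offset handed to the consumer at startup, if any
     * @param persistedState offset restored from previous consumer state, if any
     */
    static Source resolve(Optional<Long> explicitStart, Optional<Long> persistedState) {
        if (persistedState.isPresent()) {
            return Source.PERSISTED_STATE; // scenarios 2 and 4: previous state always wins
        }
        if (explicitStart.isPresent()) {
            return Source.EXPLICIT_START;  // scenario 3
        }
        return Source.HEAD;                // scenario 1
    }
}
```

Written this way, a "reset" option would have to slot in ahead of the persisted-state check, which is where the firehose-versus-sideline question comes from.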

So this is difficult because today the Kafka Consumer has no idea if it's a "firehose" or a sideline instance. If we set a KafkaConsumerConfig option that said "reset to head", we need to somehow know to ONLY do it for the firehose, and not for sideline instances.

So it'd have to be something like: if the consumer has no starting state and flag = true, then reset the consumer position to TAIL/HEAD.

But this intrinsically couples the Kafka Consumer to sidelining. I wonder if we can somehow wrap this logic into the Sideline SpoutHandler, or something. Of course, that then tightly couples the Sideline SpoutHandler to the Kafka consumer.

Crim commented 6 years ago

Yeah, I guess the trick is to figure out how to only instruct the firehose. I think you'd need to use the virtual spout id.

What if we did the following:

Configuration key is something like:

consumer.reset.default: HEAD|TAIL
consumer.reset.<virtualSpoutId>: HEAD|TAIL

Today we default to HEAD for reset: if an offset is no good, we back up to HEAD. The default key makes this logic configurable.
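As a rough illustration of how the two keys could be read, here is a small Java sketch. It assumes the configuration arrives as a plain `Map<String, Object>`; the class `ResetConfig` and its methods are hypothetical, and only the key names come from the proposal above.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical reader for the proposed consumer.reset.* keys. */
final class ResetConfig {
    enum ResetPosition { HEAD, TAIL }

    static final String KEY_PREFIX = "consumer.reset.";

    /** Value of consumer.reset.default, falling back to HEAD (today's behavior) when unset. */
    static ResetPosition defaultReset(Map<String, Object> config) {
        return parse(config.get(KEY_PREFIX + "default")).orElse(ResetPosition.HEAD);
    }

    /** Value of consumer.reset.<virtualSpoutId>, or empty when no override is configured. */
    static Optional<ResetPosition> overrideFor(Map<String, Object> config, String virtualSpoutId) {
        return parse(config.get(KEY_PREFIX + virtualSpoutId));
    }

    // Accepts "head"/"tail" in any case; any other value throws IllegalArgumentException.
    private static Optional<ResetPosition> parse(Object raw) {
        if (raw == null) {
            return Optional.empty();
        }
        return Optional.of(ResetPosition.valueOf(raw.toString().trim().toUpperCase(Locale.ROOT)));
    }
}
```

One thing this sketch makes visible: a virtual spout whose id is literally "default" would collide with the default key, so the key layout may need a reserved name or a different prefix.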

The reset logic might look something like this:

if (has no persisted state) {
  if (consumer.reset.default == HEAD) {
    if (has startingState) => reset to starting state;
    if (has no startingState) => reset to HEAD of topic;
  } else if (consumer.reset.default == TAIL) {
    if (has endingState) => reset to ending state;
    if (has no endingState) => reset to TAIL of topic;
  }
} else if (has persisted state && consumer.reset.<virtualSpoutId> != null) {
  if (consumer.reset.<virtualSpoutId> == HEAD) {
    if (has startingState) => reset to starting state;
    if (has no startingState) => reset to HEAD of topic;
  } else if (consumer.reset.<virtualSpoutId> == TAIL) {
    if (has endingState) => reset to ending state;
    if (has no endingState) => reset to TAIL of topic;
  }
}
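Both branches of the pseudocode share the same inner HEAD/TAIL logic and differ only in which key supplies the mode. A Java sketch that folds them together might look like the following; `ResetDecision`, `Target`, and `decide` are hypothetical names, and the inputs are passed as plain booleans purely to keep the example self-contained.

```java
import java.util.Optional;

/** Hypothetical condensation of the reset pseudocode; not the project's actual code. */
final class ResetDecision {
    enum ResetPosition { HEAD, TAIL }
    enum Target { STARTING_STATE, HEAD_OF_TOPIC, ENDING_STATE, TAIL_OF_TOPIC }

    /**
     * @param defaultReset  value of consumer.reset.default
     * @param spoutOverride value of consumer.reset.<virtualSpoutId>, if configured
     * @return where to reset to, or empty to keep the persisted state untouched
     */
    static Optional<Target> decide(boolean hasPersistedState,
                                   ResetPosition defaultReset,
                                   Optional<ResetPosition> spoutOverride,
                                   boolean hasStartingState,
                                   boolean hasEndingState) {
        final ResetPosition mode;
        if (!hasPersistedState) {
            mode = defaultReset;          // first branch: no persisted state
        } else if (spoutOverride.isPresent()) {
            mode = spoutOverride.get();   // second branch: explicit per-spout override
        } else {
            return Optional.empty();      // persisted state and no override: no reset
        }
        if (mode == ResetPosition.HEAD) {
            return Optional.of(hasStartingState ? Target.STARTING_STATE : Target.HEAD_OF_TOPIC);
        }
        return Optional.of(hasEndingState ? Target.ENDING_STATE : Target.TAIL_OF_TOPIC);
    }
}
```

For example, `decide(false, ResetPosition.TAIL, Optional.empty(), true, true)` yields `ENDING_STATE`, which is the "default = TAIL skips every sideline" effect: a sideline with no persisted state jumps straight to its ending state.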

(Sweetjesus) that's complicated. It also means that if you set default = TAIL, then it'll skip every sideline.

stanlemon commented 6 years ago

@Crim Consider this...

So I think this actually could be relatively straightforward and Kafka-specific. I really think we can add this reset position config with two options, check them in the Kafka consumer's open() call, do a seek and replace state accordingly, and then let the rest of the spout work as intended, with this one caveat here https://github.com/salesforce/storm-dynamic-spout/blob/master/src/main/java/com/salesforce/storm/spout/dynamic/kafka/Consumer.java#L234-L236 where we think about what the 'head' of the consumer is a little differently.
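To make the "check in open(), seek, replace state" idea a bit more concrete, here is a hedged Java sketch. It does not use the real Kafka client: `PartitionLog` is a hypothetical stand-in for it (with the actual `KafkaConsumer`, the seek would presumably go through `seekToBeginning` or `seekToEnd`), and `OpenResetSketch` and `applyReset` are illustrative names. It also assumes that HEAD means the earliest available offset and TAIL the latest.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of applying a reset option during open(); not the project's Consumer. */
final class OpenResetSketch {
    enum ResetPosition { HEAD, TAIL }

    /** Stand-in for the underlying Kafka client (assumption, for illustration only). */
    interface PartitionLog {
        long earliestOffset(int partition);
        long latestOffset(int partition);
    }

    /**
     * Returns the consumer state to continue with, keyed by partition.
     * With no reset configured (null) the existing state is returned unchanged;
     * otherwise every tracked partition is moved to the requested end of the log.
     */
    static Map<Integer, Long> applyReset(Map<Integer, Long> state,
                                         ResetPosition reset,
                                         PartitionLog log) {
        if (reset == null) {
            return state;
        }
        Map<Integer, Long> replaced = new HashMap<>();
        for (Integer partition : state.keySet()) {
            long offset = (reset == ResetPosition.HEAD)
                ? log.earliestOffset(partition)
                : log.latestOffset(partition);
            replaced.put(partition, offset);
        }
        return replaced;
    }
}
```

Because the replacement happens before anything else reads the state, the rest of the spout would see an ordinary starting position and need no knowledge of the reset option.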

If I get time today I may try to carve out what I'm talking about so you can look at it more concretely.