salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

Add support for resume state in sidelining #79

Closed stanlemon closed 6 years ago

stanlemon commented 6 years ago

Summary

This PR adds a new state to the sideline sequence which we are calling resume.

Current States

Today sidelines are binary: they can be started or stopped. Starting a sideline means that a starting offset is captured for a future virtual spout and a filter is applied to the firehose virtual spout. This is typically done around the concept of a 'tenant'; for example, you might want to sideline account Foobar for a period of time. Stopping a sideline means that an ending offset is captured for a virtual spout that will now be created using the starting offset saved during the start of the sideline. That new virtual spout is immediately added to the coordinator with a filter that only emits the tenant in question, and the filter is removed altogether from the firehose.

Potential Problems

The consequences of the binary approach are that ordering gets wildly thrown off and, when a sideline stops, we are essentially doubling a tenant's emitted messages. This can be controlled with a different MessageBuffer, but the underlying problem remains: if a tenant's operations required sidelining, we go from 0-60 (or 65) almost immediately without slowing down the emission of data into the main spout.

New States

Moving forward, the goal of this PR is to split the current stopping state into two different states:

  1. Resume: This state will create a virtual spout using the starting offset from the starting state, and it will use a filter that only emits the given tenant. What's different here is that there is no ending state for the new virtual spout and the firehose still has its tenant filter.
  2. Resolve: This state will mark the ending offset of the virtual spout for the sideline and remove the filter from the firehose. When the resolving state is complete (fully resolved) the virtual spout for the sideline should be gone and there should be no filters on the filter chain for the tenant in question.
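The start, resume, and resolve sequence described above can be pictured as a small state machine. The following is only a sketch under stated assumptions: the class `SidelineLifecycleSketch`, its `State` enum, and every method and field name are hypothetical and are not taken from the project's code. It models just the bookkeeping (offsets, whether the firehose filter is applied, whether the sideline virtual spout exists), not any real Storm or Kafka interaction.

```java
/** Hypothetical model of the proposed sideline lifecycle; not the project's real classes. */
class SidelineLifecycleSketch {

    enum State { NOT_STARTED, STARTED, RESUMED, RESOLVED }

    private final String tenant;
    private State state = State.NOT_STARTED;
    private Long startingOffset;          // captured by start()
    private Long endingOffset;            // captured by resolve(), null until then
    private boolean firehoseFiltered;     // true while the firehose drops this tenant
    private boolean sidelineSpoutRunning; // true once the sideline virtual spout exists

    SidelineLifecycleSketch(String tenant) {
        this.tenant = tenant;
    }

    /** Start: capture the starting offset and filter the tenant out of the firehose. */
    void start(long firehoseOffset) {
        requireState(State.NOT_STARTED);
        startingOffset = firehoseOffset;
        firehoseFiltered = true;
        state = State.STARTED;
    }

    /** Resume: create the sideline virtual spout; the firehose filter stays in place. */
    void resume() {
        requireState(State.STARTED);
        sidelineSpoutRunning = true; // begins at startingOffset, no ending offset yet
        state = State.RESUMED;
    }

    /** Resolve: capture the ending offset and remove the firehose filter. */
    void resolve(long firehoseOffset) {
        requireState(State.RESUMED);
        endingOffset = firehoseOffset;
        firehoseFiltered = false;
        state = State.RESOLVED; // the sideline spout drains up to endingOffset, then goes away
    }

    private void requireState(State expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Sideline for " + tenant + " is " + state + ", expected " + expected);
        }
    }

    State getState() { return state; }
    Long getStartingOffset() { return startingOffset; }
    Long getEndingOffset() { return endingOffset; }
    boolean isFirehoseFiltered() { return firehoseFiltered; }
    boolean isSidelineSpoutRunning() { return sidelineSpoutRunning; }

    public static void main(String[] args) {
        SidelineLifecycleSketch sideline = new SidelineLifecycleSketch("Foobar");
        sideline.start(100L);
        sideline.resume();
        System.out.println(sideline.getState() + " filtered=" + sideline.isFirehoseFiltered());
        sideline.resolve(250L);
        System.out.println(sideline.getState() + " filtered=" + sideline.isFirehoseFiltered());
    }
}
```

The point of the sketch is the middle state: between `resume()` and `resolve()` the sideline spout is running while the firehose is still filtered, so the tenant's messages reach the topology only through the sideline.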

Why does this matter?

The primary use case we want to support here is throttling of traffic for a single tenant without having to know the ending point. By using this new 'resume' state and a MessageBuffer that emits at a slower rate for sidelines, someone could slow the emission of a tenant into the underlying DynamicSpout, and then, at the point at which they are ready to resume full processing, they can set that sideline to 'resolve'.
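To make the throttling idea concrete, here is a minimal sketch of a buffer that hands out sideline messages less often than firehose messages. It does not implement the project's MessageBuffer interface; the class `ThrottledBufferSketch`, its methods, and the ratio parameter are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical ratio-based buffer; not the project's MessageBuffer API. */
class ThrottledBufferSketch {

    private final Queue<String> firehose = new ArrayDeque<>();
    private final Queue<String> sideline = new ArrayDeque<>();
    private final int firehosePollsPerSidelinePoll;
    private int firehosePollsSinceSideline = 0;

    ThrottledBufferSketch(int firehosePollsPerSidelinePoll) {
        if (firehosePollsPerSidelinePoll < 1) {
            throw new IllegalArgumentException("ratio must be at least 1");
        }
        this.firehosePollsPerSidelinePoll = firehosePollsPerSidelinePoll;
    }

    void putFirehose(String message) { firehose.add(message); }

    void putSideline(String message) { sideline.add(message); }

    /** Returns the next message to emit, or null when both queues are empty. */
    String poll() {
        boolean sidelineTurn = firehosePollsSinceSideline >= firehosePollsPerSidelinePoll;
        // The sideline gets a turn after enough firehose emits, or when the firehose is idle.
        if (sidelineTurn || firehose.isEmpty()) {
            String message = sideline.poll();
            if (message != null) {
                firehosePollsSinceSideline = 0;
                return message;
            }
        }
        String message = firehose.poll();
        if (message != null) {
            firehosePollsSinceSideline++;
        }
        return message;
    }

    public static void main(String[] args) {
        ThrottledBufferSketch buffer = new ThrottledBufferSketch(2);
        for (int i = 1; i <= 4; i++) buffer.putFirehose("f" + i);
        for (int i = 1; i <= 2; i++) buffer.putSideline("s" + i);
        String message;
        while ((message = buffer.poll()) != null) {
            System.out.println(message);
        }
    }
}
```

With a ratio of 2, the sideline gets one message out for every two firehose messages while the firehose has data. In this sketch the sideline drains freely when the firehose is idle; a stricter throttle would be a different design choice.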

It's worth noting here that by calling resume() and resolve() back to back you should have the functional equivalent of today's stop() and that nothing is changing with the current start() process.
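The equivalence noted above can be written down as a simple composition. This is an illustrative sketch only: the interface `SidelineControlsSketch` and its method signatures are hypothetical, chosen to mirror the names used in this description rather than the project's actual handler API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface mirroring the operations named in this PR description. */
interface SidelineControlsSketch {

    void resume(String tenant);

    void resolve(String tenant);

    /** Today's stop() expressed as resume() followed immediately by resolve(). */
    default void stop(String tenant) {
        resume(tenant);
        resolve(tenant);
    }
}

/** Records calls so the ordering can be inspected; for illustration only. */
class RecordingSidelineControls implements SidelineControlsSketch {

    final List<String> calls = new ArrayList<>();

    @Override
    public void resume(String tenant) {
        calls.add("resume:" + tenant);
    }

    @Override
    public void resolve(String tenant) {
        calls.add("resolve:" + tenant);
    }

    public static void main(String[] args) {
        RecordingSidelineControls controls = new RecordingSidelineControls();
        controls.stop("Foobar");
        System.out.println(controls.calls);
    }
}
```

Expressing stop() this way keeps a single code path for ending a sideline, so callers that do not need throttling see no change in behavior.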

In the Future

Related to this, my suspicion is that this implementation will also give us better mechanics to control the timing of resolving the sideline, so that some day in the future we can try to resolve the sideline once it's near the tail of the firehose (probably using some tolerance level). That's not the purpose of this PR, just an additional thought on why this change makes sense.

Crim commented 6 years ago

I know you have a doc somewhere, but it may be helpful to update the PR description with clear definitions of all the states and what they mean.

stanlemon commented 6 years ago

This PR will need a README overhaul; I'd like to work on that in a separate PR.

stanlemon commented 6 years ago

I want to add the validation mentioned by StevieP, but I'd like to do it in a separate PR as part of the work on #75, as I think it'll be easier to address when that interface allows saving partitions together.

Crim commented 6 years ago

let er rip.