salesforce / storm-dynamic-spout

A framework for building spouts for Apache Storm and a Kafka based spout for dynamically skipping messages to be processed later.
BSD 3-Clause "New" or "Revised" License

MessageBus / Coordinator Abstraction #53

Closed Crim closed 6 years ago

Crim commented 6 years ago

I'll do my best to explain this PR.

It took me a while to mentally map out what is/was wrong with the SpoutCoordinator, SpoutMonitor, SpoutRunner mess. I believe that SpoutCoordinator was doing double duty. It was (poorly) acting as a message routing service between DynamicSpout and VirtualSpouts, AND handling spinning up VirtualSpouts (via SpoutMonitor). So let's first split those responsibilities up.

MessageBus Abstraction

Justification

Let's examine how SpoutCoordinator was providing a message routing service. From DynamicSpout's point of view it provided a pretty good abstraction, exposing just what we needed via its nextTuple(), ack(), fail(), and getErrors() methods. This part was good. On the other side, SpoutCoordinator talked to VirtualSpouts (via SpoutRunner). This side of the abstraction was 100% garbage; in fact, there was no abstraction. We exposed the concurrent maps, queues, etc. and passed them from SpoutCoordinator to SpoutMonitor to SpoutRunner. Not great.

So, first step: let's remove the message routing bits from SpoutCoordinator entirely and create a new class called MessageBus.

MessageBus

MessageBus contains all of the concurrent maps, queues, etc. that SpoutCoordinator used to hold:

    private final MessageBuffer messageBuffer;
    private final Map<VirtualSpoutIdentifier, Queue<MessageId>> ackedTuplesQueue = new ConcurrentHashMap<>();
    private final Map<VirtualSpoutIdentifier, Queue<MessageId>> failedTuplesQueue = new ConcurrentHashMap<>();
    private final Queue<Throwable> reportedErrorsQueue = new ConcurrentLinkedQueue<>();

These are the objects we want to provide abstractions for; no one besides MessageBus itself should know how they are implemented. And let's take it a step further: the parts that we want DynamicSpout to be able to access are different from those we want to expose to VirtualSpout. So let's introduce two new interfaces, SpoutMessageBus and VirtualSpoutMessageBus, that sit on top of this new MessageBus class.

SpoutMessageBus Interface

Now in DynamicSpout instead of talking to SpoutCoordinator, we want to talk to MessageBus. But we don't get full access to the class. Instead we expose it via the SpoutMessageBus interface, which limits the scope of what we can do to the following:

    /**
     * @return Returns any errors that should be reported up to the topology.
     */
    Optional<Throwable> getErrors();

    /**
     * @return Returns the next available Message to be emitted into the topology.
     */
    Optional<Message> nextMessage();

    /**
     * Acks a tuple on the spout that it belongs to.
     * @param id Tuple message id to ack
     */
    void ack(final MessageId id);

    /**
     * Fails a tuple on the spout that it belongs to.
     * @param id Tuple message id to fail
     */
    void fail(final MessageId id);
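
As a rough illustration of the spout side, here is a minimal sketch of how a consumer might use only these four methods. Everything other than the SpoutMessageBus method signatures is an assumption made for the example: the Message and MessageId records are simplified stand-ins for the project's types (and need Java 16+), and SpoutSideSketch and FakeSpoutMessageBus are hypothetical names, not classes from this PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified stand-ins for the project's MessageId and Message types.
record MessageId(String srcVirtualSpoutId, long offset) {}
record Message(MessageId id, String payload) {}

// The four methods quoted above.
interface SpoutMessageBus {
    Optional<Throwable> getErrors();
    Optional<Message> nextMessage();
    void ack(MessageId id);
    void fail(MessageId id);
}

// Hypothetical spout-side consumer; only the call pattern matters here.
final class SpoutSideSketch {
    private final SpoutMessageBus bus;
    final List<String> emitted = new ArrayList<>();
    final List<String> reported = new ArrayList<>();

    SpoutSideSketch(SpoutMessageBus bus) {
        this.bus = bus;
    }

    // Assumed flow for one nextTuple() call: surface at most one error, emit at most one message.
    void nextTuple() {
        bus.getErrors().ifPresent(t -> reported.add(t.getMessage()));
        bus.nextMessage().ifPresent(m -> emitted.add(m.payload()));
    }

    void ack(MessageId id) {
        bus.ack(id);
    }

    void fail(MessageId id) {
        bus.fail(id);
    }
}

// Single-threaded in-memory fake, used only to exercise the sketch.
final class FakeSpoutMessageBus implements SpoutMessageBus {
    final Queue<Message> messages = new ArrayDeque<>();
    final Queue<Throwable> errors = new ArrayDeque<>();
    final List<MessageId> acked = new ArrayList<>();
    final List<MessageId> failed = new ArrayList<>();

    @Override
    public Optional<Throwable> getErrors() {
        return Optional.ofNullable(errors.poll());
    }

    @Override
    public Optional<Message> nextMessage() {
        return Optional.ofNullable(messages.poll());
    }

    @Override
    public void ack(MessageId id) {
        acked.add(id);
    }

    @Override
    public void fail(MessageId id) {
        failed.add(id);
    }
}
```

The point of the sketch is that the spout side never sees a queue or a map; it can be tested against any object that implements the interface.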

VirtualSpoutMessageBus Interface

Now in SpoutRunner we'll no longer interact with the various concurrent maps, queues, buffers, etc. Instead we'll interact with MessageBus, specifically via the VirtualSpoutMessageBus interface, which looks like:

    /**
     * Registers a new VirtualSpout.
     * @param virtualSpoutIdentifier Identifier of the VirtualSpout to register.
     */
    void registerVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier);

    /**
     * Publishes a message.
     * @param message Message to publish.
     * @throws InterruptedException If the calling thread is interrupted while publishing.
     */
    void publishMessage(final Message message) throws InterruptedException;

    /**
     * @return How many unread messages exist.
     */
    int messageSize();

    /**
     * Publishes an error.
     * @param throwable Error to publish.
     */
    void publishError(final Throwable throwable);

    /**
     * Gets the next acked MessageId for the given VirtualSpout.
     * @param virtualSpoutIdentifier Identifier of the VirtualSpout to check.
     * @return The next acked MessageId, if one is available.
     */
    Optional<MessageId> getAckedMessage(final VirtualSpoutIdentifier virtualSpoutIdentifier);

    /**
     * Gets the next failed MessageId for the given VirtualSpout.
     * @param virtualSpoutIdentifier Identifier of the VirtualSpout to check.
     * @return The next failed MessageId, if one is available.
     */
    Optional<MessageId> getFailedMessage(final VirtualSpoutIdentifier virtualSpoutIdentifier);

    /**
     * Un-registers a VirtualSpout.
     * @param virtualSpoutIdentifier Identifier of the VirtualSpout to un-register.
     */
    void unregisterVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier);
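
To make the VirtualSpout side concrete, the sketch below shows one possible pass of a runner loop written only against this interface: publish what the VirtualSpout produced, then drain pending acks and fails. It is an illustration under assumptions, not the PR's SpoutRunner: RunnerSketch and InMemoryBus are hypothetical names, the records are simplified stand-ins for the project's types (Java 16+), and routing runtime failures to publishError is a guess at how that method might be used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-ins for the project's types.
record VirtualSpoutIdentifier(String name) {}
record MessageId(VirtualSpoutIdentifier srcVirtualSpoutId, long offset) {}
record Message(MessageId id, String payload) {}

// The interface quoted above.
interface VirtualSpoutMessageBus {
    void registerVirtualSpout(VirtualSpoutIdentifier id);
    void publishMessage(Message message) throws InterruptedException;
    int messageSize();
    void publishError(Throwable throwable);
    Optional<MessageId> getAckedMessage(VirtualSpoutIdentifier id);
    Optional<MessageId> getFailedMessage(VirtualSpoutIdentifier id);
    void unregisterVirtualSpout(VirtualSpoutIdentifier id);
}

// Hypothetical runner; the lists record what would be handed back to the VirtualSpout.
final class RunnerSketch {
    final List<MessageId> ackedOnSpout = new ArrayList<>();
    final List<MessageId> failedOnSpout = new ArrayList<>();

    // One pass: publish produced messages, then drain acks and fails for this VirtualSpout.
    void runOnce(VirtualSpoutMessageBus bus, VirtualSpoutIdentifier id, List<Message> produced) {
        try {
            for (Message m : produced) {
                bus.publishMessage(m);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop this pass
            return;
        } catch (RuntimeException e) {
            bus.publishError(e); // assumption: unexpected failures are reported through the bus
        }
        Optional<MessageId> next;
        while ((next = bus.getAckedMessage(id)).isPresent()) {
            ackedOnSpout.add(next.get());
        }
        while ((next = bus.getFailedMessage(id)).isPresent()) {
            failedOnSpout.add(next.get());
        }
    }
}

// Minimal in-memory implementation, used only to exercise the sketch.
final class InMemoryBus implements VirtualSpoutMessageBus {
    final Queue<Message> messages = new ConcurrentLinkedQueue<>();
    final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
    final Map<VirtualSpoutIdentifier, Queue<MessageId>> acked = new ConcurrentHashMap<>();
    final Map<VirtualSpoutIdentifier, Queue<MessageId>> failed = new ConcurrentHashMap<>();

    public void registerVirtualSpout(VirtualSpoutIdentifier id) {
        acked.putIfAbsent(id, new ConcurrentLinkedQueue<>());
        failed.putIfAbsent(id, new ConcurrentLinkedQueue<>());
    }
    public void publishMessage(Message message) { messages.add(message); }
    public int messageSize() { return messages.size(); }
    public void publishError(Throwable throwable) { errors.add(throwable); }
    public Optional<MessageId> getAckedMessage(VirtualSpoutIdentifier id) { return poll(acked, id); }
    public Optional<MessageId> getFailedMessage(VirtualSpoutIdentifier id) { return poll(failed, id); }
    public void unregisterVirtualSpout(VirtualSpoutIdentifier id) {
        acked.remove(id);
        failed.remove(id);
    }

    // Returns the head of the queue for this id, or empty if the id is unknown or the queue is empty.
    private static Optional<MessageId> poll(Map<VirtualSpoutIdentifier, Queue<MessageId>> map,
                                            VirtualSpoutIdentifier id) {
        Queue<MessageId> queue = map.get(id);
        return queue == null ? Optional.empty() : Optional.ofNullable(queue.poll());
    }
}
```

The runner only ever asks for "the next acked/failed id for my VirtualSpout"; whether that is backed by a map of queues or something else is invisible to it.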

Where does this get us?

MessageBus

Now we create a MessageBus instance once in DynamicSpout and keep a reference to it typed as the SpoutMessageBus interface. Then we take this same MessageBus instance and pass it down (typed as the VirtualSpoutMessageBus interface) to the Coordinator/SpoutMonitor/SpoutRunner. This provides a super clean abstraction for passing messages, giving each side access only to the parts it needs. No more passing around various Maps/Queues, and no one has to care how those details are implemented anymore. Yay!
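
The wiring can be sketched roughly as follows. To keep the focus on the "one instance, two views" idea, both interfaces are trimmed to a single method each and a String stands in for the project's Message type; SpoutSide and RunnerSide are hypothetical stand-ins for DynamicSpout and the Coordinator/SpoutMonitor/SpoutRunner chain, and the queue inside MessageBus is an assumption rather than the real MessageBuffer.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Trimmed to one method each; the real interfaces have more.
interface SpoutMessageBus {
    Optional<String> nextMessage();
}

interface VirtualSpoutMessageBus {
    void publishMessage(String message) throws InterruptedException;
}

// One concrete class, two narrow views onto it.
final class MessageBus implements SpoutMessageBus, VirtualSpoutMessageBus {
    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();

    @Override
    public Optional<String> nextMessage() {
        return Optional.ofNullable(buffer.poll());
    }

    @Override
    public void publishMessage(String message) {
        buffer.add(message);
    }
}

// Hypothetical stand-in for the Coordinator/SpoutMonitor/SpoutRunner side.
final class RunnerSide {
    private final VirtualSpoutMessageBus bus;

    RunnerSide(VirtualSpoutMessageBus bus) {
        this.bus = bus;
    }

    void produce(String payload) {
        try {
            bus.publishMessage(payload);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical stand-in for DynamicSpout.
final class SpoutSide {
    private final SpoutMessageBus bus;
    private final RunnerSide runner;

    SpoutSide() {
        MessageBus messageBus = new MessageBus();  // created exactly once
        this.bus = messageBus;                     // the spout keeps only the SpoutMessageBus view
        this.runner = new RunnerSide(messageBus);  // the runner receives only the VirtualSpoutMessageBus view
    }

    RunnerSide runner() {
        return runner;
    }

    Optional<String> nextTuple() {
        return bus.nextMessage();
    }
}
```

In Java no explicit cast is required for this: assigning the MessageBus instance to a field or parameter of either interface type is an implicit upcast, and the compiler then rejects calls to methods outside that interface.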

SpoutCoordinator

If SpoutCoordinator is no longer handling message routing, what does it do now? That's a great question! SpoutCoordinator now (indirectly) manages starting and stopping VirtualSpouts. But wait, isn't that what SpoutMonitor does? Yep. SpoutCoordinator has essentially become a pass-through for SpoutMonitor, which means we should be able to easily merge SpoutCoordinator + SpoutMonitor and eliminate one of them. That work is out of scope for this PR and should be slated as follow-on work after this PR is completed (Phase 2).