colinsurprenant / redstorm

JRuby on Storm

Add DSL support for declaring named streams for Bolts #129

Open cattywampus opened 9 years ago

cattywampus commented 9 years ago

This is an initial attempt at adding support for declaring named streams and emitting messages on those streams from bolts. The intent is to make it easier to build Storm topologies in which a bolt controls where its messages go by emitting on named streams, and other bolts subscribe to a source through those declared streams. These modifications should not introduce any breaking changes to the RedStorm API, so existing users should be able to upgrade without rewriting their bolts or topologies.

I am submitting this as an initial draft implementation and hope to receive comments or open a discussion about what the API for this kind of support should look like.

Declaring a new stream occurs with the call to output_fields within the bolt implementation. Instead of an array of field names, streams are declared using a hash where the key is the name of the stream and the value is a single field or array of fields.

class Bolt2 < RedStorm::DSL::Bolt
  output_fields :custom_stream => :f1

  on_receive :stream => :custom_stream do |tuple|
    tuple
  end
end

In this example, a new stream called custom_stream is declared with a single output field. Because auto emit is enabled, the :stream option passed to on_receive sets the stream that the block's result is emitted to. For more granular control, a bolt that defines multiple streams can emit values using the emit_tuple_stream method of the OutputCollector, which is called from the bolt's anchored_stream_emit and unanchored_stream_emit helper methods.

class Bolt1 < RedStorm::DSL::Bolt
  output_fields :f1, :custom_stream => :f2

  on_receive :emit => false do |tuple|
    if something # placeholder for an application-specific condition
      unanchored_stream_emit 'custom_stream', tuple
    else
      unanchored_emit tuple
    end
  end
end
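To make the routing in Bolt1 easier to follow outside of a running cluster, here is a rough, plain-Ruby sketch of the idea. It does not use RedStorm or Storm at all: RecordingCollector and SketchBolt are hypothetical stand-ins, emit_tuple is an assumed name for the default-stream call, and the only things taken from the description above are the helper names and the emit_tuple_stream call. The "default" key matches Storm's default stream id.

```ruby
# Hypothetical stand-in for Storm's OutputCollector: it only records what
# was emitted on each stream so the routing can be inspected.
class RecordingCollector
  attr_reader :emitted

  def initialize
    @emitted = Hash.new { |hash, stream| hash[stream] = [] }
  end

  # Assumed name for an emit on the default stream.
  def emit_tuple(values)
    @emitted['default'] << values
  end

  # Emit on a named stream, mirroring the emit_tuple_stream call described above.
  def emit_tuple_stream(stream, values)
    @emitted[stream.to_s] << values
  end
end

# Hypothetical bolt-like object exposing the two helpers used in Bolt1.
class SketchBolt
  def initialize(collector)
    @collector = collector
  end

  def unanchored_emit(*values)
    @collector.emit_tuple(values)
  end

  def unanchored_stream_emit(stream, *values)
    @collector.emit_tuple_stream(stream, values)
  end

  # Odd numbers go to the named stream, everything else to the default stream.
  def receive(number)
    if number.odd?
      unanchored_stream_emit 'custom_stream', number
    else
      unanchored_emit number
    end
  end
end

collector = RecordingCollector.new
bolt = SketchBolt.new(collector)
[1, 2, 3, 4].each { |n| bolt.receive(n) }
p collector.emitted
```

The point of the sketch is only that the bolt body decides per tuple which stream receives the values; anchoring and acking are left out entirely.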

Once a bolt defines which streams it will emit messages on, other bolts can subscribe to those streams when they are configured in a topology. I added a third parameter to the source method that lets the bolt name the stream it wants to receive messages from. This introduces a small API change: if the user originally provided a Hash for the grouping, that hash now needs to be wrapped in curly braces when a stream name follows it, to avoid a syntax error from the Ruby parser.

class Topology1 < RedStorm::DSL::Topology
  spout SomeSpout

  bolt Bolt1 do
    source SomeSpout, :fields => ["f1"]
  end

  bolt Bolt2 do 
    source Bolt1, { :fields => ["f1"] }, "custom_stream"
  end

  ...
end
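The brace requirement comes from Ruby itself rather than from anything Storm-specific, and a small sketch may help show it. SketchBoltDefinition below is a hypothetical recorder written for illustration, not RedStorm's actual class, and the :shuffle default grouping is an assumption; only the shape of the source call (source id, grouping, optional stream name) is taken from the topology above.

```ruby
# Hypothetical recorder for `source` declarations (not RedStorm's real class).
class SketchBoltDefinition
  DEFAULT_STREAM = 'default'

  attr_reader :sources

  def initialize
    @sources = []
  end

  # grouping: a symbol such as :shuffle, or a hash such as { :fields => ["f1"] }
  # stream:   optional name of the upstream stream to subscribe to
  def source(source_id, grouping = :shuffle, stream = DEFAULT_STREAM)
    @sources << { :id => source_id, :grouping => grouping, :stream => stream.to_s }
  end
end

definition = SketchBoltDefinition.new

# A brace-less hash is accepted as long as it is the last argument.
definition.source :some_spout, :fields => ["f1"]

# When a stream name follows, the hash has to be wrapped in braces;
# writing `source :bolt1, :fields => ["f1"], "custom_stream"` does not parse.
definition.source :bolt1, { :fields => ["f1"] }, "custom_stream"

definition.sources.each { |entry| p entry }
```

Existing topologies that never pass a stream name keep working unchanged under this signature, which is why the change stays small.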

What's not implemented (yet)

Currently only bolts can declare streams; I still need to add support for spouts. This means that bolts can only subscribe to streams declared by other bolts.

Also, you cannot declare streams and fields for bolts within the bolt definition of a topology. The output_fields declared in the definition block of the topology are passed to the constructor of the JRubyShellBolt class, which currently accepts an array of Strings for the fields parameter. I wasn't sure what the best path was for adding stream support here. I thought an overloaded constructor that expects a HashMap instead of an array might work, but that would prevent users from declaring custom streams alongside fields for the default stream, like this:

output_fields :f1, :f2, :my_stream => [:f3, :f4]

I think a decision needs to be made about how this should impact the API. Restricting the fields parameter of that Java class to use a map would then result in:

output_fields :default => [:f1, :f2], :my_stream => [:f3, :f4]

...where the default stream needs to be explicitly declared if the user wants to use it. Alternatively, the BoltDefinition code in Ruby could be smart enough to parse out this data structure and invoke the appropriate constructor and pass in the default fields if provided through a setter on the Java class.
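As a starting point for that discussion, the parsing step on the Ruby side could look roughly like the sketch below. normalize_output_fields is a hypothetical helper name, not something in this PR, and it only covers the argument handling: it folds both declaration styles shown above into one map keyed by stream name, which the definition code could then hand to whichever Java constructor or setter is chosen.

```ruby
DEFAULT_STREAM = 'default'

# Hypothetical helper: fold a mixed declaration such as
#   :f1, :f2, :my_stream => [:f3, :f4]
# into a single { stream name => [field names] } map.
def normalize_output_fields(*args)
  # A trailing hash carries the named streams; everything before it is
  # treated as the field list of the default stream.
  named = args.last.is_a?(Hash) ? args.pop : {}

  streams = {}
  streams[DEFAULT_STREAM] = args.flatten.map(&:to_s) unless args.empty?

  named.each do |stream, fields|
    # Array() accepts either a single field or an array of fields.
    streams[stream.to_s] = Array(fields).map(&:to_s)
  end

  streams
end

p normalize_output_fields(:f1, :f2, :my_stream => [:f3, :f4])
p normalize_output_fields(:default => [:f1, :f2], :my_stream => [:f3, :f4])
p normalize_output_fields(:custom_stream => :f1)
```

With this approach the first two calls produce the same map, so the explicit :default key would become optional rather than required.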

Anyways, I am looking for lots of feedback and an open discussion about what direction makes the most sense for adding in this type of feature. Any and all comments are welcome and I'm happy to make any changes necessary to help get this accepted and into the RedStorm baseline.

coveralls commented 9 years ago

Coverage Status

Coverage increased (+0.15%) to 97.91% when pulling dbf7888bb79fa81c782170fcb37cc55d2bd786f0 on cattywampus:streams into cbb62efc529fd86b62f0fc39d8c8981256c2b55e on colinsurprenant:master.

coveralls commented 9 years ago

Coverage Status

Coverage decreased (-76.98%) to 20.78% when pulling 7b7f3f7c993cb208f40e613bb32ce504626d944f on cattywampus:streams into cbb62efc529fd86b62f0fc39d8c8981256c2b55e on colinsurprenant:master.

coveralls commented 9 years ago

Coverage Status

Coverage increased (+0.04%) to 97.8% when pulling db8547ebd681ebaebe9510ce594b104686aa06e3 on cattywampus:streams into cbb62efc529fd86b62f0fc39d8c8981256c2b55e on colinsurprenant:master.

coveralls commented 9 years ago

Coverage Status

Coverage increased (+0.09%) to 97.85% when pulling b82a4a9ff27d1bed2a569f7dd9c7a8c34926198a on cattywampus:streams into cbb62efc529fd86b62f0fc39d8c8981256c2b55e on colinsurprenant:master.

coveralls commented 9 years ago

Coverage Status

Coverage increased (+0.09%) to 97.85% when pulling f593b216ca628ef6d489f95343ae64f3991be095 on cattywampus:streams into cbb62efc529fd86b62f0fc39d8c8981256c2b55e on colinsurprenant:master.

rtyler commented 9 years ago

@cattywampus I've been maintaining a fork of redstorm which includes some patches we've developed at @lookout and build tooling changes for Gradle at jruby-gradle/redstorm. Would you be interested in merging this code into that repository?

cattywampus commented 9 years ago

@rtyler Definitely. These changes fundamentally expand the way topologies can exchange data between bolts, so I would love to fold this into a baseline that gets more eyes on it, especially since I'm very new to Storm and hope I am not enabling any Storm cluster faux pas.

I'll check out your fork and rebase my changes on top of it to make sure everything still works.

Thanks!

rtyler commented 9 years ago

@cattywampus FWIW, this change has been merged in and released as part of the jruby-gradle/redstorm 0.8.0 release. This version of the library is available in jcenter and is what the jruby-gradle-storm-plugin consumes by default.