colinsurprenant / redstorm

JRuby on Storm

No way to call methods on bolts/spouts in the topology DSL #99

Open tlrobinson opened 10 years ago

tlrobinson commented 10 years ago

There doesn't appear to be a way to call instance methods on bolts or spouts in the DSL, e.g., as is required here: https://github.com/ptgoetz/storm-jms/blob/master/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java#L63-L67

Perhaps the instance should be passed to the block, so you can do something like this:

spout JmsSpout, :id => :events_jms do |spout|
    spout.setJmsProvider(jmsQueueProvider)
    # etc
    output_fields :bytes
end

colinsurprenant commented 10 years ago

Thanks for your suggestion, @tlrobinson. Yes, this is an interesting limitation of the way I designed the DSL, given the use case you refer to. I will need to think about this a bit.

A temporary workaround could be to write a simple Java spout wrapper class that initializes itself correctly, which you could then use as-is in the JRuby DSL. Or you could try the same idea with a JRuby wrapper spout class.

tlrobinson commented 10 years ago

That's basically what I'm trying to do, however I'm getting a somewhat opaque error message about undefined method 'java_proxy'. Maybe you could offer some guidance:

RedStorm v0.7.0.beta1 starting topology MyTopology/my_topology in local environment
2041 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died
org.jruby.exceptions.RaiseException: (NoMethodError) undefined method `java_proxy' for MyJmsSpout:Class
    at RUBY.new_instance(/Users/trobinson/git/my_topologies/target/lib/red_storm/dsl/topology.rb:49) ~[na:na]
    at RUBY.build_topology(/Users/trobinson/git/my_topologies/target/lib/red_storm/dsl/topology.rb:147) ~[na:na]
    at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613) ~[jruby-complete-1.7.12.jar:na]
    at RUBY.build_topology(/Users/trobinson/git/my_topologies/target/lib/red_storm/dsl/topology.rb:146) ~[na:na]
    at RUBY.start(/Users/trobinson/git/my_topologies/target/lib/red_storm/dsl/topology.rb:159) ~[na:na]
    at RUBY.main(/Users/trobinson/.rvm/gems/jruby-1.7.10/gems/redstorm-0.7.0.beta1/lib/red_storm/topology_launcher.rb:66) ~[na:na]

Here's basically the code I'm running:

java_import 'backtype.storm.contrib.jms.JmsProvider'
java_import 'backtype.storm.contrib.jms.JmsTupleProducer'
java_import 'backtype.storm.contrib.jms.spout.JmsSpout'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'javax.jms.Session'
java_import 'javax.jms.TextMessage'
java_import 'javax.jms.Topic'
java_import 'javax.jms.TopicSession'
java_import 'org.apache.activemq.ActiveMQConnectionFactory'
java_import 'storm.kafka.KafkaConfig'
java_import 'storm.kafka.KafkaSpout'
java_import 'storm.kafka.SpoutConfig'
java_import 'storm.kafka.ZkHosts'

require 'json'
require 'red_storm'

class CreateEventBolt < RedStorm::DSL::Bolt
  on_receive do |tuple|
    # Parse the incoming payload and emit the parsed object.
    object = JSON.parse(tuple[:bytes].to_s)
    puts object.inspect
    [object]
  end
end

class MyTopic
  include Topic

  attr_accessor :topic_name
  def initialize(topic)
    @topic_name = topic
  end

  def getTopicName
    @topic_name
  end
end

class MyJmsProvider
  include JmsProvider

  attr_accessor :connectionFactory
  attr_accessor :destination

  def initialize(url, topic)
    @connectionFactory = ActiveMQConnectionFactory.new(url)
    @destination = MyTopic.new(topic)

  end

  def connectionFactory
    @connectionFactory
  end

  def destination
    @destination
  end
end

class JsonTupleProducer
  include JmsTupleProducer

  def toTuple(msg)
    if msg.java_kind_of? TextMessage
      Values.new(msg.getText())
    else
      nil
    end
  end

  def declareOutputFields(declarer)
    declarer.declare(Fields.new("json"))
  end
end

class MyJmsSpout < JmsSpout
  def initialize(provider, tuple_producer, ack_mode=Session::CLIENT_ACKNOWLEDGE, distributed=true)
    # Explicit parentheses: a bare `super` would forward all four arguments
    # to the JmsSpout constructor.
    super()
    self.setJmsProvider provider
    self.setJmsTupleProducer tuple_producer
    self.setJmsAcknowledgeMode ack_mode
    self.setDistributed distributed
  end
end

class MyTopology < RedStorm::DSL::Topology

  jms_provider = MyJmsProvider.new("tcp://localhost:61616", "virtual.events")
  tuple_producer = JsonTupleProducer.new()

  spout MyJmsSpout, [jms_provider, tuple_producer] do
    output_fields :bytes
  end

  bolt CreateEventBolt, :parallelism => 2 do
    output_fields :json
    source MyJmsSpout, :shuffle
  end

  configure self.topology_name do |env|

  end

end

It needs (some of?) these dependencies:

    <dependency org="com.github.ptgoetz" name="storm-jms" rev="0.9.0" conf="default" transitive="true"/>
    <dependency org="org.apache.activemq" name="activemq-spring" rev="5.9.1" conf="default" transitive="true"/>
    <dependency org="org.springframework" name="spring-core" rev="3.2.5.RELEASE" conf="default" transitive="true"/>
    <dependency org="org.springframework" name="spring-beans" rev="3.2.5.RELEASE" conf="default" transitive="true"/>
    <dependency org="org.springframework" name="spring-test" rev="3.2.5.RELEASE" conf="default" transitive="true"/>

colinsurprenant commented 10 years ago

The problem is here: https://github.com/colinsurprenant/redstorm/blob/master/lib/red_storm/dsl/topology.rb#L46

Since MyJmsSpout is actually a Ruby class and not a Java class, the DSL tries to initialize it using the spout Java proxy, which is required because Ruby classes are not Java-serializable.
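To make the failure mode concrete, here is a rough plain-Ruby model of that decision. It is only a sketch: FakeJava, DslSpout, the proxy name, and this new_instance are hypothetical stand-ins, not RedStorm's actual internals, and the name-based "is it Java?" check is an assumption.

```ruby
# Simplified, hypothetical model of the DSL's instantiation step.
# None of these names are RedStorm's real internals.

module FakeJava              # stand-in for JRuby's Java namespace
  class JmsSpout; end
end

class DslSpout               # stand-in for a RedStorm::DSL::Spout subclass
  def self.java_proxy
    "JRubySpoutProxy"        # made-up name for the serializable Java-side proxy
  end
end

# A Ruby subclass of a "Java" class: it looks like a Ruby class to the DSL.
class MyJmsSpout < FakeJava::JmsSpout; end

def new_instance(clazz)
  if clazz.name.start_with?("FakeJava::")
    clazz.new                # real Java class: instantiate it directly
  else
    # Ruby class: assume it is a DSL component and ask for its proxy.
    "#{clazz.java_proxy} wrapping #{clazz.name}"
  end
end

new_instance(FakeJava::JmsSpout)   # a plain instance
new_instance(DslSpout)             # "JRubySpoutProxy wrapping DslSpout"

begin
  new_instance(MyJmsSpout)         # takes the Ruby branch, but has no java_proxy
rescue NoMethodError => e
  puts e.message                   # the message mentions java_proxy
end
```

In this model MyJmsSpout falls into the Ruby branch because it was defined in Ruby, yet it is not a DSL component, so the proxy lookup raises NoMethodError.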

So what can we do? Well, you could write MyJmsSpout in Java, which would be very simple and do basically what you have done in Ruby. You could also write a "proper" Ruby Spout and, in the on_init, initialize the JmsSpout and wrap the spout methods with their Ruby equivalents. But that would be less efficient than writing it in Java.

I'm afraid that there's no simple workaround for this other than writing a few lines of Java... but we should totally offer something to make this work; I'm not sure exactly how at this point.
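For the "proper" Ruby spout idea, the shape would be roughly the delegation below. This is a plain-Ruby sketch under assumptions: FakeJmsSpout stands in for the Java JmsSpout (a real one emits through a collector rather than returning values), WrappingSpout is a made-up name, and on_send is assumed here as the name of the per-tuple hook.

```ruby
# Hypothetical sketch of wrapping a setter-configured spout in a Ruby spout.

class FakeJmsSpout                 # stand-in for the Java JmsSpout
  attr_reader :provider

  def setJmsProvider(provider)
    @provider = provider
  end

  def nextTuple
    ["message from #{@provider}"]  # simplification: returns instead of emitting
  end
end

class WrappingSpout
  # Keep only plain data here; the wrapped spout is built later, in on_init.
  def initialize(provider_url)
    @provider_url = provider_url
  end

  # Create and configure the wrapped spout when the component starts.
  def on_init
    @inner = FakeJmsSpout.new
    @inner.setJmsProvider(@provider_url)
  end

  # Forward each request for a tuple to the wrapped spout.
  def on_send
    @inner.nextTuple
  end
end

spout = WrappingSpout.new("tcp://localhost:61616")
spout.on_init
tuple = spout.on_send
```

Every call crosses the Ruby wrapper before reaching the wrapped spout, which is where the extra overhead compared to a small Java subclass would come from.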

colinsurprenant commented 10 years ago

I think we could make it work the way you suggested with

spout JmsSpout, :id => :events_jms do |spout|
    spout.setJmsProvider(jmsQueueProvider)
    # etc
    output_fields :bytes
end

by creating the spout/bolt instance in the self.spout/bolt methods and passing the instance to the block, instead of delaying instance creation until the topology is built. I'll create a branch shortly that you can test.

The potential caveat is that the spout/bolt block is already called in the configurator context but I'm guessing that you will typically only be using the |spout| param to call methods on it...
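Roughly, the idea and the caveat look like this in plain Ruby. This is only a sketch: Configurator, TopologySketch, and DemoSpout are illustrative names, and using instance_exec to keep the configurator as self is an assumption about how it could be done, not what the branch will necessarily do.

```ruby
# Hypothetical sketch of creating the instance eagerly and yielding it.

class Configurator
  attr_reader :fields

  def output_fields(*fields)
    @fields = fields
  end
end

class TopologySketch
  def self.components
    @components ||= []
  end

  # Create the instance right away and hand it to the block.
  def self.spout(clazz, *args, &block)
    instance = clazz.new(*args)
    configurator = Configurator.new
    # instance_exec keeps `self` as the configurator (so output_fields still
    # resolves) while passing the instance in as the block parameter.
    configurator.instance_exec(instance, &block) if block
    components << [instance, configurator]
    instance
  end
end

class DemoSpout                    # stand-in for a Java spout with setters
  attr_reader :provider

  def setJmsProvider(provider)
    @provider = provider
  end
end

class DemoTopology < TopologySketch
  spout DemoSpout do |spout|
    spout.setJmsProvider("queue-provider")  # called on the instance
    output_fields :bytes                    # called on the configurator
  end
end

instance, configurator = DemoTopology.components.first
```

Inside the block, self is the configurator rather than the spout, so spout methods have to go through the explicit |spout| parameter, while bare calls such as output_fields keep working as before.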

thoughts?

tlrobinson commented 10 years ago

That would be fine. I'm not sure I understand the caveat you mentioned though.

tlrobinson commented 10 years ago

In the meantime, I've gotten a JmsSpout subclass working, but where would you recommend storing Java source files? Currently I have it in target/src/mycompany/storm/JmsSpout.java, which works fine, but as I understand it, the target directory isn't supposed to be checked into source control.

colinsurprenant commented 10 years ago

You can put Java sources under src/main/ and use rake build to compile them; you should then be able to use them the same way as when you put your classes in the target/ dir. I did make some progress with the refactor today while on the train (currently traveling in Europe), and I should be able to push a branch so you can test.

tlrobinson commented 10 years ago

Thanks. What do I need to do to set up Rake to build from src/main? I'm getting: Don't know how to build task 'build'