bkeepers / qu

A Ruby library for queuing and processing background jobs.
MIT License

Bulk processing for workers #82

Closed mauricio closed 10 years ago

mauricio commented 10 years ago

The rationale behind this is that sometimes you have workers that enqueue work in single units but would be better served by processing that work in batches.

The best example of this is a Solr/Elasticsearch integration: most of the time you will want to publish every change as it happens, but it is much more efficient to process those changes in batches, since both offer batch/bulk update interfaces. So, instead of incurring the indexing cost for every document sent to Solr/Elasticsearch, you only pay the price once for every 10 documents.
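
The amortization argument can be sketched like this. `BulkIndexer` and its methods are purely illustrative names (not part of qu or any search client); the point is just that buffering and flushing in groups of 10 turns 30 index calls into 3 requests:

```ruby
# Hypothetical sketch: buffer index operations and flush them in bulk,
# counting how many requests actually go out the door.
class BulkIndexer
  attr_reader :requests_sent

  def initialize(batch_size: 10)
    @batch_size = batch_size
    @buffer = []
    @requests_sent = 0
  end

  def index(document)
    @buffer << document
    flush if @buffer.size >= @batch_size
  end

  def flush
    return if @buffer.empty?
    # One bulk request replaces @buffer.size individual ones.
    @requests_sent += 1
    @buffer.clear
  end
end

indexer = BulkIndexer.new(batch_size: 10)
30.times { |i| indexer.index(id: i) }
indexer.flush
indexer.requests_sent # => 3
```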

Other than that, there's the fact that SQS itself has much higher latency, so pulling in several messages per request is much better than fetching just one.

This is mostly a proof of concept, but I have hacked together a solution like this before, and it would be nice for the queue to support it out of the box.

jnunemaker commented 10 years ago

This one is going to take me a bit to review.

mauricio commented 10 years ago

The build will stay broken for a bit until this goes in https://github.com/iain/fake_sqs/pull/9

mauricio commented 10 years ago

@jnunemaker ping

jnunemaker commented 10 years ago

Overall, I love the idea of making batching possible. I also love all the hard work you put in. It is far easier for me to critique than to fully work out the solution.

The thing this pull request is really missing is a good abstraction for batching. A few spots are tied directly to SQS functionality rather than to batching in general.

As I think about it more, I almost wonder if the right abstraction is just a BatchPayload and a new backend for any queue that can handle batch operations. For example, here is some pseudo Ruby for SQS and batching:

class SQSBatch
  class BatchPayload
    attr_reader :payloads

    def initialize(payloads)
      @payloads = payloads
    end

    def perform
      payloads.each(&:perform)
    end
  end

  def pop(queue_name = 'default')
    payloads = sqs.receive_messages(limit: 10).map { |message|
      create_payload(message)
    }

    BatchPayload.new(payloads)
  end
end

I don't doubt that this will require some thought on how to ensure that batch payloads have the same semantics as a single payload, but I think that is probably the route I would want to take. It avoids qu itself having any batch knowledge (though the stock payload interface may need slimming down), which I like because it keeps qu's abstraction correct: it is flexible enough to work with anything that can pop and push.

Any of this make sense or am I just blabbering?

jnunemaker commented 10 years ago

There are several places where qu assumes a payload to have certain methods, such as queue, klass, etc. Those are what we would need to abstract a bit so qu can work with single or batch payloads.
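
One way to square that with the BatchPayload idea is delegation. This is only a sketch: the `queue`/`klass`/`perform` interface is taken from the discussion above, not from the actual qu source, and the `Payload` struct here is a stand-in, assuming every payload in a batch shares the same queue and job class:

```ruby
# Stand-in for qu's payload: responds to #queue, #klass and #perform.
Payload = Struct.new(:queue, :klass, :args) do
  def perform
    klass.call(*args)
  end
end

class BatchPayload
  attr_reader :payloads

  def initialize(payloads)
    @payloads = payloads
  end

  # Delegate the methods qu expects on a payload to the first member,
  # assuming a batch is homogeneous (same queue, same job class).
  def queue
    payloads.first.queue
  end

  def klass
    payloads.first.klass
  end

  def perform
    payloads.each(&:perform)
  end
end

results = []
job = ->(n) { results << n }
batch = BatchPayload.new([
  Payload.new('default', job, [1]),
  Payload.new('default', job, [2]),
])
batch.perform
results     # => [1, 2]
batch.queue # => "default"
```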

mauricio commented 10 years ago

WOW.

@jnunemaker ok, gonna work on this feedback and thanks for taking a look at this!

It's important to have another set of eyes on it, since it introduces a somewhat different way of processing jobs.

jnunemaker commented 10 years ago

WOW.

Hope that is a good wow!

mauricio commented 10 years ago

@jnunemaker that's a much comments, very feedback wow, hahaha.

jnunemaker commented 10 years ago

Lol.

mauricio commented 10 years ago

@jnunemaker the main problem I see with this:

    def perform
      payloads.each(&:perform)
    end

Is that it wouldn't allow me to send the many payloads as a single entity (in the case of batch indexing, for instance).

The job would have to receive all of the items together so it could process them in one go instead of one by one.
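
One hypothetical shape for this (none of these names are from qu): the batch payload hands all of the arguments to the job class at once when the job opts in via a `perform_batch` class method, and falls back to per-payload `perform` otherwise:

```ruby
# Stand-in for a single payload: knows its job class and arguments.
BatchItem = Struct.new(:klass, :args) do
  def perform
    klass.perform(*args)
  end
end

class BulkAwarePayload
  def initialize(payloads)
    @payloads = payloads
  end

  def perform
    klass = @payloads.first.klass
    if klass.respond_to?(:perform_batch)
      # e.g. an indexing job issues one bulk request for all documents
      klass.perform_batch(@payloads.map(&:args))
    else
      @payloads.each(&:perform)
    end
  end
end

# A job that supports batch processing: receives all args in one call.
class BulkJob
  def self.calls
    @calls ||= []
  end

  def self.perform_batch(args_list)
    calls << args_list
  end
end

payload = BulkAwarePayload.new([BatchItem.new(BulkJob, [1]),
                                BatchItem.new(BulkJob, [2])])
payload.perform
BulkJob.calls # => [[[1], [2]]]  (one call, carrying both payloads' args)
```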

jnunemaker commented 10 years ago

Is that it wouldn't allow me to send the many payloads as a single entity (in the case of batch indexing, for instance).

Well, more what I mean is that payload could be the abstraction. We could even allow people to swap in their own payload mechanism. Then you could have a batch payload that "does the right thing".

mauricio commented 10 years ago

Makes sense, gonna dig more into this and build something to look at how it would be used.

mauricio commented 10 years ago

This is for another PR, but the current worker implementation could potentially take more than one job at a time if the runner handles it. I need to write more tests for the forking runner to get this working, and to flesh out a thread-pool based runner as well; both would allow the main worker to run more than one job at a time.
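
A thread-pool runner along those lines could look roughly like this. This is an illustrative sketch, not qu's actual runner API: the worker would hand popped jobs to a fixed pool instead of running them inline, so several jobs execute concurrently:

```ruby
# Minimal thread-pool runner sketch: a shared work queue feeding a
# fixed number of worker threads.
class ThreadPoolRunner
  def initialize(size: 4)
    @jobs = Queue.new
    @threads = size.times.map do
      Thread.new do
        # Popping nil is the shutdown signal; anything else is a job.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  # Enqueue a job; it runs on whichever pool thread is free.
  def run(job)
    @jobs << job
  end

  # Push one nil per thread so each thread exits its loop, then wait.
  def shutdown
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
  end
end

results = Queue.new
runner = ThreadPoolRunner.new(size: 2)
10.times { |i| runner.run(-> { results << i }) }
runner.shutdown
results.size # => 10
```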

jnunemaker commented 10 years ago

:+1: to small iterations on the worker and payload rather than a big PR. Big PRs are a lot harder to digest.

jnunemaker commented 10 years ago

Closing this due to staleness. Feel free to pick it back up someday.

mauricio commented 10 years ago

Sure. I still haven't found a nice way to implement this :(