socketry / async-redis


Add support for pipelines #18

Closed davidor closed 5 years ago

davidor commented 5 years ago

This PR adds support for pipelines: https://redis.io/topics/pipelining

Usage:

require 'async/redis'

client = Async::Redis::Client.new

Async.run do
  result = client.pipelined { |context| context.get 'a'; context.get 'b' }
  # result now contains an array of two positions with the values for 'a' and 'b'.
end

A straightforward way to check that the commands are sent together instead of one by one:

1. Run `nc -l 6379`.
2. Execute the lines in the usage example above.
3. Check that nc sees the two GETs arrive together: the client did not wait for the value of 'a' before asking for the value of 'b'.
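For illustration, here is a minimal sketch (not async-redis's actual encoder) of what nc receives: both GETs are encoded as RESP (the Redis serialization protocol) and concatenated into one payload before anything is written to the socket.

```ruby
# Minimal sketch of RESP encoding (not the async-redis implementation):
# each command becomes *<argc>\r\n followed by $<len>\r\n<arg>\r\n per argument.
def resp_command(*args)
  args.inject("*#{args.size}\r\n") do |buffer, arg|
    string = arg.to_s
    buffer + "$#{string.bytesize}\r\n#{string}\r\n"
  end
end

# Pipelining concatenates both encoded commands into a single payload,
# so one write delivers GET 'a' and GET 'b' together.
payload = resp_command("GET", "a") + resp_command("GET", "b")
```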

coveralls commented 5 years ago

Pull Request Test Coverage Report for Build 108


Totals Coverage Status:
- Change from base Build 106: 0.7%
- Covered Lines: 394
- Relevant Lines: 451

💛 - Coveralls
ioquatix commented 5 years ago

I think that as part of this we need to consider how the user checks the results.

Personally, I'm against futures, but it might not be a bad design choice in this situation.

The question is, if the user says:

client.pipeline do |context|
    context.get('x')
    context.set('y', 10)
    context.pop('list')
end

How should the user access the results?

Should we be able to replay the pipeline?

Should we use a block/callback approach? Or should we look at using async tasks?

When I think about it, why shouldn't everything be pipelined where possible?

class Pipeline
    def initialize(io)
        @io = io
        @queue = Async::IO::Queue.new
    end

    def call(command)
        condition = Async::IO::Condition.new

        @queue << [command, condition]

        # Suspend the calling task until the reply is signalled back.
        return condition.wait
    end

    def run
        until @queue.empty?
            # Dequeue and write as many commands as possible in one batch.
            @waiting = []
            @queue.each do |command, condition|
                @io.write(command)
                @waiting << condition
            end

            # Read the replies back in order and wake the waiting tasks.
            @waiting.each do |condition|
                result = @io.read
                condition.signal(result)
            end
        end
    end
end

It allows you to write code like:

client.pipeline do |context|
    Async do
        context.set('y', 10)
    end

    Async do
        value = context.get('x')
        context.push('list', value)
    end
end

and it will pipeline as much as possible, but to you the code just looks sequential.

Thoughts?

huba commented 5 years ago

@ioquatix that's a pretty cool idea. However, the main point of Redis pipelining is to reduce the number of round trips and write system calls. Writing requests straight to the IO seems to defeat this, while holding them in a buffer would make all the conditions block.
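The buffering being discussed can be sketched without any Redis at all. This hypothetical `BufferedPipeline` (not the async-redis implementation, and using a simplified placeholder wire format rather than real RESP) accumulates commands locally and delivers the whole batch with a single write:

```ruby
require 'stringio'

# Hypothetical sketch: commands accumulate in a local buffer, and a single
# write delivers the whole batch, saving round trips and write syscalls.
class BufferedPipeline
  def initialize(io)
    @io = io
    @buffer = +""
    @pending = 0
  end

  def call(*command)
    @buffer << command.join(" ") << "\r\n"
    @pending += 1
  end

  # Flush everything accumulated so far with one write; returns the number
  # of commands that went out in the batch.
  def flush
    @io.write(@buffer)
    @buffer.clear
    count = @pending
    @pending = 0
    count
  end
end

io = StringIO.new
pipeline = BufferedPipeline.new(io)
pipeline.call("SET", "x", 10)
pipeline.call("SET", "y", 11)
flushed = pipeline.flush
```

Here `flushed` is 2 and `io` received both SETs in one write call, which is the round-trip saving huba describes.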

ioquatix commented 5 years ago

Yes, that's the point: you batch as many commands as possible before calling flush.

huba commented 5 years ago

Hmm, does the condition block if you don't store the return value?

client.pipeline do |context|
    Async do
        context.set('x', 10)
        context.set('y', 11)
        context.set('z', 12)
        context.set('w', 13)
    end
end

I would expect all of those to go in one write.
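Whether `wait` blocks when the return value is ignored can be illustrated with a plain core Fiber as a hypothetical stand-in for an async task (this is an analogy, not the async gem's API): the fiber suspends at the yield point whether or not the caller stores a value, so the four SETs would be issued one at a time unless the writes are buffered before suspending.

```ruby
# Fiber.yield plays the role of condition.wait here. The fiber suspends
# after each command even though the caller never stores the "reply",
# so the SETs are issued one at a time.
issued = []

task = Fiber.new do
  ["x", "y", "z", "w"].each do |key|
    issued << "SET #{key}"  # stands in for context.set(key, ...)
    Fiber.yield             # condition.wait suspends here, value stored or not
  end
end

task.resume
# Only the first SET has been issued; the task is suspended awaiting a reply.
```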

ioquatix commented 5 years ago

I think that comes back to what does the user expect?

We could certainly have a top level scope that executes everything with no capacity for getting the result, or we could allow the user to provide a block.

e.g. here is one way to set them all at the same time like you want:

client.pipeline do |context|
    context.async do
        context.set('x', 10)
    end

    context.async do
        context.set('y', 11)
    end

    context.async do
        context.set('z', 12)
    end

    context.async do
        context.set('w', 13)
    end
end

You could also make it so that everything done at the top level just happens at the same time, i.e. doesn't block:

client.pipeline do |context|
    context.set('x', 10)
    context.set('y', 11)
    context.set('z', 12)
    context.set('w', 13)
end

One way to handle results is to use a block, but that's kind of a poor version of pipeline.async.

client.pipeline do |context|
    context.set('x', 10)
    context.set('y', 11)
    context.set('z', 12) {|result| ...}
    context.set('w', 13)
end # Might raise exception if pipeline fails?
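The block/callback idea can be sketched without any Redis at all. This hypothetical `CallbackPipeline` (an illustration, not a proposed API) records an optional block per command and invokes each one, in command order, once the batched replies come back:

```ruby
# Hypothetical sketch of the callback idea: commands may register a block,
# and blocks run in command order when the pipeline's replies arrive.
class CallbackPipeline
  def initialize
    @commands = []
    @callbacks = []
  end

  def call(*command, &block)
    @commands << command
    @callbacks << block  # nil when the caller does not care about the reply
    self
  end

  # `replies` stands in for the batch of responses read off the socket,
  # one reply per command, in the same order the commands were sent.
  def resolve(replies)
    replies.each_with_index do |reply, index|
      callback = @callbacks[index]
      callback.call(reply) if callback
    end
  end
end

seen = nil
pipeline = CallbackPipeline.new
pipeline.call("SET", "x", 10)
pipeline.call("SET", "z", 12) { |result| seen = result }
pipeline.resolve(["OK", "OK"])
```

Only the second command registered a block, so only it observes its reply; the first reply is silently discarded, matching the "caller ignores the result" case.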

ioquatix commented 5 years ago

From my POV, I'd like to see what code that uses pipeline looks like.

I don't think that code should explicitly know it's running in a pipeline - maybe it could even be the default? To me, it just seems like an overall more efficient strategy for buffering. Remember, spinning up async tasks is likely to incur less latency than going over the network... perhaps by a large margin.

ioquatix commented 5 years ago

@davidor can you tell me your use case?

davidor commented 5 years ago

@ioquatix We have some parts of the code where we need to send several commands to redis and none of those commands depends on the result of any of the other commands. In that case, it is not efficient to send the commands one by one over the network.

I did a quick test with redis-rb pipelines. I used an ElastiCache instance to introduce some real network latency and ran this simple script from my machine:

require 'redis'

client = Redis.new(host: 'some_remote_host')

t1 = Time.now
100.times { |i| client.set(i, i) }
puts "Without pipelines took: #{Time.now - t1}"

t2 = Time.now
client.pipelined do
  100.times { |i| client.set(i, i) }
end
puts "With pipelines took: #{Time.now - t2}"

The first example takes almost 50x longer. Of course, this will vary a lot depending on where Redis is deployed. Also, the effect will certainly not be the same in async-redis, because several commands can be sent without waiting for the result of the previous one; but still, without pipelining async-redis sends only one command per packet.

In this case, those SETs could be grouped into an MSET, but there are cases where that is not possible; for example, INCR does not accept several keys. Also, there are times when we need to issue different commands together, for example INCR and SADD, and the only way to group them is using pipelines.

ioquatix commented 5 years ago

@davidor that's super helpful.

ioquatix commented 5 years ago

I always like to add a note when I see people collaborating really well. Thanks everyone for your efforts here. I'll try to take a look at this tomorrow with the intent to merge.

ioquatix commented 5 years ago

I'm going to merge this and rework it.

davidor commented 5 years ago

Thanks for your help @ioquatix :+1:

Can you release a new version that includes this, please? Also, I've noticed that you released 0.3.4 a while ago but the tag is not in the repo.

ioquatix commented 5 years ago

Is https://github.com/socketry/async-redis/tree/v0.3.4 the tag you are referring to?