vapor / redis

Vapor provider for RediStack

pipelining #85

Closed. John-Connolly closed this issue 3 years ago.

John-Connolly commented 6 years ago

It would be nice to have a way to send multiple commands at once. This API feels a bit out of place now in Vapor 3, but something similar would be nice.

let pipeline = connection.makePipeline()
let result = try pipeline
    .enqueue(command: "SET", arguments: ["KEY", "VALUE"])
    .enqueue(command: "INCR", arguments: ["COUNT"])
    .execute() // Future<[RedisData]>
tanner0101 commented 6 years ago

This should definitely be possible.

Joannis commented 6 years ago

This looks really good to me 👍

ericchapman commented 5 years ago

@John-Connolly I just saw you had requested this. I coincidentally added a PR for this earlier today. Hopefully @tanner0101 will merge :)

https://github.com/vapor/redis/pull/143

I used a closure to wrap it and also added MULTI support. Example below:

conn.multi { pipe in
    pipe.command("INCR", args: RedisData(bulk: "key1"))
    pipe.command("GET", args: RedisData(bulk: "key3"))
    pipe.command("SET", args: RedisData(bulk: "key3"), RedisData(bulk: "string"))
    pipe.command("GET", args: [RedisData(bulk: "key3")])
    pipe.command("PING")
}
ericchapman commented 5 years ago

@tanner0101 It's probably better to move this conversation over here rather than keep it in the PR, so we can settle on the API.

So your suggested API was something like

let a = redis.get("foo") // ELFuture<RedisData>
let b = redis.get("bar") // ELFuture<RedisData>
a.and(b).map {
    let (a, b) = $0
    print(a) // RedisData
    print(b) // RedisData
}

I was pointing out that in order for a transaction to work, you have to somehow tell the "GET foo" call that it is part of a transaction, so the client waits and collects ALL of the commands in the transaction before sending them.

I do agree that we can use futures to return the results of each individual command so the user can decide whether they want the response or not. My compromise would be as follows:

let pipeline = redis.makePipeline()
_ = pipeline.command("GET", [RedisData(bulk: "foo")])
_ = pipeline.command("GET", [RedisData(bulk: "bar")]).map { resp in
   // TODO: Do something with this response
}
pipeline.execute()

You can then create a convenience closure like you would for getting a pooled DB connection, where the closure will "make" and "execute" the pipeline:

redis.pipeline { pipeline in
    _ = pipeline.command("GET", [RedisData(bulk: "foo")])
    _ = pipeline.command("GET", [RedisData(bulk: "bar")]).map { resp in
        // TODO: Do something with this response
    }
}
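
For illustration, a minimal sketch of what that convenience closure could look like. Everything here (RedisClient's makePipeline(), the Pipeline type, command(_:_:), execute()) is the API proposed in this thread, not something that exists today:

// Sketch only: makePipeline(), Pipeline, and execute() are the proposed API
// from this thread, not an existing one.
extension RedisClient {
    func pipeline(_ build: (Pipeline) -> Void) -> Future<[RedisData]> {
        let pipeline = self.makePipeline() // "make"
        build(pipeline)                    // caller queues its commands
        return pipeline.execute()          // "execute"
    }
}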

I like the way @John-Connolly suggested we chain the pipeline; however, that will make it difficult to get individual responses back from each submission.
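
For what it's worth, one hypothetical way to keep the chained style and still surface individual replies would be to let enqueue take an optional per-command handler. None of this is an existing API; it's only a sketch of how the two styles could coexist:

// Hypothetical sketch: enqueue(command:arguments:onReply:) does not exist.
let result = try connection.makePipeline()
    .enqueue(command: "SET", arguments: ["KEY", "VALUE"])
    .enqueue(command: "GET", arguments: ["KEY"], onReply: { reply in
        print(reply) // RedisData for this one command
    })
    .execute() // Future<[RedisData]> with all replies, in order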

What do you guys think?

tanner0101 commented 5 years ago

Check out my comment here first so that we are on the same page regarding the synchronicity: https://github.com/vapor/redis/pull/143#discussion_r264830504

Transactions seem like a separate issue / API to me, not necessarily related to pipelining.

John-Connolly commented 5 years ago

Chiming in late here. I ended up just writing my own Redis client because I needed something more performant. What I ended up doing is having all commands implicitly pipelined, i.e. you don't have to wait until a command has completed before sending another. I also made an API that lets you batch commands to save OS calls: you queue up a lot of commands, they are written to a ByteBuffer, and flush is called when you want to send them all. This has limited use, because if you send a bunch of commands in a tight loop you will most likely get the same behavior anyway, and it adds a lot of complexity for error handling in the client code.
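
For illustration, the underlying idea looks roughly like this with SwiftNIO 2 (a sketch, not the client described above; it assumes the channel's pipeline accepts raw ByteBuffers):

import NIO

// Queue several outbound writes without flushing, then flush once so the
// pending bytes go to the socket in as few syscalls as possible.
func sendBatched(_ commands: [ByteBuffer], on channel: Channel) -> EventLoopFuture<Void> {
    let writes = commands.map { channel.write($0) } // appends to the pending-write buffer
    channel.flush()                                 // hand everything queued so far to the socket
    return EventLoopFuture.andAllSucceed(writes, on: channel.eventLoop)
}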

John-Connolly commented 5 years ago

That being said, I am excited for Vapor's Redis to work in a similar way so I can go back to using it!

tanner0101 commented 5 years ago

@John-Connolly yup, that sounds similar to how NIORedis (https://github.com/mordil/nio-redis) works, minus the helpers for combining ByteBuffer. See https://github.com/vapor/redis/pull/143#discussion_r264452908 for more detailed discussion.

FWIW, I think that NIO (ByteToMessageEncoder, Channel, etc) should handle minimizing OS calls if possible. It might already do that actually, idk. But that's not something that a user of NIO should have to worry about.

John-Connolly commented 5 years ago

Nice! NIO redis sounds great

Mordil commented 5 years ago

Following up on this thread: I still don't have an adequate design for a pipeline API, but in RediStack, and as of Redis 3.4.0 (and #149), it should be possible to use the same connection to queue requests.

This is "pipelining" as according to Redis' topic documentation, and NIO itself tries to buffer writes & reads to cut down on expensive OS calls.

@John-Connolly I'm interested in what you have with regards to ByteBuffers - would you be willing to draft a use case and example of what you have in a new issue on RediStack so that I can evaluate dropping it into the library?

Also, do you think there's enough available between RediStack & Redis 3.4.0 to close this issue?

ericchapman commented 5 years ago

@Mordil So this was the other reason I created a protocol for the helper methods. That allowed me to create a wrapper for transactions (like MULTI/EXEC) without having to reimplement all of the helper methods.

I made my repo public here. You can see how I implemented multi in the RedisApiTransaction object.

From prior discussions I had with @tanner0101 on this, I think the NIO connection makes pipelining not so important, since the NIO connection doesn't have a round-trip penalty per call (as I understand it); however, there may still be a desire for MULTI/EXEC support. You can see that all I did for pipelining was the following, and you can still build a MULTI/EXEC wrapper on top of it, but you won't have the helper methods available unless you protocol them or re-implement them.

    public func send(pipeline commands: [[String]]) -> Future<[RedisApiData]> {
        let promise = self.eventLoop.newPromise([RedisApiData].self)

        // Dispatch the requests
        var futures = [Future<RedisData>]()
        for command in commands {
            let data = command.map { RedisData(bulk: $0) }
            futures.append(self.redis.send(RedisData.array(data)))
        }

        // Flatten them and return the array of responses
        _ = futures.flatten(on: self.worker).do { (response: [RedisData]) in
            promise.succeed(result: response)
        }

        return promise.futureResult
    }
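
A MULTI/EXEC wrapper on top of that could then be as small as the following sketch. Note that the returned array would also contain the replies to MULTI and EXEC themselves, which the caller would need to account for:

    // Sketch only, building on the send(pipeline:) method above.
    public func send(multi commands: [[String]]) -> Future<[RedisApiData]> {
        return self.send(pipeline: [["MULTI"]] + commands + [["EXEC"]])
    }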

Anyway, all that being said, I think you have enough there to not need this, and I am good with closing it.

Mordil commented 3 years ago

This feature request has been upstreamed to RediStack as Issue 88.

If anyone has additional thoughts on this (especially as it's been almost 3 years since it was originally proposed) please continue discussion in the new issue.

I am planning on making this feature one of the core pieces of RediStack 2.0 (some aspects of how commands are created, passed, etc. need to be reworked to make this feature possible).