bikeshedder / deadpool

Dead simple pool implementation for rust with async-await
Apache License 2.0

Redis PubSub #226

Open Owez opened 2 years ago

Owez commented 2 years ago

There's a .publish() method in deadpool redis but I can't find any way to subscribe to channels for pubsub usage in redis. How should I go about this, and are there any async alternatives if deadpool doesn't support it?

bikeshedder commented 2 years ago

deadpool_redis::Connection is just a wrapper around redis::aio::Connection. It doesn't support into_pubsub because that method requires ownership of the object and not just a reference to it. Most users don't need short-lived PubSub objects, so I never really bothered implementing a solution for this. Just create the object directly and keep it for the entire duration of the program (or use Connection::take as explained below).

It could be possible to provide this feature by implementing some clever wrapping utilizing the redis::aio::Connection::into_pubsub and redis::aio::PubSub::into_connection methods. If this is indeed a valid use case of yours, feel free to submit a PR for this feature.

In the meantime, the easiest way to get a PubSub object is to call Pool::get followed by Object::take, thus detaching the object from the pool and taking full ownership of the underlying redis::aio::Connection object:

let obj = deadpool_redis::Connection::take(pool.get().await?);
let pubsub = obj.into_pubsub();

Due to the take() call the object just won't be returned to the pool afterwards and there is currently no way to manually add a connection back to the pool.

bikeshedder commented 1 year ago

I'm closing this as I don't think it's very practical to use a pool for listening for messages. I might be totally wrong so please feel free to comment here and/or open a new issue describing a use case where this is needed.

Owez commented 1 year ago

Yep, you're right, this ended up being an issue with the redis pubsub implementation itself. I think it was a third-party library which was merged into it?

sp1ff commented 1 year ago

Would keyspace notifications be a use case? I need to know about changes to a particular key, but once they've happened, I'm done with it.

bbaldino commented 1 year ago

I'm also interested in a pooled pubsub use case. We're using an ad hoc pubsub channel to add request/response-style semantics. Using a stream for the return channel would mean the data is persisted, and we want something more ephemeral so we don't have to worry about cleanup, so we're looking at pubsub. Pubsub works well, but creating a new connection every time isn't very efficient.

bikeshedder commented 1 year ago

You should be able to call as_pubsub on a deadpool_redis::Connection object without any troubles as it implements both Deref and DerefMut for the underlying redis::Connection.

Having a second look at it, I realized it's as_pubsub and not into_pubsub, and it doesn't consume self. So you don't even need to Object::take the Connection out of the wrapper first. My earlier comment (https://github.com/bikeshedder/deadpool/issues/226#issuecomment-1291711091) gave wrong instructions.

Is that what you're looking for? :thinking:

bikeshedder commented 1 year ago

I once again fell into the trap of confusing redis::aio::Connection with redis::Connection. I've just marked my previous comment as outdated as it was highly misleading.

https://github.com/bikeshedder/deadpool/issues/226#issuecomment-1291711091 is still correct about the way it needs to be implemented. The connection wrapper needs to provide an as_pubsub method which calls into_pubsub underneath and returns a PubSub wrapper object. That wrapper must borrow mutably from the connection wrapper so it can take the redis::aio::Connection out of it and call into_pubsub on it. On drop, it must call into_connection and put the result back, restoring the original state of the connection wrapper.
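To make the borrow-and-restore idea above concrete, here is a minimal sketch using only the standard library. RawConnection and RawPubSub are hypothetical stand-ins for redis::aio::Connection and redis::aio::PubSub, and PooledConnection and PubSubGuard are made-up names for the two wrappers; none of this is deadpool-redis code. It also assumes into_connection is synchronous. If the real method is async, a plain Drop impl cannot await it, so an actual implementation would need a different way to hand the connection back.

```rust
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for `redis::aio::Connection`.
#[derive(Debug)]
struct RawConnection {
    id: u32,
}

impl RawConnection {
    /// Consumes the connection, mirroring the ownership requirement of `into_pubsub`.
    fn into_pubsub(self) -> RawPubSub {
        RawPubSub { conn: self, channels: Vec::new() }
    }
}

/// Hypothetical stand-in for `redis::aio::PubSub`.
#[derive(Debug)]
struct RawPubSub {
    conn: RawConnection,
    channels: Vec<String>,
}

impl RawPubSub {
    /// Records a subscription (the mock does no I/O).
    fn subscribe(&mut self, channel: &str) {
        self.channels.push(channel.to_string());
    }

    /// Consumes the pubsub object and hands the connection back.
    /// Assumed synchronous here so that it can be called from `Drop`.
    fn into_connection(self) -> RawConnection {
        self.conn
    }
}

/// Hypothetical connection wrapper; `inner` is `None` only while a guard is alive.
struct PooledConnection {
    inner: Option<RawConnection>,
}

impl PooledConnection {
    /// Borrows the wrapper mutably, takes the connection out and converts it.
    fn as_pubsub(&mut self) -> PubSubGuard<'_> {
        let conn = self.inner.take().expect("connection is present outside pubsub mode");
        PubSubGuard { owner: self, pubsub: Some(conn.into_pubsub()) }
    }

    /// True when the underlying connection is back inside the wrapper.
    fn is_available(&self) -> bool {
        self.inner.is_some()
    }
}

/// Guard that exposes the pubsub object and restores the connection on drop.
struct PubSubGuard<'a> {
    owner: &'a mut PooledConnection,
    pubsub: Option<RawPubSub>,
}

impl Deref for PubSubGuard<'_> {
    type Target = RawPubSub;
    fn deref(&self) -> &RawPubSub {
        self.pubsub.as_ref().expect("pubsub is present until drop")
    }
}

impl DerefMut for PubSubGuard<'_> {
    fn deref_mut(&mut self) -> &mut RawPubSub {
        self.pubsub.as_mut().expect("pubsub is present until drop")
    }
}

impl Drop for PubSubGuard<'_> {
    fn drop(&mut self) {
        // Convert back and restore the wrapper's original state.
        if let Some(pubsub) = self.pubsub.take() {
            self.owner.inner = Some(pubsub.into_connection());
        }
    }
}
```

While a PubSubGuard is alive, the mutable borrow keeps the wrapper from being used for ordinary commands. Once the guard goes out of scope, the connection is back in place and the wrapper could be returned to the pool as usual.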

Owez commented 1 year ago

Would that solution be a workaround because of the current limitations in redis::aio?

My eventual solution was to write a custom PubSub implementation for my specific usecase and use deadpool for the plain TCP connections, but obviously that was a heavy-handed approach 🙂

carlocorradini commented 5 months ago

Any update 😢

paulkre commented 3 months ago

redis::aio::Connection is deprecated and redis::aio::MultiplexedConnection does not have the into_pubsub method. It has a subscribe method in redis ^0.26 though. Can we upgrade the dependency on redis?

attila-lin commented 3 months ago

> redis::aio::Connection is deprecated and redis::aio::MultiplexedConnection does not have the into_pubsub method. It has a subscribe method in redis ^0.26 though. Can we upgrade the dependency on redis?

updated.

musjj commented 2 weeks ago

Are there any updates here? What's currently the idiomatic way to subscribe to a stream of messages with deadpool-redis (like with PubSub::on_message)? Do you need to open a new connection every time? Like:

redis::Client::open("redis://127.0.0.1");

bikeshedder commented 1 week ago

I just checked the documentation and example code for the current redis version, and it seems that PubSub now works very differently from previous versions. Instead of calling into_pubsub, you create a dedicated PubSub object via the Client::get_async_pubsub method.

After a bit more digging, it is possible to convert a Connection into a PubSub, but there doesn't seem to be a way to get back the original connection once it's been converted into a PubSub object.

I currently see only one way for this to work:

It really is a bummer that the redis crate makes a distinction between Connection, PubSub and Monitor which are all different types of connections which can't be interchanged.

It's weird, as the MultiplexedConnection even provides methods for (un)subscribing, but there doesn't seem to be a way to access the published messages: https://docs.rs/redis/latest/redis/aio/struct.MultiplexedConnection.html

musjj commented 1 week ago

Yeah, the way the connection is designed feels very awkward. Even stranger is that PubSub can't even publish messages, betraying its name.

Would a change in redis-rs be necessary here? I thought about opening an issue there, but I'm not sure what kind of change would be ideal.