wooga / eredis

Erlang Redis client
MIT License

Support for pubsub #13

Closed: thijsterlouw closed this issue 12 years ago

thijsterlouw commented 13 years ago

I believe eredis does not support Redis' pubsub (http://redis.io/topics/pubsub). Is someone already working on this, or is there interest in adding this feature?

knutin commented 13 years ago

Hi Thijs,

You are right, there is no support for pubsub at the moment. I think it is very interesting, as it makes it possible to build some compelling applications. There are, however, some important questions about how it should be implemented.

I think the publish command is just a normal command that returns immediately, so this would already be supported.

Subscribing to a channel requires a dedicated connection where you only listen for messages (and subscribe/unsubscribe). You can also subscribe to a pattern. The messages will be pushed to the socket. This is a radical departure from the current architecture of eredis_client.erl, so I think it should be implemented in a separate module. It could reuse the parser and the general idea of an active once socket.
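On a subscribed connection, everything Redis pushes is a multi-bulk reply whose first element names its kind (`subscribe`, `unsubscribe`, `message`, `psubscribe`, `punsubscribe`, `pmessage`). A minimal sketch of the dispatch step a separate subscriber module could add on top of the existing parser, assuming the parser hands back a list of binaries (eredis returns integer replies as binaries). The module name `pubsub_push` and the returned tags are hypothetical, not eredis API.

```erlang
-module(pubsub_push).
-export([classify/1]).

%% Hypothetical helper: turn an already-parsed pushed reply (assumed to be a
%% list of binaries) into a tagged Erlang term a subscriber could act on.
classify([<<"message">>, Channel, Payload]) ->
    {message, Channel, Payload};
classify([<<"pmessage">>, Pattern, Channel, Payload]) ->
    {pmessage, Pattern, Channel, Payload};
%% Subscription confirmations carry the current subscription count.
classify([<<"subscribe">>, Channel, Count]) ->
    {subscribed, Channel, binary_to_integer(Count)};
classify([<<"unsubscribe">>, Channel, Count]) ->
    {unsubscribed, Channel, binary_to_integer(Count)};
classify([<<"psubscribe">>, Pattern, Count]) ->
    {psubscribed, Pattern, binary_to_integer(Count)};
classify([<<"punsubscribe">>, Pattern, Count]) ->
    {punsubscribed, Pattern, binary_to_integer(Count)};
%% Anything else is not a pubsub push.
classify(Other) ->
    {unexpected, Other}.
```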

There are also some design questions that need to be answered:

Maybe providing both "subscribe this Erlang process to messages on this channel/pattern" and "execute this callback for every message in this channel/pattern" makes sense. I think the answer to many of the above questions depends on how you want to use it within your own application, so not being too opinionated might appeal to a broader audience.

Knut

thijsterlouw commented 13 years ago

Hi Knut, thanks for your detailed reply! You raised some good questions.

knutin commented 13 years ago

Hi Thijs, thanks for the good answers. Here are some further thoughts and questions:

I think the first step in implementing something like this is to build the "core" first, i.e. a process (a gen_server similar to eredis_client) that subscribes to a single topic and executes a callback for every message it receives. Different behaviours could then be implemented on top of this by providing different callback functions.

thijsterlouw commented 13 years ago
knutin commented 13 years ago

I think we now agree on a very good solution. We let the eredis client process execute a callback for every message it receives; with this callback (and callback state) it is possible to implement both the blocking style and the non-blocking style of integration with Erlang processes. This allows users to choose the implementation that best matches their needs.
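A minimal sketch of the "callback plus callback state" idea, showing how one contract can host both integration styles. The contract `Callback(Message, CbState) -> {ok, NewCbState}` and every name below are assumptions for illustration, not anything eredis defines.

```erlang
-module(pubsub_callback).
-export([dispatch_all/3, forward_to/1, blocking_forward_to/1, collect/0]).

%% Hypothetical callback contract for this sketch:
%%   Callback(Message, CbState) -> {ok, NewCbState}
%% A subscriber process would run every pushed message through it.
dispatch_all(Callback, Messages, CbState) ->
    lists:foldl(fun(Msg, State) ->
                        {ok, NewState} = Callback(Msg, State),
                        NewState
                end, CbState, Messages).

%% Non-blocking style: cast each message to Pid and carry on immediately.
forward_to(Pid) ->
    fun(Msg, State) ->
            Pid ! {pubsub, Msg},
            {ok, State}
    end.

%% Blocking style: hand the message to Pid and wait for {ack, Ref}; the
%% calling (subscriber) process does nothing else until the consumer acks.
blocking_forward_to(Pid) ->
    fun(Msg, State) ->
            Ref = make_ref(),
            Pid ! {pubsub, self(), Ref, Msg},
            receive
                {ack, Ref} -> {ok, State}
            after 5000 ->
                    exit(ack_timeout)
            end
    end.

%% Accumulating style: keep the messages in the callback state, newest first.
collect() ->
    fun(Msg, Acc) -> {ok, [Msg | Acc]} end.
```

The blocking variant is where flow control comes from: while the callback waits, the subscriber reads nothing more, so back-pressure reaches the socket.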

The challenge I see with this is that you need to provide a way for the callback to handle requests, so you can do things like "subscribe this Erlang process to this channel". You also need a way for the callback to issue Redis commands, like "subscribe to this new topic requested by the process".

I wonder if this would best be implemented by using a behaviour. So for the "blocking flow-control-inside-eredis"-style, there would be one module implementing this behaviour. For the "cast a message to a process"-style, another module implementing the behaviour.

What do you think?

Another problem is how to send requests to Redis from inside the subscribed clients. Redis might be constantly pushing messages to you, so there is no way to make a blocking call to Redis from inside the client.

thijsterlouw commented 13 years ago

This week I will have some time to work on this; I will let you know when I have made some progress.

jdavisp3 commented 13 years ago

Whoops, I started on this as well, and without reading this thread first :/ My patch is here: https://github.com/wooga/eredis/pull/18

I don't think it addresses all the issues raised in this discussion.

It uses messages, so there is no flow control at the moment. I would describe it as 'bare metal', as it doesn't do any subscription management at all. It does send messages when the connection goes down and comes back up, so clients can re-subscribe.

I just extended eredis_client for this. Having a separate client seemed like it would end up with a lot of duplicate code.

If there is interest in my approach, I could convert it to use callbacks pretty easily.

jdavisp3 commented 13 years ago

I do kind of like the 'active_once' flow control system, though, which uses messages, not callbacks. That would require a 1:1 relationship between subscribers and clients.

jdavisp3 commented 12 years ago

Ok, my pubsub-2 branch has my latest stab at this. I went with something like the 'controlling process' paradigm, along with the 'active once' mode from the gen_tcp module. Pubsub messages must be acked before the next one is sent; until a message is acked, the Redis client does not set the TCP connection back to active once.
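A minimal sketch of the ack-gated loop described above, assuming the client forwards one message to the controlling process and re-arms the socket only after an `ack`. To keep it runnable without Redis, `Rearm` stands in for `inet:setopts(Socket, [{active, once}])` and `{push, Msg}` stands in for the `{tcp, Socket, Data}` message an active-once socket delivers; the module and message names are hypothetical.

```erlang
-module(ack_flow).
-export([start/2]).

%% Rearm is a stand-in for inet:setopts(Socket, [{active, once}]).
start(Controller, Rearm) ->
    spawn(fun() -> Rearm(), idle(Controller, Rearm) end).

%% Idle: accept one pushed message and forward it to the controller.
idle(Controller, Rearm) ->
    receive
        {push, Msg} ->
            Controller ! {message, Msg, self()},
            waiting(Controller, Rearm)
    end.

%% Waiting: do not re-arm (so nothing more is read) until the controller acks.
%% Any further {push, _} stays in the mailbox, as data would stay in the socket.
waiting(Controller, Rearm) ->
    receive
        ack ->
            Rearm(),
            idle(Controller, Rearm)
    end.
```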

This should handle the flow-control issues, I think.

Thoughts?

knutin commented 12 years ago

Hi,

Thanks for the patch. Nice work!

I took your work on eredis_client.erl and moved it into eredis_sub_client.erl. I ripped out the normal client part of it, so we now have a client that only does subscribing. To make it even simpler, the only time you can specify channels is when starting the client.

The code is in the branch "pubsub2" on wooga's eredis repository.

My colleague Paolo noticed that with the "active once" approach, Redis will buffer the messages if the client is unable to keep up. I have changed this to queue the messages inside eredis and to provide a configurable way of dealing with overflow. This should happen only in catastrophic cases, so for now the two options are to drop the message queue or to shut down the eredis client.
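A minimal sketch of a client-side queue with the two overflow options mentioned (drop the whole queue, or stop the client). It assumes a maximum size and a policy atom given at start-up; `sub_queue`, `new/2`, `in/2`, `out/1` and the `drop | exit` atoms are illustrative names, not confirmed eredis API.

```erlang
-module(sub_queue).
-export([new/2, in/2, out/1]).

%% MaxSize bounds the buffered messages; Policy says what to do beyond it.
new(MaxSize, Policy) when Policy =:= drop; Policy =:= exit ->
    #{q => queue:new(), max => MaxSize, policy => Policy}.

%% Buffer a pushed message. On overflow, either discard everything queued
%% (including Msg) or tell the caller to shut the client down.
in(Msg, #{q := Q, max := Max, policy := Policy} = S) ->
    Q1 = queue:in(Msg, Q),
    case queue:len(Q1) > Max of
        false -> {ok, S#{q := Q1}};
        true when Policy =:= drop -> {dropped, S#{q := queue:new()}};
        true when Policy =:= exit -> {stop, max_queue_size}
    end.

%% Take the oldest message, e.g. once the controlling process has acked.
out(#{q := Q} = S) ->
    case queue:out(Q) of
        {{value, Msg}, Q1} -> {Msg, S#{q := Q1}};
        {empty, _} -> {empty, S}
    end.
```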

What do you think of this?

jdavisp3 commented 12 years ago

I like this direction! I think this is looking good. In my particular use case, I need to dynamically adjust the subscriptions over time. I think it would be pretty easy to support that; do you agree? Maybe a set_channels API?
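One way a `set_channels`-style call (a hypothetical API, as proposed above) could work is to diff the current and wanted channel sets and emit only the commands needed. A minimal sketch, assuming channels are binaries and commands are eredis-style argument lists; it only computes the commands and leaves open how they are sent on a connection that is already subscribed.

```erlang
-module(channel_diff).
-export([commands/2]).

%% Return the SUBSCRIBE/UNSUBSCRIBE commands that move from Current to Wanted.
commands(Current, Wanted) ->
    Cur = ordsets:from_list(Current),
    Want = ordsets:from_list(Wanted),
    ToAdd = ordsets:subtract(Want, Cur),
    ToDrop = ordsets:subtract(Cur, Want),
    maybe_cmd(<<"SUBSCRIBE">>, ToAdd) ++ maybe_cmd(<<"UNSUBSCRIBE">>, ToDrop).

%% Skip a command entirely when it has no channels to act on.
maybe_cmd(_Cmd, []) -> [];
maybe_cmd(Cmd, Channels) -> [[Cmd | Channels]].
```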

The client as it stands cannot support the pattern-based subscription commands PSUBSCRIBE/PUNSUBSCRIBE. I've never used them, so I don't know how badly you want to support them.

knutin commented 12 years ago

Sorry for taking a month to reply.

The problem with sending commands to Redis while subscribed to a channel is that, instead of the reply being the next thing coming down the socket, you might get any number of subscription messages, which you need to handle before you get to the actual reply to the command. This is problematic for several reasons, one being that the calling process might also be the one receiving the messages, resulting in a deadlock.

Would it be feasible for your use case to start a new eredis driver for each channel?

I guess it would be fairly easy to add support for patterns by duplicating the functionality for the normal subscription.

jdavisp3 commented 12 years ago

This is true. It's just that if you happen to have a large number of channels you might want to subscribe to, you end up with a large number of connections, so it's not very scalable in that sense. What if the subscribe/unsubscribe API were asynchronous and delivered Erlang messages to the controlling process?
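A minimal sketch of what "asynchronous and delivered as Erlang messages" could look like from the caller's side: `subscribe/2` returns `ok` at once, and each confirmation arrives later in the controlling process's mailbox, so nobody blocks waiting for a reply that may sit behind pushed messages. The fake subscriber below replaces Redis; `async_sub` and the `{subscribed, Channel, SubPid}` shape are assumptions for illustration.

```erlang
-module(async_sub).
-export([start/1, subscribe/2]).

%% Stand-in subscriber: reports confirmations to the controlling process.
start(Controller) ->
    spawn(fun() -> loop(Controller) end).

%% Asynchronous request: returns immediately; replies come as messages.
subscribe(Sub, Channels) ->
    Sub ! {subscribe, Channels},
    ok.

loop(Controller) ->
    receive
        {subscribe, Channels} ->
            %% A real client would send SUBSCRIBE here and report each channel
            %% only when Redis's confirmation comes back on the socket.
            lists:foreach(fun(C) -> Controller ! {subscribed, C, self()} end,
                          Channels),
            loop(Controller)
    end.
```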

knutin commented 12 years ago

I think an asynchronous API would be much easier to implement in the driver. Good suggestion! I will look into it.

knutin commented 12 years ago

In 20a72f9 there is an implementation of the asynchronous API. What do you think? Would this provide what you need for your use case?

jdavisp3 commented 12 years ago

I like!! I think this would be perfect.

jdavisp3 commented 12 years ago

Although I don't personally need it, I think it would be easy to add support for PSUBSCRIBE and PUNSUBSCRIBE down the road: just keep the patterns as a separate list from the individual channels, and send all messages to the controlling process without worrying about pattern matching in Erlang.
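A minimal sketch of that suggestion, assuming the client keeps subscribed channels and patterns in two separate lists and forwards both `message` and `pmessage` pushes untouched, since Redis has already done the glob matching. The module `sub_state` and its tagged tuples are hypothetical.

```erlang
-module(sub_state).
-export([new/0, handle/2]).

new() -> #{channels => [], patterns => []}.

%% Bookkeeping: channels and patterns live in separate lists.
handle({subscribed, Channel}, #{channels := Cs} = S) ->
    {ok, S#{channels := lists:usort([Channel | Cs])}};
handle({unsubscribed, Channel}, #{channels := Cs} = S) ->
    {ok, S#{channels := lists:delete(Channel, Cs)}};
handle({psubscribed, Pattern}, #{patterns := Ps} = S) ->
    {ok, S#{patterns := lists:usort([Pattern | Ps])}};
handle({punsubscribed, Pattern}, #{patterns := Ps} = S) ->
    {ok, S#{patterns := lists:delete(Pattern, Ps)}};
%% Plain and pattern messages are forwarded as-is; no matching in Erlang.
handle({message, _Channel, _Payload} = Msg, S) ->
    {forward, Msg, S};
handle({pmessage, _Pattern, _Channel, _Payload} = Msg, S) ->
    {forward, Msg, S}.
```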