stefanwille / crystal-redis

Full featured Redis client for Crystal
MIT License
380 stars, 61 forks

Check if the connection is in a subscription loop #83

Closed prutya closed 5 years ago

prutya commented 5 years ago

Hi, I am building an SSE server based on Redis pub/sub. Is there a way to check whether a connection is already in a subscription loop? I want to reuse connections from the pool to subscribe them to more Redis channels.

UPD: I see there is a private method #already_in_subscription_loop?. I'm going to check whether I can call it.

UPD: Okay, I can monkey-patch it like this:

require "redis"

# Reopen Redis to expose the subscription-loop check as a public method.
class Redis
  def already_in_subscription_loop?
    strategy.is_a? Redis::Strategy::SubscriptionLoop
  end
end

redis = Redis.new

puts redis.already_in_subscription_loop?
# => false

But maybe making this method public could be a thing to consider for future releases?

UPD: I would also like to be able to call #with_connection on a pool instance to obtain a connection explicitly to #subscribe to more channels. The reason is I have multiple fibers on my SSE server and I can't call this method in a subscription handler block later.

Right now I have to spawn a redis connection for each event-stream request and this does not look like it can handle a lot of simultaneous streams.

stefanwille commented 5 years ago

Hi, connections in the connection pool are supposed to be idle and ready to be reused. As such, they should never be in a subscription loop.

Redis::PooledClient checks out a connection from the connection pool for a single command and then immediately returns it. This is a usage pattern that works for many things, but not for subscriptions. For a subscription, you really need a separate Redis instance.

> Right now I have to spawn a redis connection for each event-stream request and this does not look like it can handle a lot of simultaneous streams.

Have you tried it? I would expect Redis to handle many connections very well, as that is something that Redis' architecture is very well suited for.

kostya commented 5 years ago

PooledClient can work with subscriptions, but it simply checks out a connection from the pool and holds it until the subscription loop stops. So you don't need such a check, because all other connections in the pool would be idle.
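The checkout behaviour described in the two replies above can be modelled with a minimal sketch. It uses only the Crystal standard library: `FakeConnection` and `TinyPool` are hypothetical names invented for illustration, and this is not the pool implementation that crystal-redis actually uses. It only shows that a short block returns its connection immediately, while a long-running block (standing in for a subscription loop) keeps its connection checked out until it finishes.

```crystal
# Hypothetical stand-in for a pooled connection; it only carries an id.
record FakeConnection, id : Int32

# Hypothetical minimal pool built on a buffered Channel. A model of the
# checkout/checkin idea only, not the pool used by crystal-redis.
class TinyPool
  getter idle_count : Int32

  def initialize(size : Int32)
    @idle = Channel(FakeConnection).new(size)
    @idle_count = size
    size.times { |i| @idle.send(FakeConnection.new(i)) }
  end

  # Checks out a connection, yields it, and always checks it back in.
  def with_connection(&)
    conn = @idle.receive
    @idle_count -= 1
    begin
      yield conn
    ensure
      @idle_count += 1
      @idle.send(conn)
    end
  end
end

pool = TinyPool.new(2)

# A short command returns its connection as soon as the block ends.
pool.with_connection { |conn| conn.id }
idle_after_command = pool.idle_count

# A long-running block keeps its connection checked out while it runs.
started = Channel(Nil).new
stop = Channel(Nil).new
finished = Channel(Nil).new

spawn do
  pool.with_connection do |_conn|
    started.send(nil)
    stop.receive # stands in for "until the subscription loop stops"
  end
  finished.send(nil)
end

started.receive
idle_during_loop = pool.idle_count
stop.send(nil)
finished.receive
idle_after_loop = pool.idle_count

puts({idle_after_command, idle_during_loop, idle_after_loop})
# prints {2, 1, 2}
```

In this model every connection that is still in the pool is idle by construction, which is why no "is it in a subscription loop?" check is needed on pooled connections.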

prutya commented 5 years ago

@stefanwille Okay, I see. But I can still call #subscribe on a subscription-looped connection, right? I need a way to figure out whether it is in such a loop before calling the method, because if it is not, I'll have to call a different #subscribe method (the one that requires a block argument).

> Have you tried it? I would expect Redis to handle many connections very well, as that is something that Redis' architecture is very well suited for.

Yes, it works. The default Redis configuration allows 10_000 simultaneous connections, and we don't have that many users, so we can live with it for now (and even tweak this config value later). I am just wondering whether reusing such connections is possible.

stefanwille commented 5 years ago

I think you want to call subscribe twice on the same connection? That should not work. You would need a separate connection per subscriber.

z64 commented 4 years ago

Hi @stefanwille -

> Redis::PooledClient checks out a connection from the connection pool for a single command and then immediately returns it. This is a usage pattern that works for many things, but not for subscriptions. For a subscription, you really need a separate Redis instance.

In our application we were slightly misled about this detail, because PooledClient provides #subscribe, which allows you to do the "anti-pattern" that you noted above:

class Redis::PooledClient
  # ..

  def subscribe(*channels, &callback_setup_block : Redis::Subscription ->)
    with_pool_connection &.subscribe(*channels) { |s| callback_setup_block.call(s) }
  end
end

Our application is very latency sensitive, so we highly value having a pool of clients available.

We can change to using our own cached Redis connection, dedicated for subscription, with a light wrapper for error handling, reconnection, etc. - but I'm curious what your input is here.
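A light wrapper of the kind described above might look roughly like the following sketch. It is deliberately independent of the shard so that it runs with the standard library alone: `run_with_reconnect` is a hypothetical helper, and the block stands in for "create a dedicated `Redis.new` and block in its `subscribe` loop". Which exception classes crystal-redis raises on a lost connection is not stated in this thread, so rescuing `IO::Error` here is an assumption made only for the demonstration.

```crystal
# Hypothetical helper: runs `session` until it returns normally, sleeping
# between failed attempts. Returns the number of attempts that were needed.
def run_with_reconnect(max_attempts : Int32, delay : Time::Span, &session : Int32 ->) : Int32
  attempt = 0
  loop do
    attempt += 1
    begin
      session.call(attempt)
      return attempt
    rescue ex : IO::Error
      # Assumption: a dropped connection surfaces as IO::Error. A real
      # wrapper would rescue whatever error classes the client raises.
      raise ex if attempt >= max_attempts
      sleep delay
    end
  end
end

attempts = run_with_reconnect(max_attempts: 5, delay: 10.milliseconds) do |attempt|
  # With crystal-redis, this block would open its own connection and
  # subscribe on it. Here the first two attempts simulate a lost connection.
  raise IO::Error.new("connection lost") if attempt < 3
end

puts attempts
# prints 3
```

Keeping the subscription on its own connection means the pooled connections stay available for latency-sensitive commands, at the cost of one extra long-lived connection.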

Thank you!

stefanwille commented 4 years ago

Hi Zac,

I don't have a quick answer to your first question. I guess it could be that the method should throw an exception. I haven't thought about this enough though.

Regarding your second question: There is nothing preventing you from "resetting" a Redis connection and returning it to the pool for reuse. That is what is happening already in PooledClient. You can see in its source code that PooledClient checks out a connection from a connection pool, does whatever it needs to, and then later returns it back to the pool. You can do the same.

About resetting a connection: a Redis connection is a bit stateful, and the most important bit off the top of my head is the selected database number. If you don't change the database number then you should be fine without performing a "reset" on the connection.

stefanwille commented 4 years ago

@z64

z64 commented 4 years ago

Thanks for the reply @stefanwille !

> There is nothing preventing you from "resetting" a Redis connection and returning it to the pool for reuse.

Normally yes, but in the case of subscribe - take the following example:

require "redis"

redis = Redis::PooledClient.new

sub_connection = redis.pool.checkout
sub_connection.subscribe("foo") do |on|
  on.message do |c, m|
    p({c, m})
    sub_connection.unsubscribe(c)
  end
end

# We're done with pub/sub using sub_connection at this point, return it to the pool:
redis.pool.checkin(sub_connection)

# The following fails because the only connection in the pool was previously
# used for pub/sub, and is still internally configured as such:
# Unhandled exception: Command SET not allowed in the context of a subscribed connection (Redis::Error)
redis.set("foo", "bar")

In the context of my question, "reset" refers to resetting what I believe is @strategy inside Redis so that it may be used for general commands again if it is selected from the pool again later, since its pub/sub purpose is complete. But maybe there is some API in this shard that I am missing to do so already?

Another alternative we've considered is requesting that ysbaddaden/pool add something like pool.leak, so we can request a connection without any intention of returning it - ideally doing something like asynchronously preparing a replacement connection.


As an aside, I will note that redis.unsubscribe (no arguments), from the example given on def subscribe, fails to compile with:

Showing last frame. Use --error-trace for full trace.

In lib/redis/src/redis/commands.cr:1731:7

 1731 | unsubscribe channels.to_a
        ^----------
Error: recursive splat expansion: Array(NoReturn), Array(Array(NoReturn)), ...

stefanwille commented 4 years ago

@z64 If the connection still thinks it is subscribed after you have called unsubscribe(), then this is a bug. You should not have to reset it manually in this case.

stefanwille commented 4 years ago

I have made this issue to track it better: https://github.com/stefanwille/crystal-redis/issues/106