miguelgrinberg / Flask-SocketIO

Socket.IO integration for Flask applications.
MIT License
5.31k stars 888 forks source link

Add support for redis sentinel cluster #1979

Closed wkedz closed 1 year ago

wkedz commented 1 year ago

Hi!

I would like to use SocketIO with a redis sentinel cluster. Unfortunately, right now this is not possible, because I cannot pass additional arguments to queue managers - https://github.com/miguelgrinberg/Flask-SocketIO/blob/main/src/flask_socketio/__init__.py#L200

Is your feature request related to a problem? Please describe.

Yes, my problem is related to one issue that is now closed. https://github.com/miguelgrinberg/Flask-SocketIO/issues/359 I think that it shouldn't.

Basically, all implementation is already provided in python-socketio https://github.com/miguelgrinberg/python-socketio/commit/7dbc47049a605f78d795b6f59d458522d67c6fe6

Describe the solution you'd like

I need to SocketIO to pass additional arguments to managers. For example something like this

import inspect

class FakeSocketIO(object):

  def __init__(self, url, **kwargs):

    self.queue = None

    if url.startswith(('redis://', "rediss://")):
      queue_class = FakeRedisManager
    else:
      queue_class = FakeKombuManager

    init_arguments = self.filter_init_args(queue_class, kwargs)
    self.queue = queue_class(**init_arguments)

  def run(self):
    self.queue.run()

  def filter_init_args(self, queue_class, kwargs):
    #Get __init__ names 
    init_var_names = inspect.signature(queue_class.__init__).parameters
    init_arguments = {}
    #only get args for target queue manager except self.
    for name in init_var_names:
      if name != "self":
        init_arguments[name] = kwargs.get(name, None)
    return  init_arguments

class FakeKombuManager(object):

  def __init__(self, connection_options=None, exchange_options=None,
                 queue_options=None, producer_options=None):
    self.connection_options = connection_options or {}
    self.exchange_options = exchange_options or {}
    self.queue_options = queue_options or {}
    self.producer_options = producer_options or {}

  def run(self):
    print("self.connection_options: ", self.connection_options)
    print("self.exchange_options: ", self.exchange_options)
    print("self.queue_options: ", self.queue_options)
    print("self.producer_options: ", self.producer_options)

class FakeRedisManager(object):

  def __init__(self, redis_options=None):
    self.redis_options = redis_options or {}

  def run(self):
    print("self.redis_options: ", self.redis_options)

if __name__ == "__main__":

  #Redis test 
  print("RedisManager with redis options and wrong argument")
  sioR = FakeSocketIO("redis://", arg_not_for_RedisManager=11, redis_options={"arg1":"arg1", "arg2":"arg2"})
  sioR.run()

  print("RedisManager without redis options")
  sioR = FakeSocketIO("redis://")
  sioR.run()

  print("KombuManager with connection_options and wrong argument")
  sioK = FakeSocketIO("sentinel://", arg_not_for_KombuManager=11, connection_options={"master_name":"master", "arg2":"arg2"})
  sioK.run()

  print("KombuManager with only wrong argument")
  sioK = FakeSocketIO("sentinel://", arg_not_for_KombuManager=12)
  sioK.run()

Describe alternatives you've considered

I don't think that there are any alternatives.

Logs


  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/socketio/pubsub_manager.py", line 152, in _thread
    for message in self._listen():
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/socketio/kombu_manager.py", line 125, in _listen
    with connection.SimpleQueue(reader_queue) as queue:
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 787, in SimpleQueue
    return SimpleQueue(channel or self, name, no_ack, queue_opts,
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/simple.py", line 135, in __init__
    consumer = messaging.Consumer(channel, queue, accept=accept)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/messaging.py", line 387, in __init__
    self.revive(self.channel)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/messaging.py", line 400, in revive
    channel = self.channel = maybe_channel(channel)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 1052, in maybe_channel
    return channel.default_channel
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 895, in default_channel
    self._ensure_connection(**conn_opts)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 433, in _ensure_connection
    return retry_over_time(
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 737, in __init__
    self.client.ping()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/utils/objects.py", line 30, in __get__
    return super().__get__(instance, owner)
  File "/usr/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1239, in client
    return self._create_client(asynchronous=True)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1195, in _create_client
    return self.Client(connection_pool=self.async_pool)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1233, in async_pool
    self._async_pool = self._get_pool(asynchronous=True)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1418, in _get_pool
    return self._sentinel_managed_pool(asynchronous)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1408, in _sentinel_managed_pool
    raise ValueError(
ValueError: 'master_name' transport option must be specified.
miguelgrinberg commented 1 year ago

This is how you do it:

mgr = ManagerClass("your", "arguments", "here")
socketio = SocketIO(client_manager=mgr)

All you need to do is pick a manager class that does what you need. I assume the RedisManager or KombuManager classes are what makes more sense to adapt it to work with a cluster, but I'm not sure if you can make this work just by adding extra arguments. Worst case you need to create a custom class that inherits from these and makes the necessary changes to work with a cluster.

wkedz commented 1 year ago

Hi @miguelgrinberg

I pass the manager through client_manager and it works as it should.

Code:

app.logger.info(f"Connecting SocketIO to redis at {REDIS_HOST}")
url = REDIS_HOST # self.server_options.get('message_queue', None)
channel = "flask-socketio" # self.server_options.pop('channel', 'flask-socketio')
connection_options={"transport_options" : {"master_name":"mymaster"}}
write_only = False # app is None
client_manager = socketio.KombuManager(url=url, channel=channel, write_only=write_only, connection_options=connection_options)
sio = SocketIO(app, client_manager=client_manager, logger=True, engineio_logger=True)

Logs:

[2023-04-26 11:01:25,031] INFO in server: Connecting SocketIO to redis at sentinel://0.0.0.0:26379
Server initialized for eventlet.
h2j59VBXCfZbUIPRAAAA: Sending packet OPEN data {'sid': 'h2j59VBXCfZbUIPRAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
kombu backend initialized.
h2j59VBXCfZbUIPRAAAA: Received packet MESSAGE data 0{}
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 0{"sid":"cI-uIDemeBQ6rEnlAAAB"}
pubsub message: emit
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 2["my event",{"data":1}]
pubsub message: emit
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 2["my event",{"data":2}]

Thanks for your help!

Closing, because there is no need to pass additional args to SocketIO.