celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.87k stars 928 forks source link

Add support for Queue max_length to other transports #799

Open chadrik opened 7 years ago

chadrik commented 7 years ago

The librabbitmq and pyamqp transports support rabbitmq's x-max-length policy via the max_length attribute of Queue. When enabled this causes old messages to be discarded to make room for new messages.

More detail from the rabbitmq blog:

If we want that our queues don't get more messages than certain threshold, we can configure that via the x-max-length argument when we declare the queue. This is rather neat and simply way to control capacity; if our queue reaches the threshold and a new message arrives, then messages that are at the front of the queue, –older messages–, will be dropped, making room for the newly arrived messages. One of the reasons for this behaviour is that old messages are probably irrelevant for your application, so new ones are let into the queues.

I'd like to add emulation for this behavior to the redis and memory virtual transports. If this sounds like something that you'd be willing to accept, I can make the pull requests.

georgepsarakis commented 7 years ago

Hello @chadrik . Sounds useful, can you give a brief outline on how you have designed this feature?

chadrik commented 7 years ago

@georgepsarakis I haven't implemented it yet, but for redis I'm hoping it's as simple as adding a pipeline with LTRIM to redis.Channel._put in the case that max_length is set on the Queue. Here's a sketch:

import redis
import random
maxlen = 5
r = redis.Redis(host='localhost', port=6379, db=0)
pipe = r.pipeline()
pipe.lpush('KEY', random.randint(0, 10))
# discard older messages
pipe.ltrim('KEY', 0, maxlen - 1)
result = pipe.execute()
chadrik commented 7 years ago

Here's a rough draft for redis:

diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 763b3dfa..9c03e20c 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -485,6 +485,7 @@ class Channel(virtual.Channel):
         self.ResponseError = self._get_response_error()
         self.active_fanout_queues = set()
         self.auto_delete_queues = set()
+        self.max_length_queues = {}
         self._fanout_to_queue = {}
         self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

@@ -764,7 +765,16 @@ class Channel(virtual.Channel):
         pri = self._get_message_priority(message, reverse=False)

         with self.conn_or_acquire() as client:
-            client.lpush(self._q_for_pri(queue, pri), dumps(message))
+            key = self._q_for_pri(queue, pri)
+            max_length = self.max_length_queues.get(queue)
+            if max_length:
+                with client.pipeline() as pipe:
+                    pipe.lpush(key, dumps(message))
+                    # discard older messages if necessary
+                    pipe.ltrim(key, 0, max_length - 1)
+                    pipe.execute()
+            else:
+                client.lpush(key, dumps(message))

     def _put_fanout(self, exchange, message, routing_key, **kwargs):
         """Deliver fanout message."""
@@ -777,6 +787,9 @@ class Channel(virtual.Channel):
     def _new_queue(self, queue, auto_delete=False, **kwargs):
         if auto_delete:
             self.auto_delete_queues.add(queue)
+        max_length = kwargs.get('max_length')
+        if max_length:
+            self.max_length_queues[queue] = max_length

     def _queue_bind(self, exchange, routing_key, pattern, queue):
         if self.typeof(exchange).type == 'fanout':
@@ -792,6 +805,7 @@ class Channel(virtual.Channel):

     def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs):
         self.auto_delete_queues.discard(queue)
+        self.max_length_queues.pop(queue, None)
         with self.conn_or_acquire(client=kwargs.get('client')) as client:
             client.srem(self.keyprefix_queue % (exchange,),
                         self.sep.join([routing_key or '',
georgepsarakis commented 7 years ago

Redis documentation states that this operation should be close to O(1). Your solution looks good to me.