Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0

rate limiting using redis backend error #147

Closed weiztech closed 5 years ago

weiztech commented 5 years ago

I'm following the documentation at https://dramatiq.io/cookbook.html#rate-limiting and got an error when running a task with rate limiting using the Redis backend. Tested using dramatiq 1.4.0 on Ubuntu 18.04.

Traceback (most recent call last):
  File ".../env/lib/python3.6/site-packages/dramatiq/worker.py", line 397, in process_message
    res = actor(*message.args, **message.kwargs)
  File "..env/lib/python3.6/site-packages/dramatiq/actor.py", line 213, in __call__
    return self.fn(*args, **kwargs)
  File "./tasks.py", line 54, in print_current_date
    with ONE_CONCURRENT.acquire():
  File "/usr/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File ".../env/lib/python3.6/site-packages/dramatiq/rate_limits/rate_limiter.py", line 71, in acquire
    acquired = self._acquire()
  File "../env/lib/python3.6/site-packages/dramatiq/rate_limits/concurrent.py", line 45, in _acquire
    added = self.backend.add(self.key, 1, ttl=self.ttl)
AttributeError: 'RedisBackend' object has no attribute 'add'

Bogdanp commented 5 years ago

Could you post your setup code so I can see it? The redis backend for sure has an add method.

weiztech commented 5 years ago

Thanks for the reply.

Here's the code

from datetime import datetime
import time

from dramatiq import actor, set_broker
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
set_broker(broker)
ONE_CONCURRENT = ConcurrentRateLimiter(result_backend, "one-concurrent", limit=1)

@actor(max_retries=0)
def print_current_date():
    with ONE_CONCURRENT.acquire():
        time.sleep(1)
        print("Done wait")
    print("Current Date", datetime.now())

Bogdanp commented 5 years ago

You'll want to replace the line

from dramatiq.results.backends import RedisBackend

with

from dramatiq.rate_limits.backends import RedisBackend

That should solve your issue!
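For anyone unsure which of the two same-named classes an object came from, a quick check of the class's module settles it. The snippet below is a minimal sketch, not part of dramatiq: `qualified_name` is a hypothetical helper, and the two stand-in classes only mimic the real ones (their module paths are assumptions modeled on the paths in the tracebacks) so the example runs without dramatiq or Redis installed.

```python
def qualified_name(obj):
    """Return 'module.ClassName' for the class of obj."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


# Hypothetical stand-ins for the two same-named dramatiq classes.
# The module paths are assumptions chosen to mirror the real layout.
ResultsRedisBackend = type(
    "RedisBackend", (), {"__module__": "dramatiq.results.backends.redis"}
)
RateLimitRedisBackend = type(
    "RedisBackend", (), {"__module__": "dramatiq.rate_limits.backends.redis"}
)

# Same class name, different modules:
print(qualified_name(ResultsRedisBackend()))
# dramatiq.results.backends.redis.RedisBackend
print(qualified_name(RateLimitRedisBackend()))
# dramatiq.rate_limits.backends.redis.RedisBackend
```

In a real project, calling the helper on the object passed to ConcurrentRateLimiter would show whether it came from the results package or the rate_limits package.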

weiztech commented 5 years ago

I thought it was the same class because the name is the same (I just checked the code, and it's different). It's fixed now.

Thanks!

Bogdanp commented 5 years ago

Yep, that's definitely confusing! Glad you got it fixed.

Volhen commented 3 years ago

Hello. I used from dramatiq.rate_limits.backends import RedisBackend as in the example above, but it raises this error:

Traceback (most recent call last):
  File ".../venv/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File ".../venv/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    result = self.fn(*args, **kwargs)
  File ".../task/fake.py", line 8, in print_current_date
    with ONE_CONCURRENT.acquire():
  File "/usr/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File ".../venv/lib/python3.6/site-packages/dramatiq/rate_limits/rate_limiter.py", line 71, in acquire
    acquired = self._acquire()
  File ".../venv/lib/python3.6/site-packages/dramatiq/rate_limits/concurrent.py", line 49, in _acquire
    return self.backend.incr(self.key, 1, maximum=self.limit, ttl=self.ttl)
  File ".../venv/lib/python3.6/site-packages/dramatiq/rate_limits/backends/redis.py", line 51, in incr
    pipe.watch(key)
  File ".../venv/lib/python3.6/site-packages/rediscluster/pipeline.py", line 304, in watch
    raise RedisClusterException("method watch() is not implemented")
rediscluster.exceptions.RedisClusterException: method watch() is not implemented

Thanks in advance!

Volhen commented 3 years ago

Please tell me how I can better organize throttling in dramatiq. Thanks in advance!

mitangelo commented 2 years ago

Will the rate-limit RedisBackend also let you get the results?

mitangelo commented 2 years ago

[2022-07-14 16:06:51,619] [PID 10] [Thread-4] [dramatiq.broker.RedisBroker] [CRITICAL] Unexpected failure in after_process_message of <dramatiq.results.middleware.Results object at 0x7f95b09e5fa0>.
Traceback (most recent call last):
  File "/opt/venv/lib/python3.8/site-packages/dramatiq/broker.py", line 114, in emit_after
    getattr(middleware, signal)(self, *args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/dramatiq/results/middleware.py", line 95, in after_process_message
    self.backend.store_result(message, result, result_ttl)
AttributeError: 'RedisBackend' object has no attribute 'store_result'

alex-utk commented 1 year ago

@mitangelo I also struggled a bit with this, but here's the solution:

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results import Results
from dramatiq.middleware import GroupCallbacks

import dramatiq.results.backends as result_backend
import dramatiq.rate_limits.backends as rate_limit

broker = RabbitmqBroker(max_priority=255)
res_backend = result_backend.RedisBackend()  # result backend: stores actor results
broker.add_middleware(Results(backend=res_backend))

rate_limiter = rate_limit.RedisBackend()  # rate limiter backend, not a result backend
broker.add_middleware(GroupCallbacks(rate_limiter))

dramatiq.set_broker(broker)

Both classes have the same name, which makes usage a bit confusing.
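Because the name clash only shows up as an AttributeError once a task runs, one option is to check the backend's methods at startup. The following is a rough sketch under stated assumptions: `require_methods` and the fake backend classes are hypothetical, and the method lists contain only the names that appear in the tracebacks in this thread (`add`, `incr`, `store_result`), not the full interfaces of either backend.

```python
# Method names taken from the tracebacks in this thread; the real
# interfaces have more methods, so treat these tuples as assumptions.
RATE_LIMITER_METHODS = ("add", "incr")
RESULT_METHODS = ("store_result",)


def require_methods(backend, required, role):
    """Return backend unchanged, or raise TypeError naming what is missing."""
    missing = [
        name for name in required
        if not callable(getattr(backend, name, None))
    ]
    if missing:
        cls = type(backend)
        raise TypeError(
            f"{cls.__module__}.{cls.__qualname__} cannot be used as a "
            f"{role}: missing {', '.join(missing)}"
        )
    return backend


# Hypothetical fakes so the sketch runs without dramatiq or Redis.
class FakeResultBackend:
    def store_result(self, message, result, ttl):
        pass


class FakeRateLimiterBackend:
    def add(self, key, value, ttl):
        return True

    def incr(self, key, amount, maximum, ttl):
        return True


# Passes: the object offers the rate limiter methods.
require_methods(FakeRateLimiterBackend(), RATE_LIMITER_METHODS, "rate limiter backend")

# Fails fast with a clear message instead of an AttributeError inside a task.
try:
    require_methods(FakeResultBackend(), RATE_LIMITER_METHODS, "rate limiter backend")
except TypeError as exc:
    print(exc)
```

With the real classes, the same check could wrap whatever object is handed to ConcurrentRateLimiter or Results, so a swapped import is reported when the module loads rather than when a worker processes a message.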