PolicyStat / jobtastic

Make your user-responsive long-running Celery jobs totally awesomer.
http://policystat.github.com/jobtastic/
MIT License

Thundering herd avoidance doesn't wait the full `herd_avoidance_timeout` #6

Open winhamwr opened 12 years ago

winhamwr commented 12 years ago

The acquire_lock implementation is brain-dead. Make it do what it looks like it should be doing.

Maybe take a look at: http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

rhunwicks commented 9 years ago

I was getting spurious errors in tests as a result of a failed call to cache.decr() when the key didn't exist. I didn't troubleshoot exactly how that happened, but I have replaced the acquire_lock function and stopped the errors.

I don't know how to write a test that can simulate this - presumably you need multiple processes running at once.

Based on the link here and some others, I am assuming that we are using either Redis or memcached. If we aren't, then we are running tests, and there is a single process anyway, so nothing else will try to run the task while we are

On that basis, this seems to work:

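import time
from contextlib import contextmanager

# Assumption: `cache` is Django's default cache, as the snippet implies.
from django.core.cache import cache


@contextmanager
def acquire_lock(lock_name, timeout=900):
    """
    A contextmanager to wait until an exclusive lock is available,
    hold the lock and then release it when the code under context
    is complete.

    Attempt to use lock and unlock, which will work if the Cache is Redis,
    but fall back to a memcached-compliant add/delete approach.

    See:
    - http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
    - http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time

    """
    # Only the client lookup is guarded. If the `yield` sat inside the
    # try/except, an AttributeError raised by the caller's code would be
    # caught here and trigger a second yield.
    try:
        redis = cache.client.client
    except AttributeError:
        redis = None

    if redis is not None:
        lock = redis.lock(lock_name, timeout=timeout)
        # With blocking=True and no blocking timeout, this waits until
        # the lock is held.
        lock.acquire(blocking=True)
        try:
            yield
        finally:
            lock.release()
    else:
        # cache.add only succeeds if the key does not already exist, so
        # it doubles as an atomic "take the lock" operation.
        while not cache.add(lock_name, 'locked', timeout):
            time.sleep(0.1)  # back off instead of hammering the cache
        try:
            yield
        finally:
            cache.delete(lock_name)
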
winhamwr commented 9 years ago

Thanks, rhunwicks! This looks like a solid implementation for Redis and Memcached. I'm not sure how to test this, either. Hrm. Async testing is hard.
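
On the testing question, one possible starting point is to exercise only the add/delete fallback with threads rather than processes. The sketch below is not jobtastic code: `FakeCache`, `run_contention_check`, and the `cache` and `poll_interval` parameters are hypothetical names made up for illustration, and the in-memory cache only imitates the atomic `add` that memcached provides. It checks that no two threads are ever inside the lock at the same time. It does not cover the Redis branch or real multi-process contention.

```python
import threading
import time
from contextlib import contextmanager


class FakeCache:
    """Hypothetical in-memory stand-in for a memcached-style cache."""

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def add(self, key, value, timeout=None):
        # Like memcached's add: store only if the key is absent, atomically.
        # The timeout is accepted but ignored in this fake.
        with self._mutex:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete(self, key):
        with self._mutex:
            self._data.pop(key, None)


@contextmanager
def acquire_lock(cache, lock_name, timeout=900, poll_interval=0.001):
    # Same add/delete idea as the fallback branch, with the cache passed in
    # so that a fake can be substituted.
    while not cache.add(lock_name, 'locked', timeout):
        time.sleep(poll_interval)
    try:
        yield
    finally:
        cache.delete(lock_name)


def run_contention_check(num_threads=8, rounds=20):
    """Return the highest number of threads ever inside the lock at once."""
    cache = FakeCache()
    state = {'inside': 0, 'max_inside': 0}
    state_mutex = threading.Lock()

    def worker():
        for _ in range(rounds):
            with acquire_lock(cache, 'herd-lock'):
                with state_mutex:
                    state['inside'] += 1
                    state['max_inside'] = max(state['max_inside'],
                                              state['inside'])
                time.sleep(0.0005)  # widen the window for any overlap
                with state_mutex:
                    state['inside'] -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['max_inside']
```

If the lock is exclusive, `run_contention_check()` should never report more than one thread inside the critical section. A passing run does not prove the absence of races, so this is closer to a smoke test than a guarantee.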

rhunwicks commented 8 years ago

We've been using it in production for almost a year. Do you want a PR on it without a test? I think it's probably more useful than the existing implementation.

winhamwr commented 8 years ago

do you want a PR on it without a test

At this point, that's probably the pragmatic option, yup.