NicolasLM / spinach

Modern Redis task queue for Python 3
https://spinach.readthedocs.io
BSD 2-Clause "Simplified" License

Add max_concurrency to tasks #8

Closed · bigjools closed this 3 years ago

bigjools commented 3 years ago

This change adds a max_concurrency parameter to Task, so that a task can cap the number of its jobs running simultaneously, no matter how many are queued and waiting to run.
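For example (mirroring the decorator form used in the tests further down; the task name and body here are just placeholders):

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker(), namespace='example')

# At most two jobs of this task run at the same time, regardless of how
# many are sitting in the queue.
@spin.task(name='send_email', max_retries=10, max_concurrency=2)
def send_email(to):
    print('sending to', to)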

Other drive-by changes:

Fixes https://github.com/NicolasLM/spinach/issues/6

bigjools commented 3 years ago

I realised after submitting that I didn't convert remove_job_from_running to a Lua script. I can do this later. I will also note that keeping the uninitialised max_concurrency as None in Python and -1 in Lua has created some tension between the two language domains: I've had to put extra checks in all the Lua for both -1 and nil, because otherwise tests that create Tasks directly blow up on nil default values, and avoiding that would have meant a ton of changes to those tests.
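To illustrate the boundary (a sketch only, not the exact code in the branch; the helper name is made up):

# The Lua script has no equivalent of None, so the "no limit" case is sent
# across as -1. Script arguments arrive in Lua as strings, hence str().
def serialize_max_concurrency(max_concurrency):
    return str(-1 if max_concurrency is None else max_concurrency)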

NicolasLM commented 3 years ago

Really great work!

A couple of things in addition to my comments in the code:

  • The tests on Python 3.5 are failing, it's a simple str vs bytes issue

  • The feature is not implemented in the MemoryBroker. It shouldn't be too difficult as this broker is able to take many shortcuts and as long as the implementation is thread-safe it's good enough.

bigjools commented 3 years ago

Really great work!

Thank you! The Lua took me a day to get used to but it seems pretty simple. At least the duck typing is familiar :)

A couple of things in addition to my comments in the code:

  • The tests on Python 3.5 are failing, it's a simple str vs bytes issue

Right, I can add that to the tox environments so it gets tested locally too. Did you like that use of tox & docker-compose, BTW?

  • The feature is not implemented in the MemoryBroker. It shouldn't be too difficult as this broker is able to take many shortcuts and as long as the implementation is thread-safe it's good enough.

Yeah I was going to mention that after submitting the PR and completely forgot. I didn't want to take that on at the same time as this in case it was going in the wrong direction but since you're happy with things I'll see what I can do.
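Roughly, I have something like this in mind for the thread-safe part (a minimal sketch only; the names are placeholders, not the MemoryBroker's actual internals):

import threading
from collections import defaultdict

class ConcurrencyTracker:
    """Toy illustration of per-task concurrency bookkeeping under a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._current = defaultdict(int)

    def try_acquire(self, task_name, max_concurrency):
        # Return True if a job of this task is allowed to start now.
        with self._lock:
            if (max_concurrency is not None
                    and self._current[task_name] >= max_concurrency):
                return False
            self._current[task_name] += 1
            return True

    def release(self, task_name):
        with self._lock:
            self._current[task_name] -= 1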

NicolasLM commented 3 years ago

Right, I can add that to the tox environments so it gets tested locally too. Did you like that use of tox & docker-compose, BTW?

I always rely on CI for running the test suite on multiple environments so I don't think that I will personally have a use for tox. That being said I'm completely fine with having it in the code base if it's part of someone's workflow.

bigjools commented 3 years ago

  • The tests on Python 3.5 are failing, it's a simple str vs bytes issue

I just realised 3.5 EOLed 7 months ago. Perhaps you should just drop the CI run there?

bigjools commented 3 years ago

Hi - I've found a problem in the _arbiter_func that I will need your help with please. These lines:

                if (stop_when_queue_empty and available_slots > 0
                        and received_jobs == 0):
                    logger.info("Stopping workers because queue '%s' is empty",
                                self._working_queue)

cause a problem in a functional test I've added to check that concurrency limits are observed properly in real tasks. It seems to work fine with the Redis broker, which may be a fluke, but it fails 100% of the time with the Memory broker. The test stops early because the workers shut down: the second worker receives no job due to the concurrency limit, i.e. received_jobs == 0 even though there are still jobs in the queue!

I suppose I can rewrite the test to avoid using stop_when_queue_empty but that makes things quite tricky and I thought you might have some ideas.

bigjools commented 3 years ago

This is the test I'm trying to get working:

import pytest
from spinach import Engine, MemoryBroker, RedisBroker

@pytest.fixture(params=[MemoryBroker, RedisBroker])
def spin(request):
    broker = request.param
    spin = Engine(broker(), namespace='tests')
    yield spin

def test_concurrency_limit(spin):
    count = 0

    @spin.task(name='do_something', max_retries=10, max_concurrency=1)
    def do_something(index):
        nonlocal count
        assert index == count
        count += 1

    for i in range(0, 5):
        spin.schedule(do_something, i)

    # Start two workers; test that only one job runs at once as per the
    # Task definition.
    spin.start_workers(number=2, block=True, stop_when_queue_empty=True)
    assert count == 5

NicolasLM commented 3 years ago

I think you uncovered a bug in the stop_when_queue_empty feature. We can no longer rely on the absence of a task from the broker as an indication that the queue is empty.

One way around it would be to add an explicit call to the broker to check whether the queue is actually empty:

                if (stop_when_queue_empty and available_slots > 0
                        and received_jobs == 0
                        and self._broker.is_queue_empty(self._working_queue)):

Since I don't think stop_when_queue_empty is actually used much in production, I don't mind the extra call to Redis once in a while to make this work.
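On the MemoryBroker side, the shape of such a check could be as simple as this toy sketch (illustrative only; the real broker's internals differ):

import queue
import threading
from collections import defaultdict

class ToyMemoryBroker:
    """Toy stand-in showing a thread-safe is_queue_empty() check."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queues = defaultdict(queue.Queue)

    def enqueue_jobs(self, queue_name, jobs):
        with self._lock:
            for job in jobs:
                self._queues[queue_name].put(job)

    def is_queue_empty(self, queue_name):
        with self._lock:
            return self._queues[queue_name].empty()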

(Note that it probably works on Redis currently because the content of the task in your test makes it very fast. I believe if you make it slightly longer it will fail also on Redis.)

I just realised 3.5 EOLed 7 months ago. Perhaps you should just drop the CI run there?

Yes, I will remove 3.5 from the supported versions, but for you it might be less work to just add the missing .decode(), as I think 3.5 is referenced in a few places.

bigjools commented 3 years ago

Yes, I will remove 3.5 from the supported versions, but for you it might be less work to just add the missing .decode(), as I think 3.5 is referenced in a few places.

Yeah, no worries, I already did that. And of course I can't use f-strings on Python 3.5 either.

Ok I think I addressed everything, let me know how it's looking.

bigjools commented 3 years ago

I'll also note once you are happy with the code I'll squash the commits and do a better commit message.

NicolasLM commented 3 years ago

I took the code for a ride and found a couple of issues.

Issue 1

By playing with small variations of the quickstart example (changing the number of workers, max_retries and max_concurrency), I managed to get Redis into an inconsistent state:

127.0.0.1:6379> hgetall spinach/_current_concurrency
1) "compute"
2) "100"
127.0.0.1:6379> hgetall spinach/_max_concurrency
1) "compute"
2) "5"

At this point, workers get stuck because they cannot take tasks anymore. Unfortunately I do not know exactly which combinations got me there but I should note that I got this with a single spinach process, without killing it (so no _running-jobs-on-broker-xxx key) and without jobs failing.

Issue 2

I managed to get the workers stuck in another way. When a worker processes an idempotent job and it fails, it sets its status to NOT_SET so that it gets re-enqueued for retry. This removes the job from running in the broker, but does not decrement the concurrency key.

https://github.com/NicolasLM/spinach/blob/master/spinach/engine.py#L255-L268

I do not think that the solution is to change the Engine to the code below, because that would not be atomic:

            elif job.status is JobStatus.NOT_SET:
                self._broker.remove_job_from_running(job)
                self._broker.enqueue_jobs([job])

Instead the enqueue_jobs of both brokers should be adapted to take the concurrency into account.
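To illustrate the direction (a toy sketch only, not actual spinach code; all names below are made up):

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ToyJob:
    task_name: str
    retrying: bool = False  # True when the job is coming back for a retry

def enqueue_jobs(pending: List[ToyJob],
                 current_concurrency: Dict[str, int],
                 jobs: List[ToyJob]) -> None:
    # Re-enqueueing a retried job and releasing its concurrency slot happen
    # in the same operation, so they cannot be split into two racy calls.
    for job in jobs:
        if job.retrying:
            current_concurrency[job.task_name] = max(
                0, current_concurrency.get(job.task_name, 0) - 1)
        pending.append(job)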

To reproduce:

from spinach import Engine, RedisBroker

spin = Engine(RedisBroker())

@spin.task(name='compute', max_retries=10, max_concurrency=2)
def compute(a, b):
    raise RuntimeError('Error')

# Schedule 100 jobs to be executed ASAP
for _ in range(100):
    spin.schedule(compute, 5, 3)

print('Starting workers, ^C to quit')
spin.start_workers(number=5)

bigjools commented 3 years ago

Thanks for the feedback! I had not expected you to test so much yet, so thank you. I was planning on doing that myself the next day, as it's been a 3-day weekend here. I was not expecting it to be bug-free yet.

bigjools commented 3 years ago

I've fixed the smaller points, and issue 2. I am not sure how to recreate issue 1 and I am wondering if it was to do with issue 2. I'll keep poking at it.

NicolasLM commented 3 years ago

Great! Now that I think about it, issue 1 was probably due to issue 2.

Is it ready for a final review?

bigjools commented 3 years ago

You can indeed review if you would like, but I will squash it before you consider merging. I would also like to give it a heavier test on my application before merging: while I ran the quickstart example with quite a few variations and tests, it would be prudent to see how it behaves in reality. I had to implement a workaround in my code for max concurrency before this (with multiple engines and locks), so I can slot in the current branch instead, and if it all still works OK I will have good confidence that we can proceed.

bigjools commented 3 years ago

I've been running this on my existing app all morning and it looks good so far. I've seen the concurrency limit observed correctly across multiple worker processes and threads, and I've killed and restarted the whole app/workers and it all recovers correctly too.

NicolasLM commented 3 years ago

Awesome work! Merged and released as 0.0.14.

bigjools commented 3 years ago

Excellent! Thanks for the initial hand-holding and for the nice code base which made it easy to get involved.