NicolasLM / spinach

Modern Redis task queue for Python 3
https://spinach.readthedocs.io
BSD 2-Clause "Simplified" License

Feature idea: Allow a Task to define its maximum number of concurrent jobs #6

Closed juledwar closed 3 years ago

juledwar commented 3 years ago

I am aware of the named queues into which you can direct Tasks (and thusly limit the number of workers for that queue), but this doesn't quite fit my use case because it involves some painful restructuring of workers, and also would leave some workers idle while there are large queues elsewhere.

What would be really useful is to be able to pass a max_concurrent or similar to the Task so that it won't run more than that many concurrently.

Happy to pitch in and help fix this if you like the idea.

NicolasLM commented 3 years ago

This is an interesting use case, I see a few different directions we could take to tackle it.

Could you describe the problem you are currently facing in more detail? I want to make sure it is not due to the fact that tasks and workers are attached to a single queue at a time.

juledwar commented 3 years ago

Sure thing. Basically I have a Task that goes off and connects to a service on the Internet and does some reasonably heavyweight operations and downloads. That Task could potentially get queued up 100s of times, and I don't want to overwhelm the Internet services with huge numbers of concurrent Tasks - otherwise known as a DoS.

My workers are all threads inside my k8s-deployed app servers, to make it all nice and simple. Having separate workers for some queues would not only be an awkward change, it could also leave a lot of workers idle when we have other large queues of jobs waiting. Conversely, if I put the 100s of downloads into a single queue, it will block other jobs.

Hope this makes sense.

NicolasLM commented 3 years ago

Got it, that's helpful. That is basically an issue about locking, a feature that has been on my mind for a while.

In my experience with other task queues there are multiple approaches to the problem:

  1. Preventing a task from being added to the queue if a similar one is already queued or already running.
  2. Allowing the same task to be added to the queue multiple times, but giving the jobs to workers one by one.
  3. Allowing the same task to be added to the queue multiple times and giving the jobs to workers as soon as they are ready, but raising a TaskAlreadyRunningError.

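A few questions:

  1. What is the expected behavior when 100 tasks are scheduled at the same time? Should the task run just once or 100 times but one at a time?
  2. Do you see a use case for max_concurrent > 1?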

For the sake of discussion, there is also a use case I would like to support: limiting concurrency based on task arguments.

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())


@spin.task(name='sync_with_server')
def sync_with_server(server_name: str, *args, **kwargs):
    # The task should be able to synchronize multiple servers concurrently,
    # but never the same server at the same time.
    pass
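One way to picture limiting concurrency by task arguments is to derive the lock identity from the task name plus the argument that matters. This is only a minimal sketch: `concurrency_key` and the choice of the first positional argument are hypothetical and not part of Spinach.

```python
def concurrency_key(task_name: str, *args) -> str:
    """Build a lock identity from the task name and its first argument.

    Hypothetical helper: jobs sharing a key would be mutually exclusive,
    while jobs with different keys could run side by side.
    """
    if not args:
        return task_name
    return '{}:{}'.format(task_name, args[0])


# Two different servers get two different keys, so they would not block
# each other; a second sync of 'server-a' would map to the same key.
key_a = concurrency_key('sync_with_server', 'server-a')
key_b = concurrency_key('sync_with_server', 'server-b')
```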

Finally, I'm working on #5, which I think is a prerequisite for this feature: a worker dying while holding some kind of task lock should not block the task forever.

juledwar commented 3 years ago

I think you cover many different desirable outcomes depending on a user's use case. Having a TaskAlreadyRunningError would be pretty handy for some of mine, actually.

"What is the expected behavior when 100 tasks are scheduled at the same time? Should the task run just once or 100 times but one at a time? Do you see a use case for max_concurrent > 1?"

These two questions amount to the same thing for me - if I schedule 100 tasks I would want them all to get processed, but with a maximum concurrency limit, so say at most 5 at a time. So given the case where I have 50 workers, I want only 5 at most of this particular Task to run at any given time. Your sync_with_server scenario seems very similar to this.
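The behavior described here (every job eventually runs, at most N of a given Task at once, and workers stay free for other work) can be modeled without Redis. The sketch below is an assumption-laden illustration, not Spinach code: `pick_next_job`, the task names, and the in-memory queue are all made up for the example.

```python
from collections import Counter, deque


def pick_next_job(queue, running, limits):
    """Pop and return the first queued job whose task is under its limit.

    Returns None when every queued job belongs to a task at its limit.
    A missing entry in `limits` means the task is unlimited.
    """
    for index, (task_name, job_id) in enumerate(queue):
        limit = limits.get(task_name)
        if limit is None or running[task_name] < limit:
            del queue[index]  # safe: we return before iterating further
            running[task_name] += 1
            return task_name, job_id
    return None


# 100 download jobs queued ahead of 10 unrelated jobs, with 50 idle workers.
queue = deque([('download', i) for i in range(100)]
              + [('send_email', i) for i in range(10)])
running = Counter()
limits = {'download': 5}

started = []
for _worker in range(50):
    job = pick_next_job(queue, running, limits)
    if job is None:
        break  # remaining workers stay idle until a download finishes
    started.append(job)
```

In this model only 5 downloads start, the 10 unrelated jobs are not stuck behind the other 95 downloads, and the remaining downloads stay queued until a running one finishes.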

Getting #5 fixed would be awesome! I've been meaning to dive into that for ages but not found the time yet. We are using Spinach in production code here at Cisco so I should try to prioritise it if you need any help.

juledwar commented 3 years ago

I just filed #7 as a corollary to this. I wanted to do a temporary hack to get extra workers up in the same process, which would deal with different queues. However it seems pretty hard without creating a new engine, and also even harder to do that while using the flask integration.

Ideally I'd like to fix this issue. What can I do to help it along? Did you have a design in mind?

NicolasLM commented 3 years ago

We should figure out which of the three behaviors we want to support. I think that all three have legitimate use cases but different implementations.

You seem to favor option 3, so maybe we can start with this one?

juledwar commented 3 years ago

I think I favour option 2 actually, with 3 being a side goal in case you don't want to block on the queue. In other words, allow clients to schedule as many jobs as they want, and Spinach will observe the max_concurrency set on the Task and only actively execute at most that many at the same time.

Option 3 would come into it via an arg to schedule which says block_on_concurrent_limit=False, or similar.

I hope that makes sense?

Given that you are in Europe and I in Australia, I will try to stay around later in my evening today to discuss a bit more!

NicolasLM commented 3 years ago

Got it. Here are my thoughts on the implementation of option 2.

Because the feature will have to be able to detect dead brokers (otherwise the current concurrency of a task could get stuck) I propose that we start by allowing max_concurrency only for retryable tasks.

Each task would get two int keys in Redis:

  1. _max_concurrency_{task_name}
  2. _current_concurrency_{task_name}

Things to implement:

@juledwar what do you think about this plan? Do you think you can give it a try?

juledwar commented 3 years ago

I don't know the code well enough to say that it's a good plan or not, but I can give it a whirl. Also, while I know Python very well, I do not know Lua at all :smile:

juledwar commented 3 years ago

"Using the broker's register_periodic_task (or similar) to set in Redis _max_concurrency_{task_name} for each retryable task"

I just started looking in more detail at the code. I was wondering why you mentioned register_periodic_task specifically? I was thinking this needed to be attach_tasks on the Engine, but then looking at that it doesn't do a lot.

NicolasLM commented 3 years ago

I mentioned register_periodic_task because it is a method that is called once when the machinery (Workers/Broker) starts to insert data into Redis. It creates the data structures used to schedule periodic tasks.

For the current feature we need something similar: inserting into Redis a data structure that holds the max concurrency limit for each task. Feel free to reuse register_periodic_task or come up with another approach if you think that it would be better.

As you noticed, attach_tasks is purely a Python thing to help structure large applications. It won't help for this use case.

juledwar commented 3 years ago

Ok we need a sentinel (unless there's a better way?) to mark infinite concurrency (the current default) in the _max_concurrency_{task_name} key. Or should we just not set that key if there's no limit to concurrency?

NicolasLM commented 3 years ago

Right, I'd say None in Python and -1 in Redis. It's important to have a key even if there is no limit, so that a task can be changed from having, say, a limit of 10 to having no limit.

juledwar commented 3 years ago

Great, that works, although it's easier to use -1 in Python too. I aim to use a single hash key for each of max/current, whose fields are named after the task.name, unless you think that's wrong... ?

NicolasLM commented 3 years ago

That's completely fine.
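The layout discussed above (one hash for the limits, one for the current counts, fields named after the task, -1 for "no limit") might look like the following. This is a sketch under assumptions: plain dicts stand in for the two Redis hashes, and the key names and `register_limits` are hypothetical.

```python
from typing import Dict, Optional

UNLIMITED = -1  # sentinel for "no limit", as agreed in the thread

# Hypothetical key names; plain dicts stand in for two Redis hashes.
MAX_KEY = '_max_concurrency'
CURRENT_KEY = '_current_concurrency'


def register_limits(store: Dict[str, Dict[str, int]],
                    tasks: Dict[str, Optional[int]]) -> None:
    """Write one field per task, even for tasks without a limit."""
    max_hash = store.setdefault(MAX_KEY, {})
    current_hash = store.setdefault(CURRENT_KEY, {})
    for task_name, max_concurrency in tasks.items():
        max_hash[task_name] = (
            UNLIMITED if max_concurrency is None else max_concurrency
        )
        # Keep an existing count; start at 0 for tasks seen the first time.
        current_hash.setdefault(task_name, 0)


store: Dict[str, Dict[str, int]] = {}
register_limits(store, {'download': 10, 'send_email': None})
# A later deployment removes the limit: the field is overwritten with the
# sentinel instead of being left at the stale value 10.
register_limits(store, {'download': None, 'send_email': None})
```

Always writing the field is what makes the change from a limit of 10 to no limit take effect; skipping unlimited tasks would leave the old value in place.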

juledwar commented 3 years ago

I have a completely untested first version here: https://github.com/bigjools/spinach/tree/max_concurrency Can you let me know if you think this is heading in the right direction? I have some concerns around atomicity of operations when incrementing and decrementing the current concurrency values, as Redis doesn't seem to have a decrement operation for hash field values, and I didn't want to stray from your other conventions of storing numbers as strings. The parts that increment and decrement are racy.

If it looks ok so far I'll start adding tests.

(BTW the pep8 checks are full of bad errors for all the default args after function parameter type declarations - I think flake8 does a better job)

NicolasLM commented 3 years ago

It looks good so far! I made a couple of comments.

One thing to keep in mind is that atomicity is provided by Lua scripts. From the point of view of redis clients, a script either did not run or it ran fully. This is why read-update-write is fine as long as it is part of a Lua script and the reason why Spinach uses Lua so much.
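To illustrate why an atomic read-update-write removes the race, here is an in-memory model. It is not Spinach code and does not talk to Redis: the `ConcurrencyCounters` class is hypothetical, and a `threading.Lock` stands in for the all-or-nothing guarantee that a Lua script gives in Redis.

```python
import threading


class ConcurrencyCounters:
    """In-memory stand-in for the max/current concurrency hashes.

    The lock plays the role a Lua script plays in Redis: the whole
    read-update-write sequence runs without interleaving.
    """

    def __init__(self, limits):
        self._lock = threading.Lock()
        self._max = dict(limits)  # -1 means unlimited
        self._current = {name: 0 for name in limits}

    def try_acquire(self, task_name):
        """Reserve a slot if the task is under its limit."""
        with self._lock:
            limit = self._max[task_name]
            if limit != -1 and self._current[task_name] >= limit:
                return False
            self._current[task_name] += 1
            return True

    def release(self, task_name):
        """Give a slot back once a job has finished."""
        with self._lock:
            self._current[task_name] -= 1

    def current(self, task_name):
        with self._lock:
            return self._current[task_name]


counters = ConcurrencyCounters({'download': 5})
results = []


def worker():
    results.append(counters.try_acquire('download'))


# 50 workers race for 5 slots; exactly 5 of them can win.
threads = [threading.Thread(target=worker) for _ in range(50)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Without the lock, two workers could both read a count of 4 and both increment it, which is the race described in the previous comment.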

The other thing is that for Redis everything is a byte-string, so storing 4, "4" or b"4" will result in the same data in Redis. This is why it is possible to call INCR on a string that contains a number.
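A small model of that point, in plain Python rather than against a real server (both helper names are made up for the example). Note also that Redis's HINCRBY takes a signed increment, so a hash field can be decremented by passing a negative amount.

```python
def as_redis_bytes(value) -> bytes:
    """Encode a value roughly the way a Redis client would before sending it."""
    if isinstance(value, bytes):
        return value
    return str(value).encode()


def incr_by(raw: bytes, amount: int) -> bytes:
    """Model of INCRBY/HINCRBY: parse the stored bytes, add, store bytes again."""
    return str(int(raw) + amount).encode()


# 4, "4" and b"4" all end up as the same stored byte-string.
stored = as_redis_bytes(4)
same_as_str = as_redis_bytes("4")
same_as_bytes = as_redis_bytes(b"4")

# Incrementing and decrementing both go through the same operation;
# a negative amount is the decrement.
after_increment = incr_by(stored, 1)
after_decrement = incr_by(after_increment, -1)
```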

About PEP8, the CI runs pycodestyle --ignore=E252,W503,W504 spinach tests.

bigjools commented 3 years ago

(posting from my personal account) Good info, thanks very much. I'll continue with it next week.

bigjools commented 3 years ago

FWIW, have you considered Black to format the code? I use it on all of mine and no longer have to think about most PEP8 errors.

NicolasLM commented 3 years ago

Regarding Black, you are right, let's run the code through it once this feature is complete.

juledwar commented 3 years ago

Also have a look at isort to keep the imports consistent.