rq / rq

Simple job queues for Python
https://python-rq.org

Removing a job from a queue only removes the first occurrence #1208

Open mattjegan opened 4 years ago

mattjegan commented 4 years ago

Given a Queue that contains multiple copies of the same Job, calling Queue.remove(job) removes only one occurrence of the job from the queue. This is particularly annoying when the job's callable no longer exists, for example because it has been renamed in the codebase.

For example:

>>> test_queue = Queue('test', connection=redis_conn)
>>> test_queue.get_jobs()
[]
>>> test_queue.enqueue_job(j)
Job('3e938d37-db81-49ae-9a6c-7a4a144f9d31', enqueued_at=datetime.datetime(2020, 3, 4, 4, 45, 10, 137250))
>>> test_queue.enqueue_job(j)
Job('3e938d37-db81-49ae-9a6c-7a4a144f9d31', enqueued_at=datetime.datetime(2020, 3, 4, 4, 45, 11, 42424))
>>> test_queue.get_jobs()
[Job('3e938d37-db81-49ae-9a6c-7a4a144f9d31', enqueued_at=datetime.datetime(2020, 3, 4, 4, 45, 11, 42424)), Job('3e938d37-db81-49ae-9a6c-7a4a144f9d31', enqueued_at=datetime.datetime(2020, 3, 4, 4, 45, 11, 42424))]
>>> test_queue.remove(j)
1
>>> test_queue.get_jobs()
[Job('3e938d37-db81-49ae-9a6c-7a4a144f9d31', enqueued_at=datetime.datetime(2020, 3, 4, 4, 45, 11, 42424))]
>>> test_queue.remove(j)
1
>>> test_queue.get_jobs()
[]

Does RQ support a quick way to remove all occurrences of a job in a queue, or better yet, all occurrences of a job in all queues?
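For illustration only, a minimal sketch of what "remove all occurrences" could look like. It is not an RQ API: the helper names are hypothetical, and it assumes the queue object exposes `.connection` (a redis-py client) and `.key` (the Redis list holding job IDs), and that `lrem` has the redis-py 3.x signature `lrem(name, count, value)`.

```python
def remove_all_occurrences(queue, job_id):
    """Remove every occurrence of job_id from one queue's Redis list.

    Assumes `queue` exposes `.connection` (a redis-py client) and `.key`
    (the name of the Redis list of job IDs). Hypothetical helper.
    """
    # LREM with count=0 removes all list elements equal to job_id.
    return queue.connection.lrem(queue.key, 0, job_id)


def remove_from_queues(queues, job_id):
    """Remove job_id from each queue in `queues`; return the total removed."""
    return sum(remove_all_occurrences(q, job_id) for q in queues)
```

Something like `remove_from_queues(Queue.all(connection=redis_conn), j.id)` would then cover every queue. Note that this only touches the queue lists; the job's own data in Redis is left alone.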

selwin commented 4 years ago

May I know your use case?

You shouldn't enqueue the same job multiple times, as that leads to unexpected behavior; for example, every copy would store its result in the same Redis key.

mattjegan commented 4 years ago

For recurring jobs, they may occasionally be queued multiple times if the queue was long enough. For example, we have a job that runs every hour and if the queue depth has grown such that it is more than an hour long we may have 2 of the same job in the queue (I'm using rq-scheduler for this). The job always returns None so I'm not concerned with the result in redis.

selwin commented 4 years ago

RQ works on the assumption that a job ID is unique, and this led to an optimization made in this PR. The optimization can improve the performance of removing a job from a queue with millions of jobs, since it only has to find one instance of the job before stopping, rather than scanning the whole queue.

For your use case, I think if needed we can add a limit argument to queue.remove(job, limit=5) so you can more easily maintain your queue. Do you mind opening a PR for this?
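A rough sketch of how such a limit could map onto Redis, written as a standalone helper rather than a change to RQ itself. The function name is hypothetical, and it assumes the queue exposes `.connection` (a redis-py client with `lrem(name, count, value)`) and `.key`.

```python
def remove_with_limit(queue, job_id, limit=1):
    """Remove up to `limit` occurrences of job_id from the queue's Redis list.

    Hypothetical helper; assumes `queue.connection` is a redis-py client
    and `queue.key` names the Redis list of job IDs.
    """
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    # A positive LREM count removes matches starting from the head of the list.
    return queue.connection.lrem(queue.key, limit, job_id)
```

With a positive count, the scan should still be able to stop once `limit` matches have been removed, so the default of 1 keeps the existing behavior.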

mattjegan commented 4 years ago

Ok thanks for linking that PR, I was unaware of that assumption. Perhaps this is not an issue for RQ core if that's a deliberate limitation but rather a bug with rq-scheduler for allowing multiples of a job to be in the queue at one time? I'm happy to raise any PRs required to help with this but would like to find the specific issue here first. Either:

  1. RQ should accept multiple of the same job on a queue and then we should add limit/remove_duplicates or similar to queue.remove.
  2. RQ should not accept multiple of the same job on a queue and therefore should raise an error if a user tries to enqueue the same job multiple times and rq-scheduler should obey this and not enqueue duplicates if one exists in the queue already.

What is your view on this? I don't want to go against existing design choices if they have led to dependencies on those assumptions elsewhere.
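To make option 2 concrete, here is a minimal sketch of a guard that refuses duplicates. `DuplicateJobError` and `enqueue_unique` are hypothetical names, not part of RQ; the sketch assumes the queue exposes `job_ids` (the IDs currently queued), `name`, and `enqueue_job(job)`.

```python
class DuplicateJobError(Exception):
    """Raised when a job ID is already present in the queue (hypothetical)."""


def enqueue_unique(queue, job):
    """Enqueue `job` unless its ID is already in the queue.

    Assumes `queue.job_ids` lists the IDs currently queued and
    `queue.enqueue_job(job)` pushes the job onto the queue.
    """
    # Reading all job IDs is O(n) in the queue length.
    if job.id in queue.job_ids:
        raise DuplicateJobError(
            f"job {job.id} is already queued on {queue.name!r}"
        )
    return queue.enqueue_job(job)
```

The check and the enqueue are two separate steps, so two producers could still both pass the check at the same moment; a real implementation would need to make them atomic.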

selwin commented 4 years ago

So I've been thinking about this a lot.

rq-scheduler was my first crack at job scheduling, and it allows cron-style scheduling, like enqueueing a job every minute using the same job.id. This creates all sorts of issues because all these jobs save their state information into the same Redis hash, overwriting each other.

So if you have a job that runs for longer than 1 minute and have a cron that requeues the same job every minute, there's a chance that something will go wrong. Storing job results is also iffy.

If I had to re-implement something similar, I'd have something along the lines of a job type/template, which is uniquely identified by the func, args and kwargs supplied to enqueue(). Each time this job type gets enqueued, it creates a new job with a different ID, and everything should be taken care of since each one is a regular RQ job.

When this concept of job type is implemented, we should also get the benefit of being able to track how many job types are in one single queue, enforce uniqueness and other things.
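As a rough illustration of the job type idea (nothing like this exists in RQ as far as this thread shows), the template could be a stable fingerprint of func, args and kwargs, while each enqueue gets a fresh ID. All names below are hypothetical.

```python
import hashlib
import json
import uuid


def template_key(func, args=(), kwargs=None):
    """Return a stable fingerprint for a (func, args, kwargs) job template."""
    payload = json.dumps(
        [
            f"{func.__module__}.{func.__qualname__}",
            list(args),
            sorted((kwargs or {}).items()),  # kwargs order must not matter
        ],
        default=repr,  # fallback for non-JSON values; repr may not be stable
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def new_job_record(func, args=(), kwargs=None):
    """Describe one enqueue: a shared template key plus a unique job ID."""
    return {
        "template": template_key(func, args, kwargs),
        "job_id": str(uuid.uuid4()),
    }
```

Two records built from the same call share a `template` but never a `job_id`, so counting or de-duplicating by template would not require reusing job IDs.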

dralley commented 4 years ago

@selwin, is it safe to have multiple jobs with the same ID, if they don't actually co-exist with each other?

In our case, we enqueue jobs onto a special "resource manager" worker that manages a set of resource locks and re-enqueues the job onto the queue of a particular worker which owns the locks. We want to be able to cancel the task regardless of whether it's on the resource manager queue (Job #1), or on the worker queue (Job #2)

I suppose in theory there could be a conflict if the teardown of the original Job and the setup of the new one overlap. Do you see this as being something that is unsafe to do?

selwin commented 4 years ago

My advice would be to have the resource manager create a new job every time and save the new Job ID. Whenever a job is canceled, the resource manager can then cancel Job #1 along with Job #2 (whose ID it already saved).

Is this a change you can easily make? If not, what features do you need in RQ that will make your job easier?
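A minimal in-memory sketch of that bookkeeping, assuming a hypothetical `TaskJobIndex` class; a real resource manager would presumably persist the mapping (for example in Redis) and pass in a `cancel_job` callable that fetches the job by ID and cancels it.

```python
class TaskJobIndex:
    """Track every job ID created on behalf of one logical task (hypothetical)."""

    def __init__(self):
        self._jobs = {}  # task_id -> list of job IDs, in creation order

    def record(self, task_id, job_id):
        """Remember that job_id was created for task_id."""
        self._jobs.setdefault(task_id, []).append(job_id)

    def cancel(self, task_id, cancel_job):
        """Cancel all jobs recorded for task_id using the given callable.

        Returns the list of job IDs that were passed to `cancel_job`.
        """
        job_ids = self._jobs.pop(task_id, [])
        for job_id in job_ids:
            cancel_job(job_id)
        return job_ids
```

The resource manager would call `record()` once for Job #1 and again when it creates Job #2, so a single `cancel()` reaches both without the two jobs sharing an ID.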

dralley commented 4 years ago

If this isn't safe, then the easiest thing for us to do would be to have Job #1 check whether our "task" has been cancelled before queuing Job #2. Once Job #2 is queued, it'd just be cancelled the normal way via RQ.

spetoolio commented 2 years ago

I'm in a similar position here (more than a year later) wrestling with job IDs and their uniqueness. I have an rq-scheduler recurring job that every minute enqueues a batch of jobs to check certain objects. The job ids for those checks are check_object_{object_uuid}. I only want to add the job to the queue if the job is not already in the queue. So, I generate the job ID check_object_{object_uuid}, check the queue to see if the job exists, and only enqueue the job if one of the following is true:

  1. No job exists with that ID
  2. The job does exist, but is finished

With this logic, in theory, the queue length should be about the same size as the number of objects, depending on how many finish between the recurring job cycles. But, for some reason, after a few hours of this running, the jobs start getting duplicated, and my queue is showing duplicates of the same job, and it keeps growing. When left running for a few days, I see 10+ instances of the same job ID in my queues.

Did anyone in this thread figure out a good solution for this? My guess is that it's a race condition where a job with the same ID gets queued while the existing job is moving between registries, but I can't seem to accurately reproduce the problem. It seems related to the questions asked above...
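If the cause really is a check-then-enqueue race (which is only a guess here), one common workaround is to replace the "does the job exist?" check with an atomic Redis guard key. A minimal sketch, assuming a redis-py client whose `set()` accepts `nx` and `ex`, and a queue whose `enqueue()` accepts `job_id`; the helper name and the `enqueue-guard:` key prefix are hypothetical.

```python
def enqueue_if_absent(connection, queue, job_id, func, *args, ttl=300, **kwargs):
    """Enqueue `func` under `job_id` only if no guard key exists for it.

    Returns the value of `queue.enqueue(...)`, or None when another
    producer already holds the guard.
    """
    guard_key = f"enqueue-guard:{job_id}"  # hypothetical key name
    # SET with NX and EX is atomic: only one caller can create the key,
    # and it expires after `ttl` seconds even if nobody deletes it.
    if not connection.set(guard_key, "1", nx=True, ex=ttl):
        return None
    return queue.enqueue(func, *args, job_id=job_id, **kwargs)
```

The guard would need to be deleted when the job finishes, or given a `ttl` close to the expected run time, so the next cycle can enqueue the check again.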