Closed ducdigital closed 7 years ago
Hi @ducdigital.
I don't fully understand your question. You could add properties to Jobs that you test in your code to decide to process or not.
Please expand if you need more help.
Hello, I am currently trying to achieve something close to a message grouping definition.
Basically, what I want to achieve is something like this:
The graph might not be clear, but I need to support some kind of session, where each session is a FIFO queue by itself and each task runs only after the previous task in that session has finished. A worker can die at any time and another worker can pick up the job. There will be a lot of queues like this across the system.
For example:
User A joins the network and pushes 2 jobs. User B joins the network and pushes 3 jobs.
Users A and B created those jobs at exactly the same time, so we have something like this on the queue:
A1 B1 A2 B2 B3
Let's say I have 3 workers: W1, W2, W3.
Here is the timeline of how the workers would pick up jobs from the queue:
W1: A1
W2: B1
W3: null -> since A1 and B1 are still processing, W3 will not pick up anything else from the queue
W1: A2
W2: B2
W1/W2: B3 -> whichever worker is free first will pick up the next job in the queue.
The reason I need a filter is that I need the worker not to pick up the message at all, rather than pick it up and ignore it, so that the queue stays intact.
An example in my case would be each worker subscribing to a stream of the queue and filtering by unique ID for 10 - 20 people.
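The behaviour described above can be sketched as a small in-memory model. This is only an illustration of the requested semantics, not code from rethinkdb-job-queue; `pickNext`, `finish`, and the `session` field are hypothetical names, and a real multi-worker setup would need the pick to be atomic in the database.

```javascript
// Hypothetical in-memory model of per-session FIFO ordering.
// None of these names come from rethinkdb-job-queue.

// Take the oldest job whose session has no job in flight.
// Jobs from busy sessions are skipped and left in the queue untouched.
function pickNext(queue, busySessions) {
  const index = queue.findIndex((job) => !busySessions.has(job.session));
  if (index === -1) return null; // nothing eligible: the worker stays idle
  const [job] = queue.splice(index, 1);
  busySessions.add(job.session);
  return job;
}

// Release the session so its next job becomes eligible.
function finish(job, busySessions) {
  busySessions.delete(job.session);
}

// Queue from the example: A1 B1 A2 B2 B3
const queue = ['A1', 'B1', 'A2', 'B2', 'B3'].map((id) => ({ id, session: id[0] }));
const busy = new Set();

const w1 = pickNext(queue, busy); // W1 takes A1
const w2 = pickNext(queue, busy); // W2 takes B1
const w3 = pickNext(queue, busy); // W3 gets null: sessions A and B are both busy

finish(w1, busy);
const w1Next = pickNext(queue, busy); // W1 takes A2; B2 and B3 stay queued
```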
OK, that's good information @ducdigital.
I think rethinkdb-job-queue may do the job for you with the right mix and number of Queues.
There is a query within the queue that will get the next job from the queue. Here is the query: https://github.com/grantcarthew/node-rethinkdb-job-queue/blob/master/src/queue-get-next-job.js#L14
It is based on an index which is ordering the jobs by priority, dateEnabled, and dateCreated. Here is the index definition: https://github.com/grantcarthew/node-rethinkdb-job-queue/blob/master/src/db-assert-index.js#L44
These two features mean that jobs will be processed in the order they are created if you don't change the priority. If you change the priority, the higher priority jobs will be processed first.
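As a rough illustration of that ordering, here is a plain comparator over the three fields named above. It is a sketch, not the library's query or index; the numeric priority values and the convention that a lower number is more urgent are assumptions made for the example.

```javascript
// Sketch of ordering by priority, then dateEnabled, then dateCreated.
// Assumption: a lower priority number means a more urgent job.
function compareJobs(a, b) {
  return (
    a.priority - b.priority ||
    a.dateEnabled - b.dateEnabled ||
    a.dateCreated - b.dateCreated
  );
}

// Dates are simplified to small integers for readability.
const jobs = [
  { id: 'A2', priority: 40, dateEnabled: 3, dateCreated: 3 },
  { id: 'urgent', priority: 10, dateEnabled: 4, dateCreated: 4 },
  { id: 'A1', priority: 40, dateEnabled: 1, dateCreated: 1 },
  { id: 'B1', priority: 40, dateEnabled: 2, dateCreated: 2 },
];

// Copy before sorting so the original array is left unchanged.
const order = [...jobs].sort(compareJobs).map((job) => job.id);
console.log(order.join(' ')); // urgent A1 B1 A2
```

With equal priorities the jobs come out in creation order, and the job with the more urgent priority jumps ahead of all of them.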
Problem is if W1 starts working on A1, W2 could start working on A2.
I can think of one solution; however, it depends on a feature that is being discussed in #47.
Rather than adding A1 and A2 as jobs to the rethinkdb-job-queue processing queue, add UserA to the queue.
The job you add to the queue will have data listed in it about the number of jobs UserA has, what order they should be processed in, and the completed status of each job. Quite easy to do with an Array added to the queue job.
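A minimal sketch of what such a per-user job could carry is shown below. The field names (`user`, `tasks`, `completed`) and the `nextTask` helper are hypothetical and only illustrate the idea of an ordered Array with a completion flag per entry; they are not part of the library.

```javascript
// Hypothetical data for a per-user job: an ordered list of tasks,
// each with its own completion flag.
const userAJob = {
  user: 'UserA',
  tasks: [
    { name: 'A1', completed: false },
    { name: 'A2', completed: false },
  ],
};

// Return the first task that has not been completed yet, or null when
// every task for this user is done. Array order is the processing order.
function nextTask(job) {
  return job.tasks.find((task) => !task.completed) || null;
}

const firstTask = nextTask(userAJob); // the A1 entry
firstTask.completed = true;           // record that A1 is done
const secondTask = nextTask(userAJob); // the A2 entry
```

Because there is only one queue entry per user, two workers can never hold A1 and A2 at the same time.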
Here is the flow of work:

1. UserA is added to the queue.
2. [...] UserA job from the queue for processing.
3. [...] UserA being A1 within the job data.
4. [...] A1.
5. [...] A1 is complete.
6. [...] completed to waiting.
7. [...] dateCreated value if need be to prevent it slotting in front of other user jobs.
8. [...] UserA job and the process continues until all UserA jobs are complete.

After writing the steps above it occurred to me that this will not work due to step 7. As it is at the moment, the queue must finish a job process by updating the job as completed or failed. If you just save the job back to the queue, the process timeout function will update the job to a failed status.
So you could use rethinkdb-job-queue if there were two new features added to the project:

[...] Queue.process processing function.

I like the idea of being able to work on a job without having to fail or complete the job. I will add a new issue as a feature to add to the queue.
So, after all of that, at this point in time, no, I don't think rethinkdb-job-queue is a good tool for the job.
I have created #49. Will think on it.
Thanks for your reply @grantcarthew
Just for clarification, with this approach, is the number of items in a Queue always equal to the number of Users?
For instance, let's say a Publisher pushes something like the following to the queue:
A: [1, 2]
B: [1, 2]
A: [3, 4]
Doesn't the problem remain the same, as you said?
"Problem is if W1 starts working on A1, W2 could start working on A2."
Not at all. While the UserA job is being worked on, it is set to an active status and will not be picked up until after A1 has been completed. The queue will only find jobs to work on if they are at a waiting or failed status.
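The status rule can be illustrated with a tiny single-process model. The status names `waiting`, `failed`, and `active` come from the discussion above; `getNextJob` here is a hypothetical stand-in written for this example, not the library's query, which does the equivalent selection in the database.

```javascript
// Only jobs in one of these statuses are eligible to be picked up.
const ELIGIBLE = new Set(['waiting', 'failed']);

// Hypothetical stand-in for the queue's "get next job" step:
// take the first eligible job and mark it active so no one else takes it.
function getNextJob(jobs) {
  const job = jobs.find((candidate) => ELIGIBLE.has(candidate.status));
  if (!job) return null;
  job.status = 'active';
  return job;
}

const userJobs = [
  { user: 'UserA', status: 'waiting' },
  { user: 'UserB', status: 'waiting' },
];

const first = getNextJob(userJobs);  // UserA job, now active
const second = getNextJob(userJobs); // UserB job, now active
const third = getNextJob(userJobs);  // null: both user jobs are active

first.status = 'waiting';            // A1 done, UserA job handed back
const fourth = getNextJob(userJobs); // UserA job again, for A2
```

While the UserA job is active, no second worker can reach A2, which is the property the original question asks for.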
Hey @ducdigital. If you are following along, #49 is complete.
OK @ducdigital, both features are added to the project now. Have a look at the Change Log.
Let me know how you go.
Please close this issue if you are happy with how to proceed.
I'm going to close this @ducdigital, re-open it if you have more questions.
Hi,
Is there any way to have a single queue and multiple workers that filter jobs, each choosing what to run and what not to run?
Thanks!