timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License

Work assignment #306

Closed codingedgar closed 1 year ago

codingedgar commented 2 years ago

Hi, amazing lib!

I have a silly question, but I couldn’t find it in the docs or issues.

Can other assignment strategies be implemented?

I would like to divide a queue among a pool of workers with consistent hashing, because I need sequential consistency per user, but I see that workers just pull jobs. Is there a way to specify which jobs to pull? I see I can use some wildcards for queue names; does that mean the intent is to create multiple queues rather than distribute the load of a single queue across multiple workers?

timgit commented 2 years ago

Thanks! You can specify the queue as a string in the fetch() and work() functions. If I'm not understanding your question, please include more details.
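
For reference, here's a minimal sketch of both call styles, pull (fetch) and push (work). The queue name and payload are made up, and exact signatures vary a bit between pg-boss versions:

```js
import PgBoss from 'pg-boss';

const boss = new PgBoss('postgres://user:pass@localhost/app');
await boss.start();

// push style: pg-boss polls the named queue and calls the handler
await boss.work('analysis', async (job) => {
  console.log('processing', job.id, job.data);
});

// pull style: fetch a job from the named queue on demand
const job = await boss.fetch('analysis');
if (job) {
  // ...do the work, then acknowledge it...
  await boss.complete(job.id);
}
```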

codingedgar commented 2 years ago

fetch

Sorry, maybe I was too vague. What I mean is: is there any way to implement consumer strategies for a group of workers, like consistent hashing or round robin based on the id of the job, or something similar?

I read https://github.com/timgit/pg-boss/issues/290, but the use case is different. I need to distribute a queue's work among workers much like RabbitMQ's consistent hashing: I want to parallelize the work of a queue, but certain messages of the queue must not be processed concurrently, so only one worker in the group should process a given group of messages on the queue at any given time, logically "sharding" the queue, but at read time.

codingedgar commented 2 years ago

A further example: if I have a queue with messages [A, B, A, B, C, D, E, F] and I have 3 workers, could I implement a work assignment strategy such that worker 1 always processes messages of types A and B, worker 2 the ones of types C and D, and worker 3 types E and F?

It's not a priority queue, and the assignment is bound to the number of workers, meaning that if they grow to 6, each worker processes one message type, and so on.
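
As a sketch of the kind of bounded assignment I mean (nothing pg-boss provides today; `workerIndexFor` is just an invented helper, and a real consistent-hashing ring would also reshuffle far fewer keys than the plain modulo used here when the worker count changes):

```js
import { createHash } from 'node:crypto';

// Map a message type (A, B, C, ...) onto one of N workers deterministically.
// With 3 workers the types spread across 3 buckets; grow to 6 workers and the
// same function spreads them across 6 buckets instead.
function workerIndexFor(type, workerCount) {
  const digest = createHash('md5').update(type).digest();
  return digest.readUInt32BE(0) % workerCount;
}

// e.g. worker 0 only handles jobs whose type hashes to index 0, and so on
console.log(workerIndexFor('A', 3), workerIndexFor('C', 3), workerIndexFor('E', 3));
```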

timgit commented 2 years ago

Thanks for the examples. Can you simplify this by creating a separate queue per type? "A" would be a queue, for example, and then you would run as many workers on "A" as desired. You can use the wildcard patterns if needed, but I would start with simple isolated queues first.
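
Concretely, that would look roughly like this (assuming a started PgBoss instance named `boss`; the queue names are invented and wildcard support depends on the pg-boss version in use):

```js
// assumes a started PgBoss instance named `boss` (see the earlier sketch)
const handleAnalysis = async (job) => {
  // ...run the analysis for this message...
};

// publish each message type to its own queue
await boss.send('analysis-A', { userId: 1 });
await boss.send('analysis-B', { userId: 2 });

// run as many dedicated workers per queue as desired
await boss.work('analysis-A', handleAnalysis);
await boss.work('analysis-B', handleAnalysis);

// or one worker over the whole family of queues via a wildcard pattern
await boss.work('analysis-*', handleAnalysis);
```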

codingedgar commented 2 years ago

Thanks for the reply @timgit. I'll treat creating queues as an inexpensive operation then, much like in Event Sourcing.

Yet I still have a doubt about how to distribute multiple queues among workers. If the queues are 1:1 with users, new queues will be created constantly; how could I assign them to a worker? Is there a way to read the queues that have messages, so that I can apply a consistent-hashing algorithm to them and distribute them among a number of workers?

codingedgar commented 2 years ago

Maybe I'm talking in riddles; my concrete use case is the following:

Users need to perform some stats analysis that is split into multiple small analyses. Each small analysis can run concurrently between users, but sequentially for any single user. It does not matter if the responses are out of order, or even duplicated; the only thing I have to ensure is that no two analyses ever run at the same time for the same user.

To solve this, a common pattern would be to have a number of workers run the analyses concurrently, yet always assign the same messages to the same worker. If the assignment is done with consistent hashing based on a field of the message (like the user id), it ensures that no two workers ever process messages for the same user at the same time.

I'm sure there's a good way to do this with pg-boss, but I'm not sure how. Maybe consistent hashing is not the answer and there's another way to respect the constraint?
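
One sketch of how I could express the constraint with plain queues (not a pg-boss feature, and the shard count, queue names, and helpers below are all invented): route each message at send time into one of a fixed number of shard queues based on a hash of the user id, and give each shard a single worker, so every message for a given user lands on the same worker and runs sequentially.

```js
import { createHash } from 'node:crypto';

const SHARD_COUNT = 3; // one single-concurrency worker per shard

function shardQueueFor(userId) {
  const digest = createHash('md5').update(String(userId)).digest();
  return `analysis-shard-${digest.readUInt32BE(0) % SHARD_COUNT}`;
}

// producer side: every message for a user goes to that user's shard queue
// (assumes a started `boss` instance; `userId` and `analysis` are app data)
await boss.send(shardQueueFor(userId), { userId, analysis });

// worker side: each worker process owns exactly one shard and drains it serially
await boss.work(`analysis-shard-${myShardIndex}`, async (job) => {
  await runAnalysis(job.data);
});
```

The plain modulo makes re-sharding disruptive (changing SHARD_COUNT re-maps users), which is where a proper consistent-hashing ring would help.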

timgit commented 2 years ago

I'll read a bit more into how other queue products implement this for my own research and see what I learn in terms of overlap with pg-boss. While I'm doing that, as a thought exercise, could you port this design over to how it would work in a public cloud service such as AWS SQS? pg-boss has a similar architecture, and this may give you much better search engine hits as well.

codingedgar commented 2 years ago

Thank you so much for looking into this. I think the way to view it in SQS would be message groups; it is quite a close match.

The message group ID is the tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are always processed one by one, in a strict order relative to the message group (however, messages that belong to different message groups might be processed out of order).

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
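
For comparison, this is roughly how the group key is attached on the SQS side (the queue URL and payload are placeholders):

```js
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });

// Messages sharing a MessageGroupId on a FIFO queue are delivered in order
// and are never handed to two consumers at the same time.
await sqs.send(new SendMessageCommand({
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/analysis.fifo',
  MessageBody: JSON.stringify({ userId: 42, analysis: 'small-analysis-1' }),
  MessageGroupId: 'user-42',           // per-user ordering key
  MessageDeduplicationId: 'user-42-1', // required unless content-based dedup is enabled
}));
```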

codingedgar commented 2 years ago

Also from https://aws.amazon.com/sqs/faqs/

Q: Do Amazon SQS FIFO queues support multiple consumers?

By design, Amazon SQS FIFO queues don't serve messages from the same message group to more than one consumer at a time. However, if your FIFO queue has multiple message groups, you can take advantage of parallel consumers, allowing Amazon SQS to serve messages from different message groups to different consumers.

timgit commented 2 years ago

From https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-understanding-logic.html

You can't request to receive messages with a specific message group ID.

and

When receiving messages from a FIFO queue with multiple message group IDs, Amazon SQS first attempts to return as many messages with the same message group ID as possible. This allows other consumers to process messages with a different message group ID. When you receive a message with a message group ID, no more messages for the same message group ID are returned unless you delete the message or it becomes visible.

I agree, message groups actually sound like exactly what you're looking for. All other messages in the group become unavailable to all other workers if at least 1 message from the group is fetched.

I'm thinking in order to pull this type of thing off in pg-boss, we'd need to start tracking in-flight unique keys that should be excluded from job fetching while a job with that key is in active state. Once the job is completed, that key becomes available for fetching from any worker. This doesn't permanently assign a specific worker to a key like boss.fetch(queue-key) would, for example, since the key becomes available to any worker after completing the job. Is this how your system would work?
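
The fetch-level key exclusion above would have to live inside pg-boss itself to hold across processes. Purely to illustrate the rule (at most one active job per group key at a time), here's an in-process approximation that serializes a worker's handler calls per key; the queue name, the groupKey field, and runAnalysis are invented, and the worker options reflect the pg-boss versions current at the time of this thread:

```js
// Illustration only: per-key serialization inside a single worker process.
// Across multiple worker processes this guarantee does not hold, which is
// why the exclusion would need to live in pg-boss's fetch query.
const chains = new Map(); // groupKey -> tail of that key's promise chain

function runSerializedByKey(key, fn) {
  const tail = (chains.get(key) ?? Promise.resolve()).then(fn, fn);
  chains.set(key, tail); // cleanup of settled chains omitted for brevity
  return tail;
}

// handle several jobs concurrently, but never two with the same group key at once
await boss.work('analysis', { teamSize: 5, teamConcurrency: 5 }, (job) =>
  runSerializedByKey(job.data.groupKey, () => runAnalysis(job.data))
);
```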

codingedgar commented 2 years ago

Sorry for the late response; I wrote it on mobile and somehow it was never sent.

Yes, it is one way to comply with my constraint 👍

I wondered if it was possible to do that in a queue consumer acting as a router or something, but I fail to see how, because I cannot scan and pick messages at will. Maybe I'm wrong about that? If not, then it would need to be implemented in the library itself.