tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PostgreSQL License

Enhancements for PGMQueue Python Library #216

Open tavallaie opened 1 month ago

tavallaie commented 1 month ago

I am considering adding new functionalities to the PGMQueue library to enhance its capabilities. Below is a list of potential functions. I am seeking input from other developers on which functions should be prioritized, and any additional suggestions.

Potential New Functions

How You Can Help

Please provide feedback on the following:

  1. Which functions do you think are most critical to add?
  2. Are there any additional functions you believe would be beneficial?
  3. Any other suggestions or comments?

ChuckHend commented 1 month ago

Great ideas, @tavallaie. Here are some thoughts:

For all of these features that already have a corresponding SQL function, I think it makes a lot of sense to just implement the function in the Python lib. pgmq.metrics() and pgmq.metrics_all() report queue length and some other stats about the queue. pgmq.delete() and pgmq.archive() both support batch operations, but this is not implemented in the Python lib yet.
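As a rough illustration of what "just implement the function in the Python lib" could look like, here is a minimal sketch of thin wrappers around the existing SQL functions. The helper names (`delete_batch`, `archive_batch`, `queue_metrics`) are hypothetical and not the library's actual API, and the sketch assumes a psycopg-style DB-API cursor that uses `%s` placeholders and adapts a Python list to a Postgres array.

```python
from typing import Any, Iterable, List, Optional


def delete_batch(cur: Any, queue: str, msg_ids: Iterable[int]) -> List[int]:
    """Delete several messages with one call to pgmq.delete(queue, bigint[]).

    Returns the ids that the SQL function reports back as deleted.
    """
    cur.execute("select * from pgmq.delete(%s, %s::bigint[]);", (queue, list(msg_ids)))
    return [row[0] for row in cur.fetchall()]


def archive_batch(cur: Any, queue: str, msg_ids: Iterable[int]) -> List[int]:
    """Archive several messages with one call to pgmq.archive(queue, bigint[])."""
    cur.execute("select * from pgmq.archive(%s, %s::bigint[]);", (queue, list(msg_ids)))
    return [row[0] for row in cur.fetchall()]


def queue_metrics(cur: Any, queue: str) -> Optional[tuple]:
    """Return the raw row from pgmq.metrics(queue), or None if nothing came back."""
    cur.execute("select * from pgmq.metrics(%s);", (queue,))
    return cur.fetchone()
```

Passing the cursor in keeps the wrappers independent of how the library manages connections; a real implementation would presumably live as methods on the existing queue class instead.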

I like the requeue idea, and we can probably implement that using pgmq.set_vt(). Also, updating an existing message would be useful. A separate feature people have asked for is something like pgmq.delete_where(), which would delete all messages in a queue whose body contains some value.
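One way the requeue idea could be built on pgmq.set_vt() is sketched below. The `requeue` helper and its `delay_seconds` parameter are hypothetical, and the sketch again assumes a psycopg-style cursor. It relies on pgmq.set_vt(queue_name, msg_id, vt_offset) moving a message's visibility timeout to the current time plus the offset in seconds, so an offset of 0 makes the message readable again right away.

```python
from typing import Any, Optional


def requeue(cur: Any, queue: str, msg_id: int, delay_seconds: int = 0) -> Optional[tuple]:
    """Make an in-flight message visible again after `delay_seconds`.

    Hypothetical helper: it only forwards to pgmq.set_vt(), which returns the
    updated message row, or no row if the message id does not exist.
    """
    cur.execute("select * from pgmq.set_vt(%s, %s, %s);", (queue, msg_id, delay_seconds))
    return cur.fetchone()
```

A worker that fails on a message could call `requeue(cur, "jobs", msg_id, delay_seconds=30)` to hand it back with a short back-off instead of waiting for the original visibility timeout to expire.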

Pause and resume would be nice features too. Are you thinking this would prevent new messages from being able to reach the queue, or just prevent existing messages from being consumed?

tavallaie commented 1 month ago

I'm sorry for being late :) I was occupied with PR #222. I'll add the metrics soon, and then we can decide what's next and evaluate its profitability.

markbalazon commented 1 month ago

I think the above are great ideas as well. I love the requeue/set_vt() idea and would benefit from a corresponding Python implementation!

tavallaie commented 1 month ago

I am working on the features, but lately I have been busy with my day job, so I don't have enough time to implement them all. Any contributions are more than welcome.

tavallaie commented 3 weeks ago

> Pause and resume would be nice features too. Are you thinking this would prevent new messages from being able to reach the queue, or just prevent existing messages from being consumed?

I think having both options would be very useful:

  1. Pausing new messages from entering the queue.
  2. Pausing the consumption of existing messages.

This will be very helpful for machine learning or other heavy tasks.

By stopping new messages from entering the queue, we can avoid overloading the system. For example, during system maintenance or upgrades, we can pause new messages to ensure no new data interferes with the process.

By pausing the consumption of existing messages, we can manage the processing load better. This is useful when we need to allocate resources to more urgent tasks or when performing intensive computations. These features will help keep the system stable and efficient.

v0idpwn commented 3 weeks ago

I think both concerns are probably of the application using the library, not of the library itself.

ChuckHend commented 3 weeks ago

> probably of the application using the library, not of the library itself.

I think I agree. But, @tavallaie, could you explain a bit more about the conditions or scenario where something sets the queue as "paused" for both concerns? In the case of consumers that are executing heavy tasks and are getting overloaded, what I typically see are designs where the workers are told to stop consuming, rather than the queue being told to stop accepting or distributing messages. A very common use case of a queue is to buffer messages for the workers, so that the workers can decide when to pull in new work rather than being force-fed work regardless of their ability to take it on.
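The worker-side design described above can be sketched as follows. This is an application-level illustration, not part of pgmq: `PausableWorker`, `read_batch`, and `handle` are hypothetical names, and `read_batch` stands in for whatever call the application uses to read from the queue.

```python
import threading
from typing import Callable, List


class PausableWorker:
    """A worker that decides for itself whether to pull more work.

    The queue is never paused; a paused worker simply skips its polling
    cycle, so messages stay buffered in the queue until it resumes.
    """

    def __init__(
        self,
        read_batch: Callable[[], List[dict]],
        handle: Callable[[dict], None],
    ) -> None:
        self._read_batch = read_batch
        self._handle = handle
        self._running = threading.Event()
        self._running.set()  # workers start in the running state

    def pause(self) -> None:
        self._running.clear()

    def resume(self) -> None:
        self._running.set()

    @property
    def paused(self) -> bool:
        return not self._running.is_set()

    def poll_once(self) -> int:
        """Run one polling cycle and return the number of messages handled."""
        if self.paused:
            return 0  # do not touch the queue while paused
        batch = self._read_batch()
        for msg in batch:
            self._handle(msg)
        return len(batch)
```

Because the pause flag is a `threading.Event`, an operator thread or a signal handler can flip it while the polling loop runs in another thread, and the backlog is left untouched until `resume()` is called.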

Although as an admin functionality, I could see having the ability to set the visibility of all existing messages at once, like a pgmq.set_vt_all() or something.

> By stopping new messages from entering the queue, we can avoid overloading the system. For example, during system maintenance or upgrades, we can pause new messages to ensure no new data interferes with the process.

@tavallaie, in this scenario, what are you thinking the producers of the messages would need to do during "paused" periods? Would they receive errors when publishing? It seems like, to producers, a "paused" queue would be no different from the database being down.

tavallaie commented 3 weeks ago

@ChuckHend, I will test different scenarios to understand these features better. This will show us if they are useful and if they should be in the SDK or done by the user.

Maybe it is better to have a separate issue for this discussion so other things solved in this issue can be closed.