tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.

Support for real-time notifications or event triggers when new tasks are added to the queue. #70

Open pradeepcdlm opened 1 year ago

pradeepcdlm commented 1 year ago

Dear Development Team,

I hope this message finds you well. I have some inquiries regarding the implementation of PGMQ for a database-centric task queue and asynchronous job processing. Could you please provide more information on the following aspects?

  1. Use Case Suitability: Are there specific use case scenarios in which PGMQ is particularly well-suited for task queue management and asynchronous task execution? Similarly, are there situations where PGMQ might be less suitable or where alternative solutions should be considered?

  2. Real-Time Notifications vs. Polling: I am curious to know if PGMQ supports real-time notifications or event triggers when new tasks are added to the queue. Alternatively, do I need to implement a polling mechanism to keep track of new tasks on my own? Understanding how PGMQ handles the notification aspect will influence our approach to integrating it into our system.

  3. Job Monitoring and Processing: In the context of managing jobs/tasks within the PGMQ queue, do we need to employ a scheduler or a daemon-like job monitoring tool? Could you clarify how jobs are processed once they are added to the queue? Any insights into best practices for efficiently monitoring and processing tasks would be greatly appreciated.

I look forward to your guidance on these matters.

ChuckHend commented 1 year ago

Hello @pradeepcdlm! I'll do my best to answer these. You can also join us in Slack if you have more questions!

> Use Case Suitability: Are there specific use case scenarios in which PGMQ is particularly well-suited for task queue management and asynchronous task execution? Similarly, are there situations where PGMQ might be less suitable or where alternative solutions should be considered?

PGMQ is great when you want each message to go to a single consumer, particularly when you only want that message to be processed a single time, without duplicate processing. PGMQ does support use cases where you want multiple consumers to read the same message (e.g. a consumer group; you'd duplicate your queues, one per consumer), but that is extra work and overhead. It is great as a job/task queue where you have long-running jobs.
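For illustration, a minimal sketch of the "duplicate queues per consumer" pattern. The function names follow the pgmq_-prefixed style used elsewhere in this thread; the queue names, payload, and exact signatures here are assumptions and may differ by version:

```sql
-- Illustrative: one queue per consumer to emulate a consumer group.
SELECT pgmq_create('orders_consumer_a');
SELECT pgmq_create('orders_consumer_b');

-- "Fan out" by sending the same payload to each consumer's queue.
SELECT pgmq_send('orders_consumer_a', '{"order_id": 42}');
SELECT pgmq_send('orders_consumer_b', '{"order_id": 42}');

-- Each consumer reads only its own queue: 30 s visibility timeout, 1 message.
SELECT * FROM pgmq_read('orders_consumer_a', 30, 1);
```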

> Real-Time Notifications vs. Polling: I am curious to know if PGMQ supports real-time notifications or event triggers when new tasks are added to the queue. Alternatively, do I need to implement a polling mechanism to keep track of new tasks on my own? Understanding how PGMQ handles the notification aspect will influence our approach to integrating it into our system.

Your application should poll PGMQ, similar to how an application polls SQS. We just released a long-poll feature, which will help you avoid having to poll quite as often when there are no messages in a queue. We'd love to implement a push notification system for messages, but it will be tricky to do while also maintaining exactly once guarantees. Help wanted!
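A rough sketch of what that looks like from the application side, using the read_with_poll function referenced later in this thread. The pgmq_-prefixed name and the argument list shown are assumptions; check the docs for the exact signature:

```sql
-- Long-poll: the call blocks server-side for up to ~5 seconds waiting for a
-- message to arrive, instead of the app issuing many empty reads.
-- Assumed arguments: queue name, visibility timeout (s), max messages,
-- max poll seconds, poll interval (ms).
SELECT * FROM pgmq_read_with_poll('my_queue', 30, 1, 5, 100);
```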

> Job Monitoring and Processing: In the context of managing jobs/tasks within the PGMQ queue, do we need to employ a scheduler or a daemon-like job monitoring tool? Could you clarify how jobs are processed once they are added to the queue? Any insights into best practices for efficiently monitoring and processing tasks would be greatly appreciated.

Messages will stay in the queue until you explicitly delete them, and generally they should be deleted (or archived if you want to retain the log) by the consumer only after it has successfully processed them. There is no need for a daemon or a background worker due to the mechanics around the visibility timeout. Your consumer should call pgmq_archive() or pgmq_delete() on the message after it has completed the task, or pgmq_set_vt() if it needs more time to process than the VT it set on the initial pgmq_read(). If it crashes, the message will automatically become available again once the VT expires.
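Putting that lifecycle together as a sketch (the function names come from the comment above; the exact argument lists and the msg_id value are assumptions and may vary by version):

```sql
-- 1. Read one message and hide it from other consumers for 60 seconds.
SELECT * FROM pgmq_read('my_queue', 60, 1);

-- 2a. On success, delete it (or archive it to retain a log).
SELECT pgmq_delete('my_queue', 123);     -- 123 = msg_id returned by pgmq_read
-- SELECT pgmq_archive('my_queue', 123);

-- 2b. If processing needs more time, extend the visibility timeout.
SELECT pgmq_set_vt('my_queue', 123, 120);

-- If the consumer crashes and does neither, the message becomes visible
-- again automatically once its VT expires.
```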

pradeepcdlm commented 1 year ago

Thanks @ChuckHend for your quick response,

> Your application should poll PGMQ, similar to how an application polls SQS. We just released a long-poll feature, which will help you avoid having to poll quite as often when there are no messages in a queue.

I'm very interested in understanding the specifics of how the long-poll feature works.

ddorian commented 1 year ago

> We'd love to implement a push notification system for messages, but it will be tricky to do while also maintaining exactly once guarantees.

We can easily add NOTIFY as part of the transaction. A worker can do a simple LISTEN and, after a notification, just try to FETCH, resolving concurrency the way it's done currently. Worst case, you have 10 workers listening, you publish 1 message, they all get notified and all try to fetch jobs, but only 1 gets the job.

The problem with making smart notification+routing is that each listener is a separate connection which is very heavy and there's no automatic failover (you will lose messages if you stop listening or on db failover).

But the simple case above should be easy.
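A hypothetical sketch of that simple case (nothing here is part of pgmq itself; the underlying table name, the trigger, and the channel name are all assumptions for illustration):

```sql
-- Hypothetical: fire a NOTIFY whenever a row lands in the queue's table.
CREATE OR REPLACE FUNCTION notify_new_message() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('pgmq_my_queue', NEW.msg_id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER my_queue_notify
AFTER INSERT ON pgmq_my_queue            -- assumed underlying queue table
FOR EACH ROW EXECUTE FUNCTION notify_new_message();

-- Each worker listens and, on wake-up, attempts a normal read;
-- whichever worker reads first gets the message, the rest read nothing.
LISTEN pgmq_my_queue;
-- ...on notification:
SELECT * FROM pgmq_read('my_queue', 30, 1);
```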

pradeepcdlm commented 1 year ago

Thanks @ddorian for your quick response,

> We can easily add NOTIFY as part of the transaction. A worker can do a simple LISTEN and, after a notification, just try to FETCH, resolving concurrency the way it's done currently. Worst case, you have 10 workers listening, you publish 1 message, they all get notified and all try to fetch jobs, but only 1 gets the job.

In this context, the worker can either take the form of a middle-tier application or be implemented as a stored function within PostgreSQL. I'm specifically interested in pursuing the latter option. Any guidance or assistance regarding the implementation of this approach would be greatly valued.

ddorian commented 1 year ago

> I'm specifically interested in pursuing the latter option.

What's the difference from the perspective of the app between long-polling and doing listen-notify inside the db?

pradeepcdlm commented 1 year ago

We always look for an SQL-first approach, so our first preference is logic that works inside the db.

v0idpwn commented 1 year ago

> We can easily add NOTIFY as part of the transaction. A worker can do a simple LISTEN and, after a notification, just try to FETCH, resolving concurrency the way it's done currently. Worst case, you have 10 workers listening, you publish 1 message, they all get notified and all try to fetch jobs, but only 1 gets the job.

It's not this simple due to the current design of the visibility timeout. There's no "action" that makes a message go from invisible to visible; it's simply a matter of the timestamp you check at, so there can be no trigger. It's doable, though.
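To make that concrete, the read path is roughly a query of this shape (a simplified sketch of the idea, not pgmq's actual SQL): "becoming visible" is just the stored vt timestamp slipping into the past, so there is no UPDATE for a trigger to fire on.

```sql
-- Simplified sketch: a message is "visible" when its vt is in the past.
WITH msg AS (
  SELECT msg_id
  FROM pgmq_my_queue                     -- assumed table name
  WHERE vt <= clock_timestamp()
  ORDER BY msg_id
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE pgmq_my_queue q
SET vt = clock_timestamp() + interval '30 seconds'   -- new visibility timeout
FROM msg
WHERE q.msg_id = msg.msg_id
RETURNING q.*;
```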

ddorian commented 1 year ago

> It's not this simple due to the current design of the visibility timeout. There's no "action" that makes a message go from invisible to visible; it's simply a matter of the timestamp you check at, so there can be no trigger. It's doable, though.

You're correct.

v0idpwn commented 1 year ago

I think this feature would be very useful if we could "multiplex" multiple queues to be watched by a single subscription; then you'd have a way to avoid using connections unnecessarily. For example, one could monitor 100 queues with a single Postgres connection and have application code start workers to fetch messages when it receives notifications.

If not done like this, it's not better than polling in any way, and I don't see a valid reason for it.

But, again, there's the visibility timeout matter to be solved if this would be implemented.
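One hypothetical shape for the multiplexing described above (the channel name and payload convention are assumptions, just to illustrate the idea): all queues notify on a single channel and put the queue name in the payload, so one connection can watch many queues.

```sql
-- Hypothetical producer side (e.g. in a trigger on each queue table):
SELECT pg_notify('pgmq_events', 'orders');
SELECT pg_notify('pgmq_events', 'emails');

-- Hypothetical consumer side: one connection watches all queues.
LISTEN pgmq_events;
-- Application code reads the payload and dispatches a worker, which then
-- does a normal read on that queue to actually claim a message.
```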

dlight commented 1 year ago

Here's a way to work around the visibility timeout issue: rather than the move from invisible to visible being a non-event, have a column that stores whether a message is temporarily hidden, and have code similar to pg_cron schedule a timer that simultaneously makes a message visible again [^1] and re-sends its notification. (I don't mean to literally depend on pg_cron being installed, but to use the same mechanism.)

This would have a performance hit (perhaps a very large one), but I think the feature may be worth it. If that is a problem, maybe add a configuration option at queue creation to control whether the queue has notification support.
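Loosely, the scheduled side of that first workaround could look something like this (purely hypothetical; the table name, the hidden flag column, and the channel-per-queue convention are all assumptions):

```sql
-- Hypothetical periodic job (pg_cron-style): flip expired messages back to
-- visible and re-send their notifications in one pass.
WITH expired AS (
  UPDATE pgmq_my_queue                   -- assumed table name
  SET hidden = false                     -- assumed visibility flag column
  WHERE hidden AND vt <= clock_timestamp()
  RETURNING msg_id
)
SELECT pg_notify('pgmq_my_queue', msg_id::text) FROM expired;
```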

An alternative would be to find a more lightweight mechanism for setting up timers. Maybe this could be done in-memory without touching the database. For example, since pgmq uses tokio, maybe tokio timers would be much faster. The problem is that if the database shuts down, all outstanding timers will be forgotten and old unhandled messages will not be made visible again. This means that at startup, the extension needs to scan unhandled messages and make them visible. (This kind of logic would require a fair bit of testing to give assurance that no message can fall through the cracks.)

If there is no visibility column (as in the footnote below), then this error case isn't even possible; messages will become visible again even if the timers are forgotten, and there is simply no notification for them (you could notify during startup, but there is no point, since probably nobody will be listening anyway). So if the server shuts down, notifications sometimes don't fire (and you give up exactly-once delivery). Which is okay, since clients can't rely solely on notifications anyway.

And by this I mean that notify/listen can't be the sole mechanism to deliver messages. If no client was listening at the time it fired, the notification is lost. And even if a client is always listening, clients still need to poll to grab a message, to sort out race conditions when two clients try to grab messages. So listen/notify may have latency benefits but will not simplify client code or anything like that.

Another thing to consider is whether notifications will lead to a thundering herd problem if many clients are waiting on the same queue (for some applications this is not a problem). A way to mitigate that might be to implement a round-robin mechanism for delivering notifications, but that would probably require the server to know upfront how many clients are listening, which seems complicated, and I can't find anyone that actually does this.

[^1]: Actually I think the visibility column is not strictly necessary; one can just schedule the notification without toggling any visibility value, if pgmq is okay with the race condition of someone grabbing a message while the notification timer is being processed (like this: the timer fires, someone grabs the message before the notification goes out, NOTIFY is sent anyway, but nobody who is LISTENing can claim the message), leading to spurious notifications. Spurious notifications mean that anybody receiving them will attempt to grab a message and fail, but that's probably okay, because they would also fail if some other client had received the notification first, so failure after a notification should be a common occurrence.

v0idpwn commented 11 months ago

Hi, @dlight, glad to see you here and thanks for the input :)

I'm very curious about the performance characteristics of both approaches. I have the feeling that the BGW approach may be a bit laggy, so it wouldn't provide a latency benefit compared to read_with_poll, but it still has the important upside of reducing the potential impact of multiple concurrent pollers on slow queues.

An important note is that we probably wouldn't be able to create a Postgres subscription on every read. Instead, we would have to create a long-running subscription and write some Rust code to multiplex the notifications to waiting subscribers. Luckily, this would also make some of the problems you pointed out significantly easier to solve.

The extension-side timer suggestion is interesting. I think it has potential to decrease database resources spent on spurious reads + improve latency when using read_with_poll. Also, it would give us the foundations for a "push mode" if we wanted to.

Are you willing to try to implement it? Maybe me or @ChuckHend can try at some point if not. We would also need to put some more work into simulating different real-world workloads and extracting metrics.