dataddo / pgq

Go queue implementation using postgres.
https://dataddo.com
Apache License 2.0

optional message delivery delay #16

Closed · mmalcek closed this issue 1 month ago

mmalcek commented 7 months ago

In some cases you may need to delay a message before it becomes available. In my use case I need to periodically ask an external service whether its state has changed, in roughly 60-second intervals, until I get a result (error or success).

This is a similar approach to Azure Service Bus scheduled delivery: https://learn.microsoft.com/en-us/azure/service-bus-messaging/advanced-features-overview

I've used "Delay" instead of "Schedule" because this way I can use the SQL server's time. The SQL server and the app can sit on different machines with slightly different clocks (seconds, sometimes minutes, when server time sync misbehaves), and a message queue is a pretty time-sensitive thing, so I'd say a "same time source" policy should apply. Scheduling to an exact time is still possible by calculating the "seconds to" ( t2.Sub(t1) ) on the app server.
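For illustration, a rough sketch of that conversion on the app side (the Delay field here is my proposed one, not an existing pgq field):

target := time.Date(2024, time.June, 1, 5, 0, 0, 0, time.UTC) // some wall-clock schedule
delaySeconds := int(time.Until(target).Seconds()) // "seconds to" = t2.Sub(t1)
if delaySeconds < 0 {
	delaySeconds = 0 // target already passed, deliver immediately
}
msg := &pgq.MessageOutgoing{
	Payload: json.RawMessage(`{"foo":"bar"}`),
	Delay:   delaySeconds, // proposed field, see below
}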

BTW: I'm from Prague if you want to discuss :)

kedlas commented 7 months ago

Hi, if I understand it correctly, you want to specify a concrete time in the future at which the message can be consumed. E.g. now (18:23 UTC) you publish a message with a parameter saying it can be consumed at, say, 5am the next morning, right?

In that case, without any changes to the codebase, you can insert the row into the queue table yourself and set the locked_until field.
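For example, a rough sketch of that workaround with plain database/sql; only locked_until is confirmed above, the table name and other column names are assumptions:

// Insert the message directly and push locked_until into the future,
// so consumers skip the row until the database clock passes it.
// Table/column names other than locked_until are assumed.
_, err := db.ExecContext(ctx,
	`INSERT INTO queue_table (payload, metadata, locked_until)
	 VALUES ($1, $2, now() + make_interval(secs => $3))`,
	[]byte(`{"foo":"bar"}`), []byte(`{}`), 60,
)
if err != nil {
	return err
}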

mmalcek commented 7 months ago

Hi, to explain my use case: I send some data to an external service, then I have to periodically ask the service whether the data has been processed. Once I get a positive or negative final response I decide the next step, like:

  * msg1: sendData (delay 0) -> receive processingID
  * msg2: checkStatus("processingID", delay 60) -> receive inProgress
  * msg3: checkStatus("processingID", delay 60) -> receive inProgress
  * msg4: checkStatus("processingID", delay 60) -> receive DONE-success
  * msg5: getProcessedData("processingID", delay 0) -> success/taskClosed

(But there are many other use cases where a message scheduler can be useful.)

Yes, it would be possible to use "locked_until", but I can't set it via "publisher.Publish()", so it would require writing a new insert function outside this package. It's also something of an abuse of that field.

I see my proposal as more elegant (but of course it's just a proposal):

msg := &pgq.MessageOutgoing{
  Payload: json.RawMessage(`{"foo":"bar"}`),
  Delay:   20, // seconds before the message becomes visible to consumers
}

Note: this kind of scheduled message is natively available in Azure and AWS ( https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-timers.html , https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sequencing#scheduled-messages ), but not (natively) in Apache Kafka or RabbitMQ. In NATS it's possible to use "NakWithDelay()", but that's also abusing the function (and obviously the first delivery of a message can never be delayed): https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability
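For reference, the NATS workaround looks roughly like this with the nats.go JetStream client (a sketch; stillInProgress is a made-up helper), and as said it can only delay a redelivery, never the first delivery:

// Instead of nacking immediately, ask JetStream to redeliver in 60 seconds.
_, err := js.Subscribe("tasks", func(m *nats.Msg) {
	if stillInProgress(m) { // hypothetical check of the external service
		_ = m.NakWithDelay(60 * time.Second)
		return
	}
	_ = m.Ack()
})
if err != nil {
	return err
}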

I think this could be a nice enhancement for PGQ; if you check the code, it fits pretty well ;)

prochac commented 7 months ago

Firstly, I wanted to propose that this could be covered by a feature we want to implement soon: some way to extend the consumer's query and filter by metadata. Postgres also supports indexing for JSONB types. And, at least in our case, the queue is mostly empty (in the sense of unprocessed jobs), so the processed IS NULL index does the heavy lifting.

Anyway, the NackWithDelay idea (maybe NackAndDelay would be a better name, WDYT 😅 it won't be nacked after waiting) sounds like a handy feature 🤔 worth being a "native" parameter of the queue.

mmalcek commented 7 months ago

So you are planning something like:

consumer, err := NewConsumer(db, queueName, handler,
        WithMetaFilter("key", "value", "eq"), // key, value, comparator
)

That sounds pretty neat! It could be very useful.

However, I see two possible issues with "delayed messages":

  1. I would have to pass the "current time", so maybe pass the value by pointer, but that means I'd have to update the time somewhere (which is of course not a terrible deal):

    var queryTime int64
    go func() {
        for {
            queryTime = time.Now().Unix()
            time.Sleep(1 * time.Second)
        }
    }()

    ...

    consumer, err := NewConsumer(db, queueName, handler,
        WithMetaFilter("key", &queryTime, "lt"), // key, value, comparator
    )

    You must also be aware that the value keeps changing, so you cannot build a static query, which can be an issue (not sure, I didn't check the code). A possible static-query workaround is sketched after point 2 below.

  2. With "delayed messages" it's expected (by their nature) that the queue will be much longer, as there may be many "invisible" messages, so searching through JSONBs could become a bottleneck even if they are indexed. That will of course require testing and comparison; it could be quite interesting to find out :blush:
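One way to side-step point 1 (in line with the "same time source" policy above) would be to store the delay as an absolute timestamp and let the query defer to the database clock, so the consumer query stays static. A rough sketch, where the table and column names are just assumptions:

// A static consumer query that relies on the database clock,
// so no per-query "current time" argument is needed.
rows, err := db.QueryContext(ctx, `
	SELECT id, payload
	FROM queue_table
	WHERE processed_at IS NULL
	  AND (locked_until IS NULL OR locked_until < now())
	ORDER BY created_at
	LIMIT 10`)
if err != nil {
	return err
}
defer rows.Close()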

About NATS and the naming: yeah, that's a somewhat inconsistent decision by the NATS developers. They have `Ack(), AckSync()` but then `Nak(), NakWithDelay()`. Here is the [documentation](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability), but I have no idea why they decided it this way.

prochac commented 7 months ago

The first version of the metadata filter could look like this, but we also want to provide flexibility that leverages the fact that we use Postgres, which gives us "unlimited" options. The problem is the API. I guess some direct SQL access shouldn't be a problem, since it will always be the Postgres SQL dialect. The other question is how to propagate args for the prepared statement, etc.

I had a WIP version using the @> JSONB operator. I removed it before publishing pgq, though, because it wasn't tested or used by us in prod. Same with the idea of JSONB indexes: it's something we are aware of, but have no real experience with, AFAIK.
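For context, the @> containment approach together with a GIN index would look roughly like this (a sketch, not the removed WIP code; table/column names are assumed):

// A GIN index lets the @> containment operator filter JSONB metadata efficiently.
_, err := db.ExecContext(ctx,
	`CREATE INDEX IF NOT EXISTS queue_metadata_gin ON queue_table USING gin (metadata)`)
if err != nil {
	return err
}
// Select only unprocessed messages whose metadata contains {"key": "value"}.
rows, err := db.QueryContext(ctx,
	`SELECT id, payload FROM queue_table
	 WHERE processed_at IS NULL AND metadata @> $1::jsonb`,
	`{"key":"value"}`)
if err != nil {
	return err
}
defer rows.Close()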

mmalcek commented 7 months ago

"propagate args for perpared statement" for this I would highly recommend deeply look at SQLX package here is a nice guide It's an DB interface that works nicely with various DBs (I'm using it with Postgres, MSSQL and sqlite) it allows you to use "namedStatements" where you can use struct tags e.g. name string `db:"name" But especially for your case "PrepareNamed" - here is a piece from some of my code

// Collect query parameters from the request into a named-args map.
paramsMap := make(map[string]interface{})
for _, param := range queryParams {
    paramsMap[param] = c.Query(param)
}
// Prepare a named statement (:param placeholders) and run it with the map.
prepQuery, err := db.PrepareNamed(queryData)
if err != nil {
    return err
}
rows, err := prepQuery.QueryxContext(c, paramsMap)
if err != nil {
    return err
}
// Scan each row into a generic map and collect all results.
allResponses := make([]map[string]interface{}, 0)
for rows.Next() {
    results := make(map[string]interface{})
    err = rows.MapScan(results)
    if err != nil {
        return err
    }
    allResponses = append(allResponses, results)
}
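
And to show the struct-tag side I mentioned: named statements can also bind directly from a tagged struct (a small sketch, the query and column are made up):

type msgFilter struct {
	Name string `db:"name"`
}

// The :name placeholder is bound from the db struct tag.
stmt, err := db.PrepareNamed(`SELECT id, payload FROM queue_table WHERE metadata->>'name' = :name`)
if err != nil {
	return err
}
rows, err := stmt.Queryx(msgFilter{Name: "foo"})
if err != nil {
	return err
}
defer rows.Close()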

It would be nice if you could publish it in a test branch. I'd be happy to try it out and give you feedback.

The project where I need PGQ hasn't started yet, but it's likely to go ahead, and I'm currently choosing the right tools.

PGQ with my "Delay" proposal is currently the best option I've found for on-premise delayed messages. But I like the "metaFilter" idea too.

kedlas commented 7 months ago

Hi @mmalcek, we discussed the proposed delay internally and it seems very handy even for some of our own use cases. We definitely want to incorporate this feature into pgq. We want to do it in a clean way, without side effects. Please be patient with us :)

prochac commented 1 month ago

Closing in favor of https://github.com/dataddo/pgq/pull/21

You were right that we would need this feature :D @ddo-radim and @petans24 prepared an off-main-branch PoC that we tested internally for some time.

AFAIK, the main (and only) difference is how the delay is set:

msg := pgq.MessageOutgoing{
    Delay: int(5 * time.Minute),
}

vs.

msg := pgq.MessageOutgoing{
    ScheduledFor: time.Now().Add(5 * time.Minute),
}
// alt
msg := pgq.MessageOutgoing{
    Payload: json.RawMessage(`{"message": "Happy birthday"}`),
    ScheduledFor: time.Date(/* someone's birthday */),
}

We prefer the latter, where you have a clear idea of what time is being set, and you can also pick a static date.

mmalcek commented 1 month ago

Great. Just one more note: could you also update the main README.md? Not just for me: I submitted this proposal to PGQ because I found there is no easy way to set delayed messages in Kafka or RabbitMQ (it's possible in NATS but a little tricky). I don't think I'm the only one with this issue, and making DELAY visible could increase PGQ's popularity ;)