nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0

Feature Request: Delayed Messages #324

Closed firrae closed 5 years ago

firrae commented 7 years ago

Hi there,

I originally opened this idea up on the main gnatsd repo but it was mentioned that it may be better suited as a part of NATS Streaming. I'll copy and paste my original request here:

Use Case:

A scheduled event that you want to put in the queue and have delivered on a specified delay from pub time.

Specifically, my use case is as follows: we have a series of events that users can trigger, each with an optional delay. We currently put the events into a queue with a trigger time; the workers for those events check the time and, if it has not been reached yet, put the event back in the queue. As a result, events sometimes get stuck behind other, longer events, and workers are tied up for the time it takes to pull the queue item, check it, and put it back. It also opens the potential for a loop that artificially inflates our usage statistics (something we could handle on its own), and it means we can no longer effectively auto-scale our worker assignments.

Proposed Change:

I propose a server-side option: on message creation, clients pass an additional parameter, defaulting to 0, that specifies the delay in milliseconds between message creation and the first attempted delivery.

Who Benefits From The Change(s)?

I believe a number of people could see a benefit from this, namely, people and groups that are developing distributed processing systems, and communication systems (email, SMS, phone, etc.).

Alternative Approaches

As I mentioned in my use case above, the application/worker can currently handle this, just not as nicely as having it inside the core of the server. In my current setup, I check a time in the message structure on the worker and put the message back in the queue if the time hasn't passed yet. Alternatively, if your language of choice has good time management, you could build a process into your application that internally checks whether a message's time has been reached and only then sends it to the queue, though this seems to be the least efficient method I've come up with so far.

I have since mentioned that Amazon SQS has a feature that allows this, message timers, and that RabbitMQ has a rather elaborate workaround for the same idea.

Thanks for considering, and I'd love to see more discussion,

Steve

kozlovic commented 7 years ago

The issue I have with this is that the delay is on a per message/producer basis, which means that a channel's message log may have messages with different delayed values (set or not). From a subscriber's perspective, when starting to consume, I wonder if that makes sense to have messages that are in the log that are not consumed in sequence. Of course, redelivery causes messages to be received out-of-order, but on the first delivery, it seems strange.

Technically speaking, it may also be a challenge to "skip" message(s) and how the state would be recovered in case of restart. Not impossible, but this needs to be thought through.

From a client API perspective, how would you pass the delay value? Is it a parameter to the publish method, or an option on connect?

firrae commented 7 years ago

Sorry for the delay in response.

Personally, I was hoping to find a solution that lets me pass the time to deliver, or the delay before delivering, when publishing the message, similar to AWS SQS's Message Timers. I'm not 100% sure how they do it, and I realize they are a slightly bigger team with more money/infrastructure 😝, but I think they have an initial ingestion point where they store all delayed messages, and they then scan that store every second to see if there are any to be sent. I don't see why that scan frequency can't be user-defined so users can make sure it suits them and their hardware. In my case, I wouldn't need to worry much about the added load of scanning every second unless it was an insane performance hit (which it might be).

As I said, in my mockups and demos I've just allowed the message to be delivered and put back on the queue, and while that works right now, I'm worried about that eating up cycles on my workers. I'd prefer to scale up the box(es) holding my queue rather than create more worker nodes.

Maybe I'm going about this the wrong way, but I think it's a solid idea if a feasible solution can be designed.

Ubloobok commented 7 years ago

I think delayed messages are a really useful thing for scenarios like sending SMS/email, or any other notification at a specific time. The alternative, a regular job that selects notifications by planned time and sends them, seems like a crooked nail.

l3x commented 7 years ago

From a high level, NATS supports three messaging patterns, right?

Using the first two for delayed messages would be using the wrong hammer, which would result in crooked nails.

Seems like a small layer of use case logic above the pub/sub pattern infrastructure would be the appropriate place to handle message timers.

On Tue, Jul 25, 2017 at 2:55 PM, Nikita Danilov notifications@github.com wrote:

I think delayed messages are a really useful thing for scenarios like sending SMS/email, or any other notification at a specific time. The alternative, a regular job that selects notifications by planned time and sends them, seems like a crooked nail.


aricart commented 7 years ago

I think the idea of having a messaging client that performs some action with some delay is extremely useful. I actually implemented a commercial version on a different technology many years ago. My contention is whether scheduling is a ‘principal’ functionality for a messaging server or a service that uses messaging.

IMHO, the messaging server really should only be preoccupied with storage and distribution of messages, not necessarily on applying additional business logic that triggers the ‘release’/‘reordering’ of the messages - a.k.a. a Scheduler.

With that said it would be very useful for a service to 'access' the data on the messaging service in different order, because it would allow a regular client to note things like the ID of a message and implement a custom ordering for the processing of a message.

I wonder if what is missing is the ability to request a particular message id from a channel on the server to perform its post processing. This operation wouldn't be associated with the streaming subscription it would simply be a request from the streaming server for a particular payload by ID.

ColinSullivan1 commented 7 years ago

Would functionality similar to the delay queues in Amazon SQS meet your use cases?

That would still guarantee source order delivery while providing a delay, although granularity would be at the subject, not per message.

firrae commented 7 years ago

@ColinSullivan1 that is a form that would work well with my use case. Our cloud version will likely utilize that, but our on-prem version needs its own local queue service, as it'll be running behind a client's firewall with little to no external access.

alexellis commented 6 years ago

I'm also interested in this and have implemented a dead-letter queue in Go as a separate service. The ideal solution would be part of NATS Streaming.

unisqu commented 6 years ago

@alexellis Can you share the code for dead-letter queue?

I would like to have a priority queue and scheduled delays for nats.io too.

tpihl commented 6 years ago

Some thoughts:

Dead-letter makes sense but is easy to create as part of the solution. Have a subscriber with manual ack subscribe to the msg in the same queue. It checks for Redelivered = true and Timestamp + [timeout] < [now]. If both hold, it publishes a copy of the msg on the dead-letter subject and then acks the original msg. The msg cannot be a dead letter if it's not Redelivered :)

A delay may be trickier depending on what properties the delay should have. But any solution should probably write the msg into a delay-queue, together with information about what delay and what outbound subject to use (to make sure it's not lost) and then ack the original msg.

We could now have a subscriber to the delay-queue with manual ack and a reasonable ack-wait. If the delay has passed, publish the msg on the outbound subject and then ack the msg.

If we want to remove the strain on constantly retrying to deliver the msg from natss, we could instead ack the delayed queue and manage the delay outside natss.

Often, at least for me, a delay is also used to batch things together, so I'm fairly sure that I would need some logic of my own anyhow to manage delays even if natss had delays.

Just some thoughts.

theromis commented 6 years ago

May I ask what the status of this feature request is?

alexellis commented 6 years ago

Hi @unisqu please take a look at the code here:

https://github.com/alexellis/mailbox

It was working sufficiently well for https://twitter.com/colorisebot/ but I suspect it may need some refinement for production usage. Your mileage may vary.

Alex

ssoroka commented 5 years ago

This feature request might make sense as a separate service, eg: nats-streaming-scheduler.

Instead of sending the message to the nats-streaming server, you send it to the scheduler with a delivery time, and at a later time the scheduler connects to the streaming server and sends it on.

firrae commented 5 years ago

@ssoroka I don't mind this idea, and it could be a good way to handle it. Though I'm not sure where this request sits at this point. My project would still greatly benefit from this feature, but we've since done some hacky stuff to make it work in the meantime.

unisqu commented 5 years ago

I've been looking for this function/feature for a long time. Can anyone help with an example deployment that works so far? Is it still under development, or is there a 3rd-party plugin I can use now? I'm actually using RabbitMQ for this part. I really hope NATS Streaming has this feature now.

kozlovic commented 5 years ago

We have no plan to implement this feature at this time. Closing so that people don't assume it is in the works.

VadimZhiltsov commented 1 month ago

@kozlovic I apologize for necro-posting, but this is a very important feature for some event-based systems, especially those that manage inventories.

Just imagine you're building an e-commerce project (let's say a ticket sales system). You may need "hold a booking" functionality, but if the inventory has not been purchased after 24 hours, you need to release it for other users.

In my "perfect world", my event-based system would have a hold event whose handler makes the inventory unavailable to others and posts a holded event to JetStream with a 24h availability delay. I could then have another consumer that listens to holded with that 24h delay.

I really enjoyed this feature in Flink's Stateful Functions API. Nowadays I don't know how to implement such functionality with NATS alone; I would go for some cron job processing. But if we had such a really great feature in NATS, I believe lots of hearts would love the NATS project even more!

alexellis commented 1 month ago

@VadimZhiltsov you should be able to build this with NATS JetStream and one of the excellent SDKs. Alternatively, if you're open to using faasd, it includes a cron-connector that runs invocations into functions on a scheduled basis.

VadimZhiltsov commented 1 month ago

@alexellis sorry, I may be missing something, but how can I do this with NATS JetStream alone? I don't see any delay options in PubOpt.

VadimZhiltsov commented 1 month ago

Alright, for those who came here through Google (this thread shows up first):

The problem is solved in this feature request: https://github.com/nats-io/nats-server/issues/2846

But make sure you read it carefully: it requires the consumer to consume the message first, then send a negative acknowledgment that specifies how long to wait until the message is redelivered. Unlike many systems, NATS makes it the consumer's responsibility to decide when to consume the message.