nats-io / nats.ts

TypeScript Node.js client for NATS, the cloud native messaging system
https://www.nats.io
Apache License 2.0
178 stars 13 forks source link

Async function for queue subscribe #39

Closed haywirez closed 5 years ago

haywirez commented 5 years ago

Is there a way to use async functions in subscribers? I'm looking for a way to do this:

let sub3 = await nc.subscribe('foo.baz.>', async (err, msg) => {
     // await async task to finish before taking next item in the queue...
}, {queue: 'A'});
aricart commented 5 years ago

@haywirez not quite sure what you are trying to accomplish.

As you can tell, you can do an async function for the handler, but because the API for the handler doesn't specify a return value the implicit promise returned by it is ignored, so from the perspective of the NATS client, the processing of the message is complete when the promise returns regardless of its resolve/rejected status.

On other languages, some clients have the notion a sync subscriptions where you request the next message for the subscription at your own pace, the subscription simply gets the messages added to a queue which get dequeued at the client's request. Typically the need for a sync pattern is for scatter-gather type of requests, where you create a subscription which will handle multiple replies and you want to gather all replies before completing.

Could clarify your use-case, I could do further investigation on what we could do.

haywirez commented 5 years ago

@aricart Thanks for your response! To clarify, my specific use case is a media transcoder worker module that deals with a lot of network I/O. What I'd want is to accept a message and start the process of downloading a file, transcoding it to a large numbers of files, uploading all those (with retries, if needed) and then even checking if uploads to the storage backend were successful and are publicly available. The module should not accept any other messages until this long process that contains a lot of async tasks is done, which could realistically take up to 1-2 minutes. In theory, these modules are self-contained and ready to scale (could spin up multiple transcoder worker processes automatically if the NATS queue gets long), but each of them should take 1 message at a time and hold until the process is complete.

aricart commented 5 years ago

@haywirez thanks for the clarifications, you are effectively describing a work queue. Implicitly it seems that you require in-order processing. The payload I take it is very small. Really what you are looking for is store-and-forward, but have the client be in control of the 'next' message.

Just to make sure we are on the same page, a NATS subscription doesn't hold any sort of queue in the server. So while you want to only 'take' the next message, all messages for the subscription are already in the client's message buffer. If the client were to fail, the messages would be lost.

You could take a look at node-nats-streaming - that will get you somewhat close. You would have to configure the streaming server a bit differently from what a normal configuration is:

Now your client can process the message, and when done processing ack it (if not acknoledged it will receive the message again, or if in a queue group, a different client could be assigned the message).

This will get you much closer to what you are trying to do, but there may be additional gotchas. @kozlovic can probably inject additional wisdom.

aricart commented 5 years ago

Here's an example of what that would look like:

https://gist.github.com/aricart/0e0641468694ac451592765feea84835

haywirez commented 5 years ago

@aricart Thanks so much, that is extremely helpful!!!