onebeyond / rascal

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.
MIT License

FEATURE: Add NodeJS Streams support to subscriptions #235

Closed: JesseDocken closed this issue 6 months ago

JesseDocken commented 6 months ago

Node.js has the concept of a Stream (e.g., Readable and Writable), which allows data to be processed efficiently and supports transformations, multiplexing, and backpressure. This would make it easy to write queue subscriptions that, for example, read in messages, perform some transformations, then publish them to a separate queue or write them to a database.

Detailed Description

I would propose offering a function on the broker called subscribeStream(key) that returns an object-mode Readable emitting objects of the form:

{
  message: Message,
  content: string | Buffer | any,
  ackOrNack: AckOrNack
}

This can then be piped to downstream consumers for further processing and eventually to a terminator that calls the ackOrNack function on success. The pipes would need error handlers that call ackOrNack() with the appropriate error and remediation strategy, but otherwise usage would be identical to that of existing consumers of the subscribe() API.
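As a rough illustration of the proposal, the sketch below wraps an event emitter that mimics a subscription's 'message' event in an object-mode Readable and pipes it to a terminator that acknowledges each message. The names subscriptionToReadable and createAckingSink are hypothetical and not part of rascal, and a plain EventEmitter stands in for a real subscription, so this shows only the shape of the idea, not a working integration. It also ignores backpressure entirely.

```javascript
const { EventEmitter } = require('node:events');
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical helper (not part of rascal): wraps anything that emits
// 'message' events with (message, content, ackOrNack) arguments in an
// object-mode Readable of { message, content, ackOrNack } objects.
function subscriptionToReadable(subscription) {
  const readable = new Readable({
    objectMode: true,
    read() {}, // messages are pushed as they arrive; nothing is pulled
  });
  subscription.on('message', (message, content, ackOrNack) => {
    // The return value of push() is ignored here, so there is no backpressure.
    readable.push({ message, content, ackOrNack });
  });
  subscription.on('error', (err) => readable.destroy(err));
  return readable;
}

// Hypothetical terminator: records the content, then acknowledges the message.
function createAckingSink(processed) {
  return new Writable({
    objectMode: true,
    write({ content, ackOrNack }, _encoding, callback) {
      processed.push(content);
      ackOrNack(); // called with no arguments to acknowledge
      callback();
    },
  });
}

// A plain EventEmitter stands in for a real subscription in this demo.
const subscription = new EventEmitter();
const processed = [];
let ackCount = 0;
const countAck = () => { ackCount += 1; };

const source = subscriptionToReadable(subscription);
const done = pipeline(source, createAckingSink(processed));

subscription.emit('message', {}, 'first', countAck);
subscription.emit('message', {}, 'second', countAck);
source.push(null); // ends the demo stream; a real subscription would stay open

done.then(() => console.log(processed, ackCount));
```

A Transform stage placed between the source and the sink would need to forward ackOrNack along with the transformed content, which is the coupling of data and callback that the proposal implies.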

Context

We're piping data from RabbitMQ into our Elasticsearch cluster after doing some post-processing to make it more indexable. The Elasticsearch client supports reading from a stream to do bulk inserts into the database, which would simplify a lot of our processing. Overall, the architecture of the program would be more straightforward if we could model it as a series of data streams, from the subscription to the database.

I'm certain this would be beneficial to other users who similarly use APIs that support ingesting data from streams, since streams are a major feature of Node.js and are prevalent in backend servers, which this library generally targets.

Possible Implementation

I'm not familiar enough with the internals of Rascal to suggest one, unfortunately.

cressie176 commented 6 months ago

Hi @JesseDocken,

Thank you for the suggestion. I'm not sure rascal is a good fit for a stream-based subscriber. While I haven't worked with streams extensively, I suspect it would be unusual to pass a function such as ackOrNack down the pipe along with the data. Pausing the stream would involve cancelling the consumer, and resuming would require consuming again. Overall, it sounds quite complicated. I'll give it a bit more consideration though.
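The pause/resume concern can be sketched as follows. When push() returns false, the Readable's buffer is full and the source should stop delivering; when Node.js calls read() again, delivery can restart. FakeConsumer and createBackpressuredReadable are hypothetical stand-ins: in a real integration, start and stop would presumably map to subscribing and cancelling the subscription, which is the added complexity described above.

```javascript
const { Readable } = require('node:stream');

// Hypothetical stand-in for a consumer that can be started and stopped.
// It delivers queued items synchronously until it is stopped or runs dry.
class FakeConsumer {
  constructor(items) {
    this.items = items;
    this.running = false;
    this.starts = 0;
    this.stops = 0;
  }

  start(onMessage) {
    this.running = true;
    this.starts += 1;
    while (this.running && this.items.length > 0) {
      onMessage(this.items.shift());
    }
  }

  stop() {
    this.running = false;
    this.stops += 1;
  }
}

// Stops the consumer when the stream's buffer is full and restarts it
// when the stream asks for more data via read().
function createBackpressuredReadable(consumer, highWaterMark) {
  let consuming = false;
  const readable = new Readable({
    objectMode: true,
    highWaterMark,
    read() {
      if (consuming) return; // already delivering
      consuming = true;
      consumer.start((item) => {
        if (!readable.push(item)) {
          // Buffer is full: pause delivery until read() is called again.
          consuming = false;
          consumer.stop();
        }
      });
    },
  });
  return readable;
}

const consumer = new FakeConsumer([1, 2, 3, 4, 5]);
const readable = createBackpressuredReadable(consumer, 2);

const received = [];
const done = new Promise((resolve) => {
  readable.on('data', (item) => {
    received.push(item);
    if (received.length === 5) resolve();
  });
});

done.then(() => console.log(received, consumer.starts, consumer.stops));
```

Even in this toy version the consumer is stopped and restarted several times for five items, which gives a sense of how much cancel and resubscribe churn a small buffer could cause against a real broker.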

cressie176 commented 6 months ago

I've given this feature some more thought and still don't think that a streams-based interface is a natural fit for rascal. Sorry @JesseDocken.