nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Allow MaxAckPending per subject #4273

Open ajlane opened 1 year ago

ajlane commented 1 year ago

Feature Request

Add MaxAckPendingPerSubject to consumers.

The purpose would be to cap the number of unacknowledged messages that can be outstanding on any one subject, and to suspend delivery of further messages on that subject until the count drops back below the cap.

Use Case:

Using subjects as a queue for a dynamic number of state machines/actors that process messages in-order.

See Slack https://natsio.slack.com/archives/C069GSYFP/p1687911873115719

Proposed Change:

New configuration, server-side accounting for the number of in-flight messages per subject, more flow control logic, testing, and documentation.
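
To make the accounting part concrete, here is a minimal sketch of per-subject in-flight tracking. It is not nats-server code: subjectGate, TryDeliver and Ack are hypothetical names, and a real implementation would also have to handle AckWait expiry, redelivery and persistence.

```go
package main

import "sync"

// subjectGate counts unacknowledged messages per subject.
// Hypothetical type for illustration only; not part of nats-server.
type subjectGate struct {
	mu      sync.Mutex
	limit   int            // stands in for the proposed MaxAckPendingPerSubject
	pending map[string]int // subject -> number of in-flight (unacked) messages
}

func newSubjectGate(limit int) *subjectGate {
	return &subjectGate{limit: limit, pending: make(map[string]int)}
}

// TryDeliver reports whether a message on subject may be delivered now.
// When it returns true, the message is counted as in flight.
func (g *subjectGate) TryDeliver(subject string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.pending[subject] >= g.limit {
		return false // delivery for this subject is suspended
	}
	g.pending[subject]++
	return true
}

// Ack releases one in-flight slot for subject.
func (g *subjectGate) Ack(subject string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if n := g.pending[subject]; n > 1 {
		g.pending[subject] = n - 1
	} else {
		delete(g.pending, subject) // drop idle subjects so the map stays small
	}
}
```

With a limit of 1, a second TryDeliver on the same subject returns false until Ack is called, while other subjects are unaffected.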

Who Benefits From The Change(s)?

Users trying to avoid setting a fixed number of consumers.

Alternative Approaches

Set MaxAckPending, and use Deterministic Subject token Partitioning to distribute messages to a fixed number of consumers.
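
For reference, the idea behind deterministic partitioning can be sketched as hashing a subject token and taking it modulo a fixed partition count. The function name partitionFor and the choice of FNV-1a are assumptions for illustration; the server's own mapping may hash differently, so do not expect this to reproduce its partition numbers.

```go
package main

import "hash/fnv"

// partitionFor maps a subject token to a partition in [0, n).
// n must be greater than zero. FNV-1a is an assumed hash for this sketch.
func partitionFor(token string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(token)) // Write on an FNV hash never returns an error
	return h.Sum32() % n
}
```

The same token always lands in the same partition, which is what gives ordering per key, but changing n later reshuffles most keys.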

tpihl commented 1 year ago

This is an interesting suggestion. However, I wonder if an alternative solution would be a consumer with explicit ack that, behind the scenes, applies the OrderedConsumer approach to secure correct order. In that case it would be easy to fan out/queue messages to parallel workers based on subject partitioning, or on multi-filter consumers with a RePublish listener to discover new subjects not covered by any multi-filter consumer.

It would of course require some magic behind the scenes to move part of the AckWait timeout logic to the client, so that messages can be buffered before the AckWait timer starts (that is, when the message is made available to the code calling the client). If that magic can be built in a reliable/resilient manner it would, imho, bring the best of both worlds.

Any thoughts appreciated

ajlane commented 1 year ago

I'm not particularly concerned about partitioning by subject. It would be fine for me if two subsequent messages with the same subject went to different subscribers, as long as they aren't processed concurrently.

tpihl commented 1 year ago

I see. I mostly build event-streaming solutions, and in those cases order is a must, but so is having the same worker handle the same "entity", so that I don't need some external multi-process shared db to access the data.

Hope you find a good solution

bliednov commented 1 year ago

This would be crucial for event-sourcing projects. In the case of an IoT project, there can be millions of devices that produce events onto device.<id>. The events from each device must be handled in order, one by one. At the moment, that is challenging to achieve using NATS. One way is to create an ordered push consumer per device, delivering to some queue group subject. However, that increases complexity because of the very high number of push consumers. Another way is to have one pull consumer and embed an event-chain mechanism into the message, so that each event carries the ID of the previous event. But that increases the size of the message and the latency, because consumer service instances have to do more work and potentially nack out-of-order messages to handle them later. Of course, there are other possibilities to mitigate it, but a MaxAckPendingPerSubject would solve the issue in a very natural way.

derekcollison commented 1 year ago

For a very large number of observers or filtered state, like devices, we do allow mixing and matching of JetStream and NATS core. You can configure a stream to republish any message once it has been sequenced. The republished message carries headers with its sequence, timestamp and source (the stream), and also the last known sequence for that subject. This allows a downstream plain subscriber to detect any loss and acquire the missing messages.
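
A rough sketch of the loss detection described above, on the subscriber side. The republished struct and gapDetector are hypothetical; Seq and LastSeq are assumed to have already been parsed from the republish headers, and fetching the missing messages from the stream is left out.

```go
package main

// republished holds the values a plain subscriber needs from a republished
// message. Hypothetical type; the fields are assumed to be parsed from headers.
type republished struct {
	Subject string
	Seq     uint64 // stream sequence of this message
	LastSeq uint64 // last known stream sequence for this subject, 0 if none
}

// gapDetector remembers the last sequence processed for each subject.
type gapDetector struct {
	seen map[string]uint64
}

func newGapDetector() *gapDetector {
	return &gapDetector{seen: make(map[string]uint64)}
}

// Observe records m and reports whether earlier messages on m.Subject
// were never seen by this subscriber.
func (d *gapDetector) Observe(m republished) bool {
	prev := d.seen[m.Subject] // 0 when the subject is new to us
	if m.Seq <= prev {
		return false // duplicate or stale message, nothing new was missed
	}
	d.seen[m.Subject] = m.Seq
	// The stream says the previous message on this subject was LastSeq.
	// If that is newer than what we processed, we missed something.
	return m.LastSeq > prev
}
```

Note that a subscriber that starts late will see a "gap" on the first message of any subject that already has history, which is usually the cue to load the current state from the stream.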

We will continue to push on this a bit since we have lots of folks asking for a very large number of stream or KV observers.

jlarfors commented 1 year ago

I found myself looking for this exact functionality also. My use case is using a control-loop/reconciler pattern over NATS KV. Each key is a unique object in the KV store. I want to prevent the reconciler being called concurrently for any one object, and MaxAckPendingPerSubject would be just perfect.

Is there some sane alternative to achieve this at the moment, or is this feature being considered for implementation?

brigitops commented 4 days ago

If I understand it correctly, this feature has enormous potential for many use cases where subject partitioning would traditionally be used. To the point of utterly crushing Kafka partitions as a feature.

Take the subject mapping from the NATS docs:

nats server mapping "neworders.*" "neworders.{{wildcard(1)}}.{{partition(3,1)}}" neworders.customerid1
> neworders.customerid1.0

Right now you have a consumer for each partition, filtering on the subject. If you need strict ordering, then MaxAckPending=1 is required at the consumer level. This limits the throughput of each partition, and you can't really do anything in parallel from inside the consumer subscription. There is a lot of downtime in each stage of message handling (pulling, unmarshalling, processing, acking), so you can't achieve much of a pipeline. And, with a small number of partitions, one problem message can hold up a lot of others. The only answer right now, that I am aware of, is to increase the number of partitions and consumers significantly. That gets you into the difficult game of guessing how many partitions you'll need, since it's hard to change afterwards.

With MaxAckPendingPerSubject, the game changes. Pull a batch of messages. Since the full partition key is already in the subject (customerid1), you know that every message you pull can be processed in parallel. Just hand the messages off to a pool of workers. They're guaranteed to be separate subjects, so you don't even need to pick the same worker from the pool for a given subject each time. That's really easy for the application programmer to do, and has the potential for a huge boost to performance.
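
The hand-off to a pool of workers could look roughly like the sketch below. It assumes the hypothetical MaxAckPendingPerSubject is set to 1, so that no two messages in a pulled batch share a subject. The msg type and processBatch are made-up names, and pulling and acking through a NATS client are left out.

```go
package main

import "sync"

// msg is a stand-in for a pulled JetStream message (hypothetical type).
type msg struct {
	Subject string
	Data    []byte
}

// processBatch runs handle on every message using a fixed pool of workers
// and returns once the whole batch has been handled. It is only safe for
// ordering if the batch holds at most one message per subject.
func processBatch(batch []msg, workers int, handle func(msg)) {
	if workers < 1 {
		workers = 1 // avoid blocking forever with no consumers on the channel
	}
	jobs := make(chan msg)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range jobs {
				handle(m) // in a real consumer, ack after handle succeeds
			}
		}()
	}
	for _, m := range batch {
		jobs <- m // any free worker may take it; no per-subject affinity needed
	}
	close(jobs)
	wg.Wait()
}
```

Any worker can take any message because the server-side limit, not the client, would be what keeps a subject from being processed twice at once.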

And, for most use cases, you can drop subject partitions altogether, unless you need to partition into separate streams for very high throughput. Since each subject is guaranteed to be delivered one message at a time, you can achieve deterministic processing by simply sharing a consumer and subscribing from multiple application instances. Each subject has only one message being delivered to one bound subscriber at a time. You can just drop the subject mapping. Basically, you replace the Kafka-style "how many partitions" guessing game with scaling as demand changes, by adding more subscribing application instances to the consumer. It's simpler and scales better.

Imagine what performance a Knative channel or broker could offer with this, along with ordering guarantees.

Did I miss anything? It seems... too good to be true. If this feature is feasible and I understood it right, I think it would be absolutely awesome.