nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0

Support message partitioning by Key #326

quintans closed this issue 4 years ago

quintans commented 4 years ago

Hi. I am repeating a request that I also made for JetStream, in the hope that there is some interest in picking it up here, since JetStream may not be available "soon".

I don't think this is the same as exclusive receivers. If I understand correctly, with exclusive receivers one consumer is selected to consume all the events.

What I am looking for is, given two consumers A and B, that some messages go to consumer A and others go to consumer B. The balancing rule would be based on the hash of a key (e.g. an aggregate ID) provided when publishing, so messages whose keys have the same hash would go to the same consumer. When a rebalance happens because the number of consumers changes, a different consumer might be chosen. My requirements are:

- Messages with the same key from the same producer should be consumed in order.
- To guarantee that order, messages with the same key should be handled by the same consumer.

Looked at naively, this seems straightforward to implement. Using something like consistent hashing applied to a key provided when publishing, the NATS server could easily route each message to the corresponding consumer. The NATS server seems better positioned than a client to do this balancing, because it already tracks the number of consumers. This would be a very useful feature for event sourcing + CQRS.

Apache Pulsar has this same feature http://pulsar.apache.org/docs/en/concepts-messaging/#key_shared and I think it would be a killer feature for NATS, making NATS even better than Apache Kafka ;)

kozlovic commented 4 years ago

@quintans This has been asked in the past and rejected: https://github.com/nats-io/nats-streaming-server/issues/524

One correction to what I said on that issue: we would not really need headers per se. A new "Key" field in PubMsg (the protobuf the client uses to publish messages) would do it.

> messages published with the same key hash would go to the same consumer. When a rebalancing happens due to the change of the number of consumers, another consumer might be chosen.

Not sure what you mean here. You state as a requirement: "Messages with the same key from the same producer should be consumed in order. To guarantee the order messages for the same key should be handled by the same consumer."

I don't see how the server would consistently pick a consumer based only on the key hash, without any consumer identification. I could see a consumer providing an option saying "I want to consume only messages with key 1", but then, as we argued in the other issues, why not publish to foo.1, etc.?

I think I understand the difference, though: having all messages on "foo" with a key that the server hashes, based on the number of available consumers, to distribute them appropriately is better than having to pre-select the key/channel name. But what would happen to pending messages that are not yet acked when a new consumer is added (or one is removed) and it is time to redeliver them? The hash-based assignment would then differ from the one computed during the original delivery.

We can discuss, but again, I don't think we plan to implement this in NATS Streaming.

quintans commented 4 years ago

I assumed the NATS server somehow tracked consumers using some kind of identifier (connection, UUID, ...). Since the consumer ID is not important, we could generate a UUID every time a consumer is instantiated.

Thinking out loud here... If we consider the consumers of a queue group, and assume we know which messages in that group were delivered, then every time a consumer is added or dropped we just need to find the smallest sequence processed and reset all the consumers to that sequence. Then we would reapply the balancing for each consumer, skipping the messages already delivered.

Example of using a mod-over-the-hash strategy:

Consumer / Sequence
A = 1, 3, 5, 7, 9
B = 2, 4

New consumer C is added; all restart from 4:
A = 4, 7, 10, 13, 15
B = 5, 8, 11, 14
C = 6, 9*

Consumer B is removed; all restart from 9:
A = 9, 11, 13, 15, 17
C = 10, 12, 14

* skipped because they were already consumed

kozlovic commented 4 years ago

From the example above, I don't think you take into account that messages are not always acknowledged. What would happen to a message that was previously assigned to a consumer but never acknowledged? Would it be reassigned to a different consumer? What does that mean for the guarantee that a single consumer processes all messages with the same key? (That message was processed by one consumer and is now reassigned to another because the key-hash mapping changed.)

There is a lot of state (replicated in the cluster) that would need to change whenever consumers come and go. Which message is assigned to which consumer, and which messages have been acked, is persisted, and all of that could need to be altered. I don't think NATS Streaming will ever support that. It sounds like something that would have had to be designed in from the beginning of the project.

With that in mind, I am sorry, but I am going to close this issue: I don't see this being implemented, and I want you to plan differently (with another option or product) instead of waiting for something that will never come and delaying your plans. Thank you.