stepin opened this issue 3 years ago
We've discussed this one a bunch, I think we will need a solution to this but do not have an elegant one today - almost there but we have no way to rebalance consumers.
One for the back burner for future releases I think
Ok, I will close this as a duplicate.
Could this be reopened? https://github.com/nats-io/jetstream/issues/103 and the repo itself were archived because JetStream is now GA and part of the NATS server, which leads back to here again?
We have been discussing this one quite a bit. No resolutions yet but it's definitely on our radar and part of active discussions.
Hi, I have an idea. What does the community think?
We can use a distributed atomic lock system (for example ZooKeeper or etcd). Each consumer reads the `key` inside each payload (e.g. `order_id`) and tries to set a lock in ZooKeeper, say at `/orders/{order_id}`. If the lock is already held, the consumer should `msg.Nak()` (not acknowledge), because that means a message with the same key is already being handled by another consumer. This logic could also be implemented inside the `nats-server`, but I don't know how this would affect performance.
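For illustration, a minimal Go sketch of that idea using the nats.go client, with an in-process lock table standing in for ZooKeeper/etcd; the `orders.events` subject, the `workers` queue group, and the `tryLock`/`unlock` helpers are hypothetical, and a stream covering the subject is assumed to exist:

```go
package main

import (
	"log"
	"sync"

	"github.com/nats-io/nats.go"
)

// In-process stand-in for a distributed lock; a real implementation
// would use ZooKeeper or etcd so the lock is shared across consumers.
var (
	mu    sync.Mutex
	locks = map[string]bool{}
)

func tryLock(key string) bool {
	mu.Lock()
	defer mu.Unlock()
	if locks[key] {
		return false
	}
	locks[key] = true
	return true
}

func unlock(key string) {
	mu.Lock()
	defer mu.Unlock()
	delete(locks, key)
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Assume the payload is the order ID, for brevity.
	_, err = js.QueueSubscribe("orders.events", "workers", func(msg *nats.Msg) {
		orderID := string(msg.Data)
		if !tryLock(orderID) {
			msg.Nak() // same key already in flight elsewhere: redeliver later
			return
		}
		defer unlock(orderID)
		// ... process the event for this order ...
		msg.Ack()
	}, nats.ManualAck())
	if err != nil {
		log.Fatal(err)
	}
	select {} // block forever
}
```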
JetStream already does this. When you pull 20 messages you have a lock on those messages for the AckWait time.
Is it that you want to arrange it so that one consumer always handles all messages related to a given order ID? If that's what you are asking, then we are looking at adding something to the server itself, currently discussed here: https://github.com/nats-io/nats-architecture-and-design/pull/36
@ripienaar not really: when we have a GroupSubscription and 3 parallel consumers, we can receive different events simultaneously which, in a microservice architecture, should be handled one by one. Example:
So what does the `nats-server` offer to handle this situation?
As I see it, there are two ways to resolve this:
Yes, this is the feature we are working on fleshing out in the link given https://github.com/nats-io/nats-architecture-and-design/pull/36
Any news on the feature design? I am also looking for this capability.
We have some updates in 2.8 for folks that are more aligned to the way other tech does partitioning. @jnmoyne might be able to give some insight.
We are still working on the solution described above, though.
Yes, the new feature introduced in 2.8 is called deterministic subject token partitioning; it is an extension of the existing subject mapping functionality (which can be applied at the account level and during imports/exports).
Deterministic token partitioning allows you to use subject-based addressing to deterministically divide (partition) a flow of messages into a number of smaller message flows, where one or more of the subject tokens make up the key upon which the partitioning is based.
For example: new customer orders are published on `neworders.<customer id>`. You can partition those messages over 3 partition numbers (buckets) using the `partition(number of partitions, wildcard token positions...)` mapping function, which returns a partition number (between 0 and the number of partitions minus 1), via the following mapping: `"neworders.*" : "neworders.{{wildcard(1)}}.{{partition(3,1)}}"`.
This particular mapping means that any message published on `neworders.<customer id>` will be mapped to `neworders.<customer id>.<a partition number 0, 1, or 2>`, i.e.:
| Published on | Mapped to |
|---|---|
| neworders.customerid1 | neworders.customerid1.0 |
| neworders.customerid2 | neworders.customerid2.2 |
| neworders.customerid3 | neworders.customerid3.1 |
| neworders.customerid4 | neworders.customerid4.2 |
| neworders.customerid5 | neworders.customerid5.1 |
| neworders.customerid6 | neworders.customerid6.0 |
The mapping is deterministic because (as long as the number of partitions stays 3) 'customerid1' will always map to the same partition number. The mapping is hash-based; its distribution is random but tends towards a 'perfectly balanced' distribution (i.e. the more keys you map, the more the number of keys per partition converges to the same number).
You can partition on more than one subject wildcard token at a time, e.g. `{{partition(10,1,2)}}` distributes the union of wildcard tokens 1 and 2 over 10 partitions.
| Published on | Mapped to |
|---|---|
| foo.1.a | foo.1.a.1 |
| foo.1.b | foo.1.b.0 |
| foo.2.b | foo.2.b.9 |
| foo.2.a | foo.2.a.2 |
What this deterministic partition mapping enables is the distribution of the messages that would otherwise be subscribed to by a single subscriber (on `neworders.*`) across three separate subscribers (on `neworders.*.0`, `neworders.*.1`, and `neworders.*.2` respectively) that can operate in parallel.
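For illustration, a minimal worker sketch using the nats.go client. The `PARTITION` environment variable (set by whatever orchestrator starts the workers), the durable names, and the existence of a stream capturing the partitioned subjects are all assumptions for the example:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Each worker instance is passed its partition number,
	// e.g. PARTITION=0, 1 or 2.
	partition := os.Getenv("PARTITION")

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Subscribe only to this worker's partition; the mapping guarantees
	// that all messages for a given customer id land on one partition.
	sub, err := js.PullSubscribe(
		fmt.Sprintf("neworders.*.%s", partition),
		fmt.Sprintf("orders-worker-%s", partition), // hypothetical durable name
	)
	if err != nil {
		log.Fatal(err)
	}

	for {
		msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
		if err != nil {
			continue // typically a timeout when no messages are pending
		}
		for _, msg := range msgs {
			fmt.Printf("processing %s\n", msg.Subject)
			msg.Ack()
		}
	}
}
```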
@derekcollison @jnmoyne this is extremely good and probably one of the biggest features that people migrating from Kafka may be looking for. Thanks so much for this!
However, I just wanted to say that I'm a bit puzzled that I couldn't find anything about this in the docs at nats.io. It would be a shame if this feature went unnoticed.
Thanks. I also believe that this was one of the few things some people coming from Kafka to NATS have been wanting that we didn't have until now.
The feature is absolutely brand new (2.8.0 was literally released last week) and the doc update will happen very soon; what I wrote here is actually a preview of what the docs will say about the feature (comments welcome :))
@cortopy that was all @jnmoyne ! We will get docs updated as soon as we can.
Yeah I have a large doc PR in the works
FYI docs have been updated now. I think it may be ok to close this issue now?
Thank you for the updates. Sorry for disappearing after asking a question 😅
While the partitioning is nice and the docs look good, ideally I would want to be able to horizontally scale the consumers for a stream and still get in-order delivery by some key. With partitioning, I would be restricted to running a single consumer per partition to get the ordering I need.
In the ADR linked above, I also see this:
> we might require a header to be set, for example `NATS-Consumer-ID: 1` for sensor 1
Which I think will be tricky. It requires the publisher to be aware of the topology of the consumers, but that topology should ideally be dynamic.
I'm wondering if something like message de-duplication could be repurposed for this? If the publisher adds something like a `NATS-Convoy-ID: 1` header (note that the ID is for the sensor, not the consumer), a message could be considered a duplicate while there are in-flight messages with the same convoy ID.
At least for my use-case, it would be acceptable to break the ordering guarantee if a message has not been ACK'ed, NACK'ed or marked In-progress for a few minutes.
@jnmoyne can you provide the link to documentation? I think it would be good to have direct link from this issue. Thank you.
@timsolov Of course, here is the link: https://docs.nats.io/nats-concepts/subject_mapping#deterministic-subject-token-partitioning
@rbkrabbe Partitioning does indeed allow you to scale processing while still getting in-order delivery per key; you just need to use more partitions to scale further.
You can have guaranteed strict in-order delivery per partition (even if you happen to have more than one subscriber per partition consumer) simply by setting "Max acks pending" to 1 on the consumer for that partition. There is no need to use any headers for that (the key the partitioning happens on is one or more tokens in the subject name) or to try to reuse message de-duplication.
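As a sketch of what that looks like with the nats.go client (the `ORDERS` stream and the consumer/subject names are hypothetical):

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Durable consumer for partition 0 of a hypothetical "ORDERS" stream.
	// MaxAckPending=1 means at most one unacknowledged message in flight
	// across all subscribers, which preserves strict per-partition order.
	_, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
		Durable:       "orders-partition-0",
		FilterSubject: "neworders.*.0",
		AckPolicy:     nats.AckExplicitPolicy,
		MaxAckPending: 1,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```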
Hi, thanks for this feature, but I have a question: in Kafka, if a consumer instance goes down, its partitions migrate to another consumer. Are there any solutions to this problem here?
@jnmoyne That would cause head-of-line blocking on the partition, right? It would effectively mean that only 1 consumer in the group would be active at a time while the others idled.
How easy is it to change the deterministic partitioning to add more partitions, and is there a limit to how many partitions it is practical to have?
I have 2 use-cases in mind for this:
Note on the terminology in use when talking about JetStream: 'streams' are what record messages, and you create 'consumers' on streams. Those consumers are a bit like a database 'view' in the sense that they let applications receive all or a subset of the messages in a stream. Consumers have some internal state (kept and maintained by the JetStream servers) consisting of sequence-number counters and tables that are updated as messages are sent by the consumer and acknowledged by the subscribers.

The actual applications that are going to process the messages 'subscribe' to a consumer. So although those applications can be seen as 'consuming messages', they are not consumers but subscribers to consumers. There are no 'consumer groups' like you have in Kafka and no 'rebalancing'; rather, you have subscribers to consumers, and typically you would rely on your container orchestration system of choice to start/stop/monitor and restart your applications such that you have whatever number of subscribers per consumer (partition) you want, with each subscriber being passed the consumer name (i.e. partition number) it should subscribe to as part of its runtime information (e.g. a command-line argument or environment variable set by the container orchestration system you are using).
If you want guaranteed strictly ordered processing then you indeed need to set the 'max acks pending' value for the partition's consumer to 1, meaning that only one of the subscribers to that consumer gets a message to process at a time. If you are using partitioning for some other reason than strictly ordered processing, then you can increase max acks pending to more than 1 and have more than one subscriber to the consumer receiving a message at the same time.
Even with max acks pending set to 1, you can have more than one subscriber at a time on the consumer (but only one of them would get a message to process at a time) which means that if one of those subscribers were to die, processing would still continue as long as you have at least one subscriber to that consumer.
The partition mapping (e.g. the number of partitions) can be changed at any time. To increase the number of partitions: first create the streams/consumers for the new partitions you are going to add, then change the mapping to increase the number of partitions, then start workers subscribing to those new partitions (if you don't need strict ordering, you can even start them before changing the mapping). To decrease the number of partitions: first change the mapping to decrease the number of partitions, then monitor the consumers for the removed partitions, and once those partitions have delivered all the messages they may still have buffered to their subscribers, stop the workers and remove those streams/consumers.
@jnmoyne I understand your point of view, but if I have 3 instances I want to balance the work and have 1 consumer per instance of my service; in your case I could end up with 3 working consumers on 1 instance while the other 2 instances sit idle.
You control (for example using your container orchestration system of choice) how many instances of your worker you want to have per partition. So you can have 3 partitions and one subscriber/worker per partition and they will all be doing work.
There is a disconnect between the term "consumer" as it is used in Kafka (coming from the "consumer group" term) and the meaning of that term in JetStream. To simplify: a "consumer" in JetStream is like a "partition" in Kafka and a "subscriber" in JetStream is like a "consumer" in Kafka.
If I have one subscriber/worker per partition and my instance goes down, and there are no free resources left in my cluster, work will stop. Moreover, in k8s for example we need a StatefulSet for instance numbering, and when an instance crashes k8s will not start another. In Kafka this case works.
There is no concept of "consumer groups" in JetStream; that functionality happens at the administrative/ops level and can easily be implemented using container orchestration. And I would argue that this is the right model, rather than having something in the streaming service itself, because your container orchestration system can, for example, start or restart new instances (containers) of your workers automatically if they die, while the consumer group feature of Kafka will never be able to start new worker containers automatically.
That's true, but in real life there are many different situations (some about resources, as I said) and changing consumers on the fly is a good feature. We will try to build this as a framework with the NATS cluster API. Thanks for your support.
Just wanted to chime in and say you can still have multiple subscribers (workers) deployed for a given consumer (for a given partition) and maintain ordering. You need to set `MaxAckPending` to 1, which means there will only be a single message in flight across all workers for that partition. The benefit is that if one of the workers goes offline or you need to do a rolling deployment, you will always have one worker up, so it's like HA for your workers per partition while still maintaining order.
> The partition mapping (e.g. the number of partitions) can be changed at any time. To increase the number of partitions: first create the streams/consumers for the new partitions you are going to add, then change the mapping to increase the number of partitions, then start workers subscribing to those new partitions (if you don't need strict ordering, you can even start them before changing the mapping). To decrease the number of partitions: first change the mapping to decrease the number of partitions, then monitor the consumers for the removed partitions, and once those partitions have delivered all the messages they may still have buffered to their subscribers, stop the workers and remove those streams/consumers.
Hey @jnmoyne, I'm trying to implement the process you described above for an auto-scaling partition proof-of-concept I'm working on. But as far as I know this all needs to be done via the CLI, right? I can't seem to find any methods in the Go client library for modifying the mappings in a NATS cluster. I do see that the CLI uses the `server` package. Can I do this myself if I want to program something, or do you recommend sticking to the CLI? Or alternatively, is there a chance mappings could be implemented in the client library?
Cheers!
So in this case you are trying to scale the consumption of messages from a stream, not the storing of messages into a stream. This means you use one stream and then leverage subject transformation within the stream (a feature introduced in nats-server 2.10) to create multiple consumers on the stream (i.e. one consumer per partition, or per set of partitions).
This means the mapping is part of the stream definition, and you can define it from the client application using the same calls you would use to create streams normally. This is in contrast with the Core NATS subject transformation feature, which is defined as part of the account configuration (and therefore depends on what security mode you are using: for example, with static security the mappings are defined in the server configuration file, while with operator-mode security they are defined as part of the account JWT).
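A minimal sketch of that approach with the nats.go client, assuming nats-server 2.10+ and a nats.go version that exposes stream-level subject transforms; the stream, subject, and consumer names are hypothetical:

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Stream-level subject transform: partition messages as they are
	// stored into the stream (requires nats-server 2.10+).
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"neworders.*"},
		SubjectTransform: &nats.SubjectTransformConfig{
			Source:      "neworders.*",
			Destination: "neworders.{{wildcard(1)}}.{{partition(3,1)}}",
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Then one filtered consumer per partition, e.g. partition 0:
	_, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
		Durable:       "orders-partition-0",
		FilterSubject: "neworders.*.0",
		AckPolicy:     nats.AckExplicitPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```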
In any case, you can achieve elasticity without having to 're-partition'.
I have actually implemented all of this (i.e. elastic 'consumer groups' for JetStream) already and I hope to be able to release it to the public soon; you can read about how it's done in the ADR PR https://github.com/nats-io/nats-architecture-and-design/pull/263
@jnmoyne this is awesome!!!
I'm more than happy to help with testing / trialling this once this is released.
@jnmoyne Hey, it's been a few months so I wanted to ask if the stuff you described in https://github.com/nats-io/nats-architecture-and-design/pull/263 has made it to NATS yet? I can't find any docs or examples for it.
I need to process messages in parallel, but in some cases they should be processed in sequence. A good simple example is the order case: messages for different order IDs should be processed in parallel, but messages for the same order ID should be processed in sequence. This pattern is described in more detail by Microsoft at https://docs.microsoft.com/en-us/azure/architecture/patterns/sequential-convoy and by ActiveMQ at http://activemq.apache.org/message-groups.html . It's a common use case for Event Sourcing scenarios.
Requirements:
I see several options:
In this case, it's unclear how my code will receive notification that some message was processed (to check if there are more messages to process in the same sequence).
It's clear how to implement this option, but it puts too much queue-related code outside of the queue.
Is it possible to implement the original pattern with storage inside JetStream? Or maybe even without custom code (maybe I missed or misunderstood some options, like MaxConsumers)? Is this pattern planned for implementation in the future?