nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0

Load-balanced message co-ordination with distributed lock #1015

Open ghost opened 4 years ago

ghost commented 4 years ago

I have an ONAP use case that needs to block/wait on the processing of dependent messages across load-balanced queue group consumers.

I am trying to build a correlation messaging system with complex business logic for identifying the correlation key. With the current NATS implementation, correlated messages may land on different load-balanced consumers (replicas) and be processed concurrently, with no awareness of their dependencies. In that case I need to process them one by one and coordinate between the consumers.

It is similar to distributed locking use case.

Right now I am trying to solve this by adding the Atomix library to the consumer implementation. The problem with Atomix is that it requires a complex static cluster configuration, it is programming-language dependent, and in some cases it overlaps with functionality NATS already provides.

I feel NATS could handle this better with a simple approach (locks based on subjects).

https://atomix.io/docs/latest/user-manual/primitives/DistributedLock/

Do we have an API to lock/unlock on a particular subject with a lock name, so that we can block other consumers based on the lock name?

I see this requirement applying to many use cases in Software Defined Network device management and device control. It would avoid a lot of complexity in subject naming and correlation-subject-based consumers, and it would improve performance (i.e. not locking all messages at the subject consumer level, only certain messages, when necessary).

ONAP Reference code : https://gerrit.onap.org/r/gitweb?p=ccsdk/cds.git;a=blob;f=ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt;h=a6963d83f18628fb8c3dc7b9af371b0def6ceef8;hb=refs/heads/master#l46

Sample implementation with the proposed message lock:

```kotlin
MessageHandler { message ->
    runBlocking {
        println("Loadbalance Message Handler: ${message.strData()}")
        message.sub.lock("correlation-key") // wait to acquire the lock
        try {
            process(message)
        } finally {
            message.sub.unlock("correlation-key") // always release the lock
        }
    }
}
```

kozlovic commented 4 years ago

> Do we have an api to lock/unlock on particular subject with lock name, so that we can block other consumers based on the lock name.

No, there is no such feature and no plan to add one at this time.

With the current implementation, even if such a lock were available, it may not be enough. Let me explain: the server normally delivers a given message to a single member of the queue group. However, if the message is not acknowledged within the AckWait interval, it is going to be redelivered, possibly to another queue member. The user callbacks may not see that message at the same time (so a lock would not help), yet it would still be processed twice.

For instance, say there are 2 members, and the server sends messages like this:

member 1: m1, m3
member 2: m2, m4

Suppose that member 1 processes m1 but takes a long time (more than AckWait). The server is going to redeliver it, say to member 2. But member 2 is itself processing m2 and is very slow. The message m1 may be redelivered again and is now sitting in the client library's internal queue of both members, waiting to be dispatched.

ghost commented 4 years ago

Thanks for your response.

As discussion points:

We usually configure AckWait so that it is always greater than the processing time plus the maximum lock acquisition time. In that case this won't be an issue.

As an alternative, we have the option to ack before acquiring the lock, if the wait would otherwise exceed the ack time.

We can also keep this under control with a try-lock that takes a timeout parameter:

message.sub.trylock("lock-name", 10s)

In my case, the actual processing under the lock takes in the range of 50 ms to 100 ms, so I think it won't be an issue with an AckWait of 30s.
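As a rough illustration of the timing budget described above, here is a minimal sketch. `ackWaitCovers` is a hypothetical helper, not part of any NATS client, and the values are the ones mentioned in this comment (AckWait 30s, try-lock timeout 10s, processing up to 100 ms).

```kotlin
import java.time.Duration

// Hypothetical helper: true when the ack window is longer than the
// worst-case lock wait plus the worst-case processing time.
fun ackWaitCovers(ackWait: Duration, maxLockWait: Duration, maxProcessing: Duration): Boolean =
    ackWait > maxLockWait.plus(maxProcessing)

fun main() {
    val ackWait = Duration.ofSeconds(30)        // AckWait mentioned above
    val maxLockWait = Duration.ofSeconds(10)    // try-lock timeout mentioned above
    val maxProcessing = Duration.ofMillis(100)  // upper end of the 50 ms to 100 ms range
    println(ackWaitCovers(ackWait, maxLockWait, maxProcessing)) // prints "true"
}
```

The check only covers the budget on paper; a slow consumer or a long pause can still exceed AckWait and trigger a redelivery.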

kozlovic commented 4 years ago

So again, if you make sure that no redelivery is possible, then I am not sure what you are asking for, since by nature the server sends the message to ONLY one of the queue members (of the same group, that is, the same queue name).

ghost commented 4 years ago

Let me lay out the assumptions:

Before processing any dependent message, a consumer has to create a lock with a unique name (the correlation key as the lock name). If another consumer tries to create a lock with the same name, it has to wait. If no one has created a lock with that name, it can process immediately. So the wait is based on the lock name, not on the subscriber.

Note: there is one generic subscriber for multiple orders. Example (item add and cancel requests from different producers, but a generic subscriber):

message 1: Order Item1 add.
message 2: Order Item2 add.
message 3: Order Cancel.

In this case, assume message 1 lands on the first replica, message 2 on the second replica and message 3 on the third replica, within a very small time interval. The messages may then be processed concurrently (i.e. message 3 may be processed before message 2 completes), which is an issue. We need to process the messages one by one, so in this case we can use the orderId as the lock name. With the help of the lock we can sequence the messages with respect to time, without blocking the flow of other orders.

This way we avoid multiple subscriptions based on orderId, process independent orders concurrently, and sequence dependent messages of the same order with respect to the time they were consumed.
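The lock semantics described above (wait only on the same lock name, never on other orders) can be sketched inside a single process as follows. This is an illustration only: `NamedLocks` and `withLock` are hypothetical names, the locks live in one JVM and do NOT coordinate across replicas, and nothing here is a NATS API.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread

// Hypothetical in-process stand-in for the requested lock-by-name feature.
class NamedLocks {
    // One lock per name; entries are never removed in this sketch.
    private val locks = ConcurrentHashMap<String, ReentrantLock>()

    // Runs block while holding the lock for name.
    // Returns null if the lock could not be acquired within timeoutMs.
    fun <T> withLock(name: String, timeoutMs: Long, block: () -> T): T? {
        val lock = locks.computeIfAbsent(name) { ReentrantLock() }
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) return null
        try {
            return block()
        } finally {
            lock.unlock()
        }
    }
}

fun main() {
    val locks = NamedLocks()
    val held = CountDownLatch(1)
    val release = CountDownLatch(1)

    // Another consumer is busy with a message of order-1.
    val holder = thread {
        locks.withLock("order-1", 1000) {
            held.countDown()
            release.await()
        }
    }
    held.await()

    val sameOrder = locks.withLock("order-1", 50) { "processed" }   // times out: order-1 is busy
    val otherOrder = locks.withLock("order-2", 50) { "processed" }  // not blocked by order-1
    release.countDown()
    holder.join()

    println("same order: $sameOrder, other order: $otherOrder")
    // prints "same order: null, other order: processed"
}
```

Using the orderId as the lock name serializes work per order while leaving other orders free to run in parallel, which is the behaviour being asked for; a real solution would need the lock state to live outside the consumer process.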

kozlovic commented 4 years ago

Sorry for the delay, and again, to make sure that I am not wasting your time, please note that there is no plan to implement distributed lock. That being said, to continue on this discussion, you wrote:

> In this case assume, if message 1 lands on first replica and message 2 on 2nd replica and message 3 on 3rd replica with very small time interval. There is a possibility to process the message concurrently,(Ie before completing message 2, Message 3 will be processed) which is an issue. We need to process the message one by one. so in this case we can use orderId as lock name. so with respect to time we can sequence the message in order, with the help of lock, without blocking other orders flow.

But from previous description, you say that the lock would behave this way:

> it has to creates a lock with unique name( correlation-key as lock name), If other consumer try to create with same lock name, then it has to wait. If no one has created with same lock name, then it can process immediately.

So in the example you describe, suppose that the process that received message 2 actually executes its code before the process that received message 1. When it asks for the lock for this order ID, nobody has claimed it yet, so it goes ahead and processes message 2. Then, say the process that received message 1 requests the lock (and is blocked until the process that has message 2 completes) and then gets the lock: it will process message 1, and finally message 3 is processed. So the order would be 2, 1 and 3. You are therefore not guaranteeing the order for this order ID.
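The scenario above can be reproduced with a small in-process simulation. This is only a sketch: threads stand in for replicas, a fair `ReentrantLock` stands in for the hypothetical per-order lock, and `processingOrder` is an illustrative name. It shows that the lock serializes processing, but in lock-request order rather than message order.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread

// Returns the order in which messages get processed when the replicas
// reach the (single, per-order) lock in the given requestOrder.
fun processingOrder(requestOrder: List<Int>): List<Int> {
    val lock = ReentrantLock(true)           // fair: waiters are served in request order
    val processed = mutableListOf<Int>()     // only modified while holding the lock
    val firstHoldsLock = CountDownLatch(1)
    val allQueued = CountDownLatch(1)
    val workers = mutableListOf<Thread>()

    for ((index, message) in requestOrder.withIndex()) {
        workers.add(thread {
            lock.lock()
            try {
                if (index == 0) {
                    firstHoldsLock.countDown()
                    allQueued.await()        // keep the lock until the others are waiting
                }
                processed.add(message)
            } finally {
                lock.unlock()
            }
        })
        if (index == 0) {
            firstHoldsLock.await()
        } else {
            // Wait until this worker is queued on the lock before starting the next one.
            while (lock.queueLength < index) Thread.sleep(1)
        }
    }
    allQueued.countDown()
    workers.forEach { it.join() }
    return processed
}

fun main() {
    // The replica holding message 2 happens to request the lock first, then 1, then 3.
    println(processingOrder(listOf(2, 1, 3))) // prints "[2, 1, 3]"
}
```

Mutual exclusion holds in every run, yet the result follows whichever replica asked first, so a lock alone cannot restore the publish order.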

ghost commented 4 years ago

Thanks for your response. I am not expecting this to be in the current or near-term releases. I am trying to fit NATS to most of the use cases we are working on in Software Defined Networks, instead of combining it with other solutions inside our architecture.

I think I caused some confusion. The expected solution is simple: messages are delivered to and processed by multiple replicas concurrently. Before processing a message, the application code checks whether any dependent messages are being processed on other replicas. If so, only the dependent messages are sequenced and processed one by one; their order is not important. Non-dependent messages can be processed concurrently, because they are independent.

Here, dependent messages can easily be identified by the lock name, if we could create the lock in the NATS context.

The lock context can be at any level: the NATS connection or the subscriber. Only the lock capability is expected (similar to the Hazelcast distributed lock). We can group subscribers by including the subscriber name in the lock name.

I believe manual acks and their wait timings can be handled effectively through the subscriber configuration, based on the nature of the consumer.