akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

ClusterClientReceptionist Ask missing response to Client after a few thousand #3417

Closed leo12chandu closed 6 years ago

leo12chandu commented 6 years ago


Are there other ways to send/receive/ask messages to a cluster from outside it, apart from ClusterClientReceptionist?

We are currently registering actors with ClusterClientReceptionist and using ClusterClient to "ask" messages into the cluster from outside. However, when we test with 10,000 (small) messages, after a few thousand the client stops receiving replies through Ask even though the server node processes those messages. Is there a way to flush the messages back to the cluster client after Client.Tell?

ClusterClientReceptionist with 2 seed nodes and 2 worker (non-seed) nodes seems to distribute the load evenly between the two worker nodes. I am curious whether ClusterClientReceptionist is the recommended way to distribute load across nodes, or if there is a better way to handle it (including the ability to send/ask messages from outside the cluster).

leo12chandu commented 6 years ago

I have additional details on this. It is reproducible only if 1) the seed node is separated out from the actual worker node, and 2) we have a custom HyperionSerializer.

When we make the actual worker node its own seed node, we cannot reproduce this even though we continue to use the custom HyperionSerializer. I.e., we are not missing any message responses back to the cluster client when all our nodes are seed nodes themselves.

If it helps to know, when the seed nodes are separated out, the message actually bounces between the seed nodes and worker nodes twice because of the custom serializer. When a message arrives it first goes to the seed node, is deserialized, and is then sent to the actual worker node where it is deserialized again. And when the result is sent back through ClusterClientReceptionist, the message is sent first to the seed node, deserialized, and the seed node in turn sends it to the ClusterClient. My thought is that this is probably because the initial contacts we defined are the seed nodes, so the message always has to pass through a seed node. But I don't know if this leads to dropped responses (back to the client).

leo12chandu commented 6 years ago

More information: when the seed node and worker node are the same, the cluster clients that are in the same geography as the seed/worker nodes work fine. They receive all the messages back even when I send 10,000 messages at once. However, when the cluster clients reside in a different geography than the seed/worker nodes that do the work, we start missing some message responses after about 1,000 requests.

Horusiath commented 6 years ago

@leo12chandu have you experienced any network disconnections during that test?

leo12chandu commented 6 years ago

Yes and no. Oddly, I see all 3 combinations, even within the same geography. Because I am running with a volume of 4,000-10,000, I do see some disconnects, but they reconnect and the responses flow back through to the client just fine (all of them). However, in some cases (when the nodes and client are in different geographies, like US and India), I see no disconnects but the response messages from the server to the cluster client are missing/dropped. I know the server is receiving and running all the requests because I have logged them on the server side.

In rare cases (again when the nodes and client are in different geographies, like US and India), I do see the following error on the server side with 9 dead letters. But this error does not always show up; sometimes there is no error but the cluster client still does not receive the messages.

    [INFO][5/14/2018 3:36:03 PM][Thread 0129][akka://MTSActorSystem/system/receptionist/akka.tcp%3A%2F%2FMTSClientActorSystem%40127.0.0.1%3A34758%2Ftemp%2FBk] Message BatchResult from akka.tcp://MTSActorSystem@chomimtsdev01:4053/user/coordinator/c4/batchRouter/c1/$Ee to akka://MTSActorSystem/system/receptionist/akka.tcp%3A%2F%2FMTSClientActorSystem%40127.0.0.1%3A34758%2Ftemp%2FBk was not delivered. 9 dead letters encountered.

Is there an option to flush the messages after Client.Tell so the client definitely receives them? It kind of feels like ClusterClientReceptionist is the problem child, but I could be wrong.

leo12chandu commented 6 years ago

Hi, I have put together a test application to reproduce this issue, in case that makes things easier. A zipped project can be found below.

ClusterClientMsgDrop.zip

Instructions to reproduce:

1) Run the SPGMI.ErrorChecks project. (It will run a cluster node with a pool of actors running in that app.)
2) Run the SPGMI.ErrorChecks.UI project. This will open up a UI. Now open 4 instances of this UI.
3) In each of the instances, select ClusterClient from the "Run On" dropdown. Change the "No of Batches" textbox to 2000.
4) Click "Run Batch" in each of them as simultaneously as you can.
5) You will notice that the cluster node from #1 processes the messages and returns the results back to the UI Results boxes. At the end of the Results textbox, you will see the count of how many were processed.
6) Some of them will show 2000, but others are just waiting for results to finish. For the UIs that are not printing anymore, just click "Cancel Batch". It shows how many were processed and how many faulted. Essentially some of those messages did not make it back from the cluster node to the client even though they were processed.
7) Now if you change the "Run On" to "ClusterDirect" and run all 2000 messages in each UI again, it will work fine. All 4 UIs will return results for all 2000 batches submitted.

You can try the above exercise with 1 instance of the UI app too, but just make sure to choose 10000 batches in the "No of Batches" textbox.

Aaronontheweb commented 6 years ago

I've been able to reproduce the result that not all of these messages get processed successfully via the ClusterClient when working with larger batch sizes of messages.

I haven't dug into the code yet, but I have a couple of theories on what is going on here...

leo12chandu commented 6 years ago

Ah, thanks for looking. You don't think there is such a thing as flushing the socket in the DistributedPubSubMediator or something, do you? Assuming ClusterClient uses DistributedPubSub.

Aaronontheweb commented 6 years ago

I found the issue - the problem is that the default buffer size allowed by the ClusterClient can only hold 1000 messages.

https://github.com/akkadotnet/akka.net/blob/6f32f6a772e917e1abfed6efeb05234bf5feeac4/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf#L77-L83

I was able to bump this value up to 10000 via the following C# code:

var settings = ClusterClientSettings.Create(actorSystem)
    .WithBufferSize(10000)
    .WithInitialContacts(initialContacts);
clusterClientActor = actorSystem.ActorOf(Akka.Cluster.Tools.Client.ClusterClient.Props(settings), "client");

And that fixed the issue. What's happening here is you're filling up the outbound buffer before the ClusterClient has a chance to establish a connection to the mediator, and once the buffer gets full, we start overwriting older elements that haven't been delivered.

https://github.com/akkadotnet/akka.net/blob/6f32f6a772e917e1abfed6efeb05234bf5feeac4/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs#L467-L484

So a larger buffer size might be all you need, but a better way of doing this would be to not send anything until your ClusterClient has made contact with at least one ClusterClientReceptionist somewhere in the cluster. Let me see if I can come up with a decent way of doing that.
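For reference, it should also be possible to raise the same limit through configuration instead of ClusterClientSettings. A minimal HOCON sketch, assuming the buffer-size key shown in the reference.conf linked above:

akka.cluster.client {
  # max number of messages buffered while the ClusterClient has no active
  # connection to a receptionist (default is 1000; 10000 is the hard-coded upper limit)
  buffer-size = 10000
}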

Aaronontheweb commented 6 years ago

Should note that reversing the order of the ClusterClientSettings.WithBufferSize and WithInitialContacts resulted in an NRE at startup. Going to submit a PR for that momentarily.

Aaronontheweb commented 6 years ago

Here's some code I came up with to help delay sending any initial messages until the ClusterClient has been able to connect with at least one receptionist for the very first time, taken from the sample's ExecuteBatch function:

// spin until we've made at least one contact point
while (true)
{
    var points = await ClusterClientActor
        .Ask<ContactPoints>(GetContactPoints.Instance, TimeSpan.FromSeconds(1)).ConfigureAwait(false);
    if (points.ContactPointsList.Count > 0)
        break;
}

var taskAsk = this.ClusterClientActor.Ask<BatchResult>(
    new Akka.Cluster.Tools.Client.ClusterClient.Send("/user/coordinator",
        new BatchRequest() { BatchToExecute = obj, EnablePersistence = enablePersistence, ExceptionAtOperation = exceptionAtOperation }),
    cancelToken);

I was able to successfully send thousands of messages using this without changing the buffer size. However, I still noticed some dropped messages when I tried sending 20,000 messages down the pipe here, so this might merit me digging into it a little deeper...

Aaronontheweb commented 6 years ago

For future reference, both of these logging statements should probably be warnings so it's easier for developers to understand why their messages aren't being delivered:

https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs#L471

https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs#L476

Going to submit a second PR to address those.

leo12chandu commented 6 years ago

Thanks a ton for looking into this and coming up with solutions! The solution of increasing the buffer size alone works wonderfully (although a buffer size of 20,000 fails, but that's not a biggie). However, delaying until the cluster client finds at least one receptionist is not working for me; about 40-50% of the messages dropped when I tried 10,000. The code I tried is below.

Not working:-

while (true)
{
    var points = ClusterClientActor
        .Ask<ContactPoints>(GetContactPoints.Instance, TimeSpan.FromSeconds(1)).Result;
    if (points.ContactPointsList.Count > 0)
        break;
}

var taskAsk = this.ClusterClientActor.Ask<BatchResult>(
    new Akka.Cluster.Tools.Client.ClusterClient.Send("/user/coordinator",
        new BatchRequest() { BatchToExecute = obj, EnablePersistence = enablePersistence, ExceptionAtOperation = exceptionAtOperation }),
    cancelToken);

Working:-

var settings = ClusterClientSettings.Create(actorSystem)
    .WithBufferSize(10000)
    .WithInitialContacts(initialContacts);

I am going to end up just increasing the buffer size for now.

Thank You again!!!!! This eliminates a big blocker for us.

Aaronontheweb commented 6 years ago

@leo12chandu the 10000 buffer size limit is a hard-coded constraint built into the ClusterClient itself. I know the JVM has this limit too. It seems arbitrary to me though.

I'll keep digging into this issue and see why the code for waiting until a receptionist is available didn't work...

leo12chandu commented 6 years ago

@Aaronontheweb - I think I found another issue with the ClusterClient dropping messages, when the actors perform a blocking operation for about a second, like a database call. To reproduce this, you can use the same test application I put together above, except in BatchActor.cs add Thread.Sleep(1000) to mimic the database call. Then follow the same instructions as above. You will see that out of 1000 messages sent, we get only about 150 responses; the rest of them are dropped.

BatchActor.cs

Receive<BatchRequest>(batchRequest =>
{
    Thread.Sleep(1000);
    var batchOperations = BatchManager.CreateOperations(batchRequest.BatchToExecute);
    var aggregatorActor = Context.ActorOf(Props.Create(() => new AggregatorActor(_schemaManager, Sender, ErrorCheckActorRef, batchRequest.BatchToExecute.BatchID, batchOperations, batchRequest.EnablePersistence, batchRequest.ExceptionAtOperation)));
    aggregatorActor.Tell(new BatchExecutionStart());
});

Danthar commented 6 years ago

Please don't use Thread.Sleep to mimic "work" - it's not representative at all. Use Task.Delay instead. Database actions utilize actual I/O, so the work is rescheduled, possibly on the same thread, in an async/await TPL world. Thread.Sleep is like a nuke: it gets the job done, but there will be lots of side effects.

In this case, any task scheduled to run on that thread will be blocked.
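For illustration, a minimal sketch of a non-blocking handler, assuming the BatchRequest/BatchResult message types from the repro project (the reply here is purely illustrative - the real sample hands off to an AggregatorActor instead):

using System;
using System.Threading.Tasks;
using Akka.Actor;

public class NonBlockingBatchActor : ReceiveActor
{
    public NonBlockingBatchActor()
    {
        // ReceiveAsync keeps the mailbox paused until the returned Task completes,
        // but the dispatcher thread is released while the delay is pending,
        // unlike Thread.Sleep which pins the thread for the full second.
        ReceiveAsync<BatchRequest>(async batchRequest =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // stand-in for an awaited database call

            Sender.Tell(new BatchResult()); // illustrative reply only
        });
    }
}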

Aaronontheweb commented 6 years ago

@Danthar in the case @leo12chandu is describing, it's probably ok. Sounds like the ClusterClient's contact stopped sending heartbeats and that's why the messages were dropped - could happen if the actor was doing complex processing of some kind.

leo12chandu commented 6 years ago

I think I may have found another situation where the ClusterClient drops messages: when the message size is large on both the receiving and sending side, it drops messages after processing a certain number of them. I've removed the Thread.Sleep(). Instead, the changes I made to the tool are:

1) In the MainWindowViewModel.cs, I've added a LargeString property and set it as shown below. This is on the sending side.

//Create Batch and its contents.
var finlEOP = new FinlEOP()
{
    KeyFinlEOP = (i + 1),
    TotalAssets = 80,
    TotalLiabilities = 20,
    Expenditures = 10,
    InterestIncome = 100,
    LargeString = new string('*', 114056)
}.Check().Persist().Approve();

2) In the AggregatorActor.cs, I've added the same LargeMessage property as below within the RunErrorChecks method. This is on the response side.

mtsErrorCheck.LargeMessage = new string('*', 114056);

3) Now, make sure we increase the message sizes on both the server and client side as shown below. These settings can be found in the App.config of the ErrorChecks project and in ActorConfigConstants.cs in the Shared project.

message-frame-size = 30000000b
send-buffer-size = 30000000b
receive-buffer-size = 30000000b
maximum-frame-size = 30000000b

4) Run the ErrorChecks and UI projects. In the UI, select ClusterClient from the dropdown and 9000 for the number of batches. When you run it, you can see the server side working. When it's done, the client does not receive all the messages. You can hit "Cancel Batch" to see the number of messages faulted, usually somewhere around 4000.

This works fine when I use ClusterDirect, which essentially sends messages directly to the actors without the ClusterClient. So something is definitely up with the ClusterClient.

leo12chandu commented 6 years ago

@Aaronontheweb - Any luck reproducing this?

Aaronontheweb commented 6 years ago

@leo12chandu so I made the changes you suggested and ran everything using the latest Akka.NET 1.3.9 nightlies.

One run made it to completion (9000) and another was missing about ~50 messages. The repro solution is currently running Akka.NET v1.3.3, so I'd recommend upgrading to at least Akka.NET v1.3.8.

Aaronontheweb commented 6 years ago

So I've run this several times now and it always runs to completion. I think the ~50 missing on that one run may have been caused by me copying the result set from the repro app too early.

Try upgrading to at least Akka.NET v1.3.8, and failing that, try the Akka.NET Nightly builds for 1.3.9 - I think whatever was causing this issue has been fixed.

leo12chandu commented 6 years ago

Interesting. Let me try updating to 1.3.8 and also to 1.3.9, and test both the large-message issue and the blocking-operation issue.

You didn't happen to increase the number of messages on the TCP port to avoid port exhaustion or anything like that, have you?

Aaronontheweb commented 6 years ago

We upgraded the version of DotNetty we're using in 1.3.8, but we also have made some bugfixes to DistributedPubSub in Akka.NET v1.3.5 which may have also contributed to fixing this issue: https://github.com/akkadotnet/akka.net/blob/dev/RELEASE_NOTES.md#135-february-21-2018

leo12chandu commented 6 years ago

That is really weird. I've updated to and tried both 1.3.8 and the 1.3.9 nightly build, and I am able to reproduce the dropped messages in both the blocking-call case and the large-message case. Here is the complete code with the large message and blocking call implemented (without packages and bin/debug, to reduce file size): SPGMI.ErrorChecksBlockingCall.zip

Aaronontheweb commented 6 years ago

So the issue ended up being a couple of things:

When the Thread.Sleep call was enabled, it could take the actors on the other side of the ClusterClientReceptionist slightly over 32 seconds on average to produce the first response, due to some design choices in this app (by the looks of it). That would cause the receive tunnel to time itself out:

https://github.com/akkadotnet/akka.net/blob/6f32f6a772e917e1abfed6efeb05234bf5feeac4/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf#L11-L24

Prior to adding the Thread.Sleep call, it was random whether or not the actors could get their work done in time. But after adding that call, they almost always failed to meet it. So the right solution here is either to allow the actors doing the work to start responding sooner, or to lengthen that timeout value via configuration.
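For example, a minimal HOCON sketch of the second option, assuming the response-tunnel-receive-timeout key from the receptionist section of the reference.conf linked above is the timeout in question:

akka.cluster.client.receptionist {
  # how long the receptionist keeps a client's response tunnel open without
  # seeing any traffic for it before timing it out (default is 30s)
  response-tunnel-receive-timeout = 120s
}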

Please let me know if there are any other issues here!