akka / akka

Build highly concurrent, distributed, and resilient message-driven applications on the JVM
https://akka.io

Akka Cluster Sharding handover causes high increases in latencies #28011

Open milanvdm opened 4 years ago

milanvdm commented 4 years ago

We are running Akka 2.5.19 using Akka Cluster, Sharding, Management, and Persistence C*. We have our cluster running with service discovery in Fargate ECS.

Settings for Akka Cluster:

cluster {
    min-nr-of-members = 2
    failure-detector.threshold = 12 // https://doc.akka.io/docs/akka/current/cluster-usage.html#failure-detector

    sharding {
      passivate-idle-entity-after = 20s
      distributed-data {
        majority-min-cap = 2 // https://doc.akka.io/docs/akka/current/distributed-data.html#consistency
      }
      waiting-for-state-timeout = 2s // https://github.com/akka/akka/pull/27025
      rebalance-threshold = 2 // https://github.com/akka/akka/pull/26101
    }
  }
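
For context, a minimal sketch of how a region governed by these settings might be started. The entity actor, message type, and typeName are assumptions for illustration, not taken from this issue; in classic sharding the shard count comes from the extractShardId function.

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object PaymentsSharding {
  // Hypothetical message and entity actor, only for illustration
  final case class Get(paymentId: String)

  class PaymentEntity extends Actor {
    def receive: Receive = { case Get(id) => sender() ! s"payment $id" }
  }

  // 50 matches the "maximum of 50 shards" mentioned later in this issue
  private val numberOfShards = 50

  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ Get(id) => (id, msg)
  }

  private val extractShardId: ShardRegion.ExtractShardId = {
    case Get(id) => math.abs(id.hashCode % numberOfShards).toString
  }

  def start(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "payments",
      entityProps = Props(new PaymentEntity),
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}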

With the following steps:

  1. Cluster of 3 nodes
  2. The cluster is receiving numerous GET requests to get the information of an entity (2-30ms latency)
  3. A rolling restart starts
  4. 2 new nodes are spawned (5 nodes in the cluster)
  5. When those 2 new nodes are healthy (based on AkkaManagementHttp), the 2 oldest nodes are downed with coordinated-shutdown
  6. 1 new node is spawned (4 nodes in the cluster)
  7. The last old node is shut down using Coordinated Shutdown (a sketch of this graceful leave follows the list)
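
A sketch of what the coordinated-shutdown downing in steps 5 and 7 could look like programmatically (an assumption about the deployment automation, not taken from the issue): leaving the cluster moves the member to Exiting, which triggers CoordinatedShutdown and runs the sharding handoff phases before the process stops.

import akka.actor.ActorSystem
import akka.cluster.Cluster

object GracefulLeave {
  // Leaving the cluster triggers CoordinatedShutdown on this node by default,
  // so shard regions hand off their shards before the actor system terminates
  def apply(system: ActorSystem): Unit = {
    val cluster = Cluster(system)
    cluster.leave(cluster.selfAddress)
  }
}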

What we see is that during the handovers of the Shards/ShardCoordinator, some GET requests now get queued and have a 4s+ latency.

The documentation does indeed mention that during a ShardCoordinator or Shard handover, requests to the affected entities are buffered until the handover is finished.

Are times of 4s+ normal for this process? That seems very high since the number of shards is set to a maximum of 50.

patriknw commented 4 years ago

Hi @milanvdm

I can try with an example so that we can compare latencies, and see if anything can be improved. Will probably not have time until next week.

If you can share logs from the involved nodes, that would also be useful: debug-level logging for akka.cluster and akka.cluster.sharding.
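
In case it helps anyone following along, a minimal sketch of enabling that logging, assuming the akka-slf4j module and a Logback backend (where the akka.cluster and akka.cluster.sharding loggers can then be set to DEBUG in logback.xml):

import com.typesafe.config.ConfigFactory

// Routes Akka's internal logging through SLF4J at DEBUG so the backend can filter
// on akka.cluster / akka.cluster.sharding; logger classes come from akka-slf4j
val debugLoggingConfig = ConfigFactory.parseString(
  """
  akka {
    loglevel = DEBUG
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  }
  """)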

In general, I'd recommend using the latest patch version, which is 2.5.25.

milanvdm commented 4 years ago

@patriknw Thanks for your response!

What would your example setup be? I could possibly give you a hand setting it up if needed.

I'll try setting up a test run to get some relevant debug logs for you as well.

I checked the releases between 2.5.19 and 2.5.25 but didn't really find any significant changes around Sharding or handovers, so I would like to avoid a risky upgrade process until the latency issues are resolved :)

patriknw commented 4 years ago

I'd probably adjust the https://github.com/akka/akka-samples/tree/2.5/akka-sample-sharding-scala sample.

patriknw commented 4 years ago

@milanvdm I have tried the rolling update with the example in the branch akka-samples/wip-28011-sharding-rolling-update-patriknw and couldn't see any disastrous latency, around 400 ms.

Run with 5 terminals, ports 2551, 2552, ... 2555:

cd akka-samples/akka-sample-sharding-scala
sbt "runMain sample.sharding.ShardingApp 2555"

Traffic is generated by 2555.

Tried two scenarios:

  1. ctrl-c of 2552 and 2553
  2. ctrl-c of 2551, which is the oldest hosting the coordinator

In the logs you can see when it starts by searching for "Leaving", and then look for "Starting Device" for a device that was previously located on the leaving node.

Let me know if you have another scenario that illustrates the problem.

milanvdm commented 4 years ago

@patriknw Thanks! I'll play around with your branch this week to try to emulate the issue :)

milanvdm commented 4 years ago

@patriknw Hi Patrik,

I have some debug logs showing the issue quite well. What would be the easiest way to share them with you?

My initial thoughts:

milanvdm commented 4 years ago

@patriknw In the meantime, we already managed to reduce the issue to two specific scenarios:

1) During/After CoordinatedShutdown of a node, we see a new node trying to register itself to the ShardCoordinator.

Trying to register to coordinator at [ActorSelection[Anchor(akka.tcp://payments@10.2.144.222:2552/), Path(/system/sharding/payments-shardCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(address = akka.tcp://payments@10.2.144.222:2552, status = Up)] is reachable.]

Then I see the ShardCoordinator moving to the active state (this takes around 600ms).

But then it seems to successfully register and goes into some retries of:

Retry request for shard [25] homes from coordinator at [Actor[akka://payments/system/sharding/payments-shardCoordinator/singleton/coordinator#-405486765]]. [1] buffered messages.

2) 1ms after we send a message to the PersistenceActor, a Shard handover containing that entity is started, and we get a timeout on that request. -> Is this a scenario that we should handle ourselves?
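
For illustration only, a hypothetical retry helper (not something proposed in this thread) showing one way such a race could be absorbed on the caller side; the names and delays are assumptions:

import akka.actor.{ ActorRef, Scheduler }
import akka.pattern.{ after, ask, AskTimeoutException }
import akka.util.Timeout
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

// If an ask races with a shard handover and times out, retry after a short delay;
// buffered messages are delivered once the shard home is resolved, so the retry usually lands
def askWithRetry(region: ActorRef, msg: Any, retries: Int = 3)(
    implicit ec: ExecutionContext, scheduler: Scheduler): Future[Any] = {
  implicit val timeout: Timeout = 2.seconds
  (region ? msg).recoverWith {
    case _: AskTimeoutException if retries > 0 =>
      after(500.millis, scheduler)(askWithRetry(region, msg, retries - 1))
  }
}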

patriknw commented 4 years ago

Could you zip the logs and attach them here, or is it too large? Otherwise, share via Dropbox, Google Drive, or similar.

milanvdm commented 4 years ago

~

milanvdm commented 4 years ago

@patriknw From what we can see (using your example), we do very similar things. We see that from the moment CoordinatedShutdown is triggered, it takes more than 1 second before we can contact the ShardCoordinator again. We have set the ddata gossip interval to 100ms, so the state itself should be seen quite quickly by all 3 nodes.

At this moment, my feeling is that this latency is added by Akka. Are there strategies to improve this? -> Does Akka pre-start a Coordinator on a new node before the handover, so the switch is quick?

patriknw commented 4 years ago

Thanks, I'll dig in on Monday

milanvdm commented 4 years ago

Some of our configs that may be handy to know:

akka {
  io.dns.inet-address {
    positive-ttl = never
    negative-ttl = default
  }

  cluster {
    roles = ["payments"]

    gossip-interval = 100ms

    min-nr-of-members = 2
    failure-detector.threshold = 12 // https://doc.akka.io/docs/akka/current/cluster-usage.html#failure-detector

    sharding {
      role = "payments"

      passivate-idle-entity-after = 20s

      retry-interval = 100ms
      shard-start-timeout = 100ms
      rebalance-interval = 10s

      least-shard-allocation-strategy {
        rebalance-threshold = 2
      }

      distributed-data {
        gossip-interval = 100ms
        notify-subscribers-interval = 100ms
        majority-min-cap = 2 // https://doc.akka.io/docs/akka/current/distributed-data.html#consistency
      }
    }
  }

  discovery {
    method = config
    method = ${?DISCOVERY_METHOD}
    aws-api-ecs-async {
      cluster = ${?FARGATE_CLUSTER_NAME}
    }
    config.services = {
      core-payments = {
        endpoints = [
          {
            host = "127.0.0.1"
            port = 8558
          }
        ]
      }
    }
  }

  management {
    http {
      hostname = "127.0.0.1"
      hostname = ${?AKKA_MANAGEMENT_HOSTNAME}
    }
    cluster.bootstrap {
      new-cluster-enabled = off
      contact-point-discovery {
        service-name = "core-payments"
        required-contact-point-nr = 2
      }
      contact-point.fallback-port = 8558 // https://doc.akka.io/docs/akka-management/current/discovery/aws.html#akka-discovery-aws-api-async
    }
  }

  remote.netty.tcp.hostname = "127.0.0.1"
  remote.netty.tcp.hostname = ${?AKKA_MANAGEMENT_HOSTNAME}
}
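
A sketch of how the management and bootstrap section above is typically wired up at startup (an assumption; exact package names depend on the akka-management version in use):

import akka.actor.ActorSystem
import akka.management.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap

object ManagementStartup {
  def start(system: ActorSystem): Unit = {
    AkkaManagement(system).start()   // serves the HTTP management endpoints (port 8558 above)
    ClusterBootstrap(system).start() // uses the configured discovery method to locate contact points and join
  }
}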

patriknw commented 4 years ago

Looked at your logs.

Can't understand what to look for in extract-2019-10-25_04-08-12.csv.zip.

In extract-2019-10-25_04-10-33.csv.zip:

Isn't that extremely quick? What do you expect?

For me to be able to investigate further, we have to stick to the sample and not logs from a production system, because that kind of investigation is what we offer to customers of Lightbend support.

It's important that we differentiate between two scenarios.

  1. Rolling update of the coordinator node, the oldest node.
  2. Rolling update of another node, not the oldest.

I expect 1 to be slower.

milanvdm commented 4 years ago

Let's look at extract-2019-10-25_04-10-33.csv.zip:

1) CoordinatedShutdown is triggered on the oldest node.
2) New node outputs:

2019-10-24T14:15:53.573Z |  GetShardHome [25] request from [Actor[akka.tcp://core-amazon-payments@10.2.25.165:2552/system/sharding/payments-shard#-894379935]] deferred, because rebalance is in progress for this shard. It will be handled when rebalance is done.
2019-10-24T14:15:53.572Z | Retry request for shard [25] homes from coordinator at [Actor[akka.tcp://core-amazon-payments@10.2.144.222:2552/system/sharding/payments-shardCoordinator/singleton/coordinator#879890240]]. [1] buffered messages.

3) New ShardCoordinator is found:

2019-10-24T14:15:54.850Z | Sharding Coordinator was moved to the active state State(Map(12 -> Actor[akka.tcp://core-amazon-payments@10.2.80.206:2552/system/sharding/payments-shard#409668457], ......

4) Shard is found:

2019-10-24T14:15:54.903Z | Starting entity [25] in shard [25]
2019-10-24T14:15:54.902Z | Shard was initialized 25
2019-10-24T14:15:54.902Z | Starting shard [25] in region
2019-10-24T14:15:54.901Z  | Host Shard [25]
2019-10-24T14:15:54.900Z | Shard [25] located at [Actor[akk......
2019-10-24T14:15:54.900Z | Deliver [1] buffered messages for shard [25]
2019-10-24T14:15:54.900Z | Shard [25] located at [Actor[a....
2019-10-24T14:15:54.900Z | Shard [25] allocated at [Actor[a....
2019-10-24T14:15:54.900Z | The coordinator state was successfully updated with ShardHomeAllocated(25,Actor[akka.t....

5) The payment is found: 2019-10-24T14:15:55.058Z | getPaymentFinishedAfter=1585

This is a 1.5s handover time (it sometimes goes up to 2-3s).

milanvdm commented 4 years ago

@patriknw The main question is:

-> We found the Akka.NET Lighthouse project, which seems to partially aim at solving this kind of problem by making sure your Coordinator does not move and cause latency increases.

patriknw commented 4 years ago

Thanks, that is a more interesting scenario than the one I accidentally looked at. Here it is a rolling update of the coordinator node (the oldest node). It gives me some ideas of what could maybe be improved.

Since there are typically many GetShardHome requests after the handoff of the region, it would be better if those could be served by the old coordinator before it shuts down. That means the rolling update phase may be delayed somewhat, but with better response times for these sharding requests.

I'll experiment with that tomorrow.

milanvdm commented 4 years ago

@patriknw My bad, I should have invested more time in making it clearer what to look at in the logs.

What about an approach where a new ShardCoordinator is created in a (not-yet-available) mode on a new node? Then the old node can, as you mentioned, handle the requests until it sends an activate signal to this already running ShardCoordinator, just to make the handover time as short as possible.

Thanks for looking into this!

patriknw commented 4 years ago

I can't see the same behavior in the sample. There the old coordinator is already handling those requests and it's therefore pretty quick.

In your log I would expect one such entry for each shard (4, 21, 17, 0, 28, 25), but it's only there for 0.

2019-10-24T14:15:53.473Z Rebalance shard [0] done [true]

Is something getting stuck when stopping the shards/actors? That could explain the repeated:

2019-10-24T14:15:54.073Z,debug,core-amazon-payments/core-amazon-payments/1d9834df2b714bc2bfd47bb96664081b,"GetShardHome [25] request from [Actor[akka.tcp://core-amazon-payments@10.2.25.165:2552/system/sharding/payments-shard#-894379935]] deferred, because rebalance is in progress for this shard. It will be handled when rebalance is done."

Note that there have been several important fixes to Cluster Sharding since version 2.5.19, so I'd recommend you update to the latest version, 2.5.26.
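
A build.sbt sketch of the suggested patch bump (the artifact list is an assumption based on the modules mentioned at the top of the issue; akka-management and the Cassandra persistence plugin are versioned separately and not shown):

val akkaVersion = "2.5.26"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster"          % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence"      % akkaVersion
)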

milanvdm commented 4 years ago

@patriknw We spam the nodes with the same GET request by id during a redeploy. This id probably lives in shard 0.

Would this prevent/slow down stopping the actor?

patriknw commented 4 years ago

We spam the nodes with the same GET request by id during a redeploy. This id probably lives in shard 0.

Would this prevent/slow down stopping the actor?

It looked like shard 0 was the one that completed, but the corresponding logs for the other shards are missing.

patriknw commented 4 years ago

I updated the sample branch to use Akka 2.5.26, also to see if there is any difference compared to 2.5.19. The sample is still behaving well in the sense that the old coordinator allocates the shards before shutting down. That can be seen in the logs of the old coordinator, 2551, before it shuts down:

Shard [9] allocated at [Actor[akka://ShardingSystem@127.0.0.1:2552/system/sharding/Device#-1618489059]]

milanvdm commented 4 years ago

@patriknw I've debugged the issue together with @manuelbernhardt. A couple of things came out of this, but I think one of the most surprising ones was that we added:

import akka.Done
import akka.actor.CoordinatedShutdown
import scala.concurrent.Future
import scala.concurrent.duration._

// coordinatedShutdown = CoordinatedShutdown(system); an implicit ExecutionContext is in scope
coordinatedShutdown
  .addTask(CoordinatedShutdown.PhaseClusterShardingShutdownRegion, "wait-on-shard-hand-over") { () =>
    // PhaseClusterShardingShutdownRegion has a timeout of 10s, so we sleep 9s to make sure
    // we don't trigger the timeout
    Future(Thread.sleep(9.seconds.toMillis)).map(_ => Done)
  }
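
For reference, the 10s mentioned in the comment is the phase timeout, which could itself be raised in configuration if a longer wait inside that phase were ever needed (a sketch, not something suggested in this thread):

import com.typesafe.config.ConfigFactory

// Raises the cluster-sharding-shutdown-region phase timeout so a registered task can wait
// longer before PhaseClusterExiting (and with it the coordinator singleton shutdown) runs
val shutdownPhaseTuning = ConfigFactory.parseString(
  "akka.coordinated-shutdown.phases.cluster-sharding-shutdown-region.timeout = 20s")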

We saw that during the shutdown of the node with the coordinator, it didn't give the other nodes time to finish the Shard handover, causing the new nodes to try to register the Shards while the coordinator was already shut down, which added extra latency. So it seems ~PhaseClusterShardingShutdownRegion~ PhaseClusterExiting starts too early.

Would this be an unexpected behavior in the Akka CoordinatedShutdown?

patriknw commented 4 years ago

I don't think it's PhaseClusterShardingShutdownRegion that starts too early, but rather the Cluster Singleton of the coordinator that starts its shutdown too early. The singleton shutdown is in phase PhaseClusterExiting, which is after PhaseClusterShardingShutdownRegion.

If you add a delay to PhaseClusterShardingShutdownRegion, it means that PhaseClusterExiting will run later, therefore keeping the coordinator singleton around longer.

(All tasks within a phase are running in parallel.)

I'm still not sure I see how the coordinator can be shut down too early. Can you reproduce/simulate that with the example project that I created?

It might also be slightly related to https://github.com/akka/akka/pull/28104, which is fixing a problem when using persistent mode, but it will also improve the coordinator writes during shutdown for ddata mode.

manuelbernhardt commented 4 years ago

I'm not entirely sure why this is. One hypothesis is that the region on the node that hosts the coordinator is faster at handing over its shards than the region on the other node that is being shut down (the redeployment strategy shuts down two nodes at once). As a result, the region on the node that also holds the coordinator shuts itself down before the shards on the other node were handed over, causing the delays we observed.

patriknw commented 4 years ago

That sounds plausible. So the rolling update is shutting down more than one node at a time? Could you tweak the rolling update strategy to only shut down one node when it comes to the oldest (the coordinator)? As you know, the oldest should be treated specially anyway, since it should be shut down last for an optimal roll.