spring-projects / spring-integration-extensions

The Spring Integration Extensions project provides extension components for Spring Integration
http://www.springintegration.org/

Issues with leader election if Hazelcast cluster nodes failed #183

Closed philnate closed 6 years ago

philnate commented 6 years ago

Our application connects to a separate Hazelcast cluster as a client. We experienced outages of some of the Hazelcast nodes, after which no leader was elected.

Scenario:

I've investigated the LeaderInitiator and found a couple of issues with the current implementation.

  1. The node elected leader never again checks whether it is still leader, because of

    Thread.sleep( Long.MAX_VALUE );

    after the client node became leader. This can be problematic if the cluster lost all information about the lock or someone force-unlocked it. The lock is then likely granted to another node, resulting in two leaders. It might be better to check regularly whether the lock is still held by the node and, if not, fire the onRevoked event and try to lock again.

  2. Client nodes that are not elected leader exit the tryLock cycle if the server node that received the lock request dies. This effectively blocks the client node from ever becoming leader again until it is restarted. The exception thrown on such a disconnect is:

ClientInvocation{clientMessageType=1800, target=partition 258, sendConnection=ClientConnection{alive=false, connectionId=3, socketChannel=DefaultSocketChannelWrapper{socketChannel=java.nio.channels.SocketChannel[closed]}, remoteEndpoint=[10.100.72.10]:5701, lastReadTime=2018-01-22 14:50:08.804, lastWriteTime=2018-01-22 14:50:08.811, closedTime=2018-01-22 14:50:08.804, lastHeartbeatRequested=2018-01-22 14:06:36.623, lastHeartbeatReceived=2018-01-22 14:06:36.625, connected server version=3.8.6}} timed out by 2486800 ms",
      "message": "ClientInvocation{clientMessageType=1800, target=partition 258, sendConnection=ClientConnection{alive=false, connectionId=3, socketChannel=DefaultSocketChannelWrapper{socketChannel=java.nio.channels.SocketChannel[closed]}, remoteEndpoint=[10.100.72.10]:5701, lastReadTime=2018-01-22 14:50:08.804, lastWriteTime=2018-01-22 14:50:08.811, closedTime=2018-01-22 14:50:08.804, lastHeartbeatRequested=2018-01-22 14:06:36.623, lastHeartbeatReceived=2018-01-22 14:06:36.625, connected server version=3.8.6}} timed out by 2486800 ms",
      "name": "com.hazelcast.core.OperationTimeoutException",
      "cause": {
        "commonElementCount": 0,
        "localizedMessage": "Connection closed by the other side",
        "message": "Connection closed by the other side",
        "name": "com.hazelcast.spi.exception.TargetDisconnectedException",
        "cause": {
          "commonElementCount": 0,
          "localizedMessage": "Remote socket closed!",
          "message": "Remote socket closed!",
          "name": "java.io.EOFException",
          "extendedStackTrace": [
            {
              "class": "com.hazelcast.internal.networking.nonblocking.NonBlockingSocketReader",
              "method": "handle",
              "file": "NonBlockingSocketReader.java",
              "line": 153,
              "exact": false,
              "location": "hazelcast-3.8.6.jar!/",
              "version": "3.8.6"
            },
            {
              "class": "com.hazelcast.internal.networking.nonblocking.NonBlockingIOThread",
              "method": "handleSelectionKey",
              "file": "NonBlockingIOThread.java",
              "line": 349,
              "exact": false,
              "location": "hazelcast-3.8.6.jar!/",
              "version": "3.8.6"
            },
            {
              "class": "com.hazelcast.internal.networking.nonblocking.NonBlockingIOThread",
              "method": "handleSelectionKeys",
              "file": "NonBlockingIOThread.java",
              "line": 334,
              "exact": false,
              "location": "hazelcast-3.8.6.jar!/",
              "version": "3.8.6"
            },
            {
              "class": "com.hazelcast.internal.networking.nonblocking.NonBlockingIOThread",
              "method": "selectLoop",
              "file": "NonBlockingIOThread.java",
              "line": 252,
              "exact": false,
              "location": "hazelcast-3.8.6.jar!/",
              "version": "3.8.6"
            },
            {
              "class": "com.hazelcast.internal.networking.nonblocking.NonBlockingIOThread",
              "method": "run",
              "file": "NonBlockingIOThread.java",
              "line": 205,
              "exact": false,
              "location": "hazelcast-3.8.6.jar!/",
              "version": "3.8.6"
            }
          ]
        },
        "extendedStackTrace": [
          {
            "class": "com.hazelcast.client.spi.impl.ClientInvocationServiceSupport$CleanResourcesTask",
            "method": "notifyException",
            "file": "ClientInvocationServiceSupport.java",
            "line": 229,
            "exact": false,
            "location": "hazelcast-client-3.8.6.jar!/",
            "version": "3.8.6"
          },
          {
            "class": "com.hazelcast.client.spi.impl.ClientInvocationServiceSupport$CleanResourcesTask",
            "method": "run",
            "file": "ClientInvocationServiceSupport.java",
            "line": 214,
            "exact": false,
            "location": "hazelcast-client-3.8.6.jar!/",
            "version": "3.8.6"
          },
          {
            "class": "java.util.concurrent.Executors$RunnableAdapter",
            "method": "call",
            "file": "Executors.java",
            "line": 511,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "java.util.concurrent.FutureTask",
            "method": "runAndReset",
            "file": "FutureTask.java",
            "line": 308,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask",
            "method": "access$301",
            "file": "ScheduledThreadPoolExecutor.java",
            "line": 180,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask",
            "method": "run",
            "file": "ScheduledThreadPoolExecutor.java",
            "line": 294,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "com.hazelcast.util.executor.LoggingScheduledExecutor$LoggingDelegatingFuture",
            "method": "run",
            "file": "LoggingScheduledExecutor.java",
            "line": 140,
            "exact": false,
            "location": "hazelcast-3.8.6.jar!/",
            "version": "3.8.6"
          },
          {
            "class": "java.util.concurrent.ThreadPoolExecutor",
            "method": "runWorker",
            "file": "ThreadPoolExecutor.java",
            "line": 1142,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
            "method": "run",
            "file": "ThreadPoolExecutor.java",
            "line": 617,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "java.lang.Thread",
            "method": "run",
            "file": "Thread.java",
            "line": 745,
            "exact": false,
            "location": "?",
            "version": "1.8.0_112"
          },
          {
            "class": "com.hazelcast.util.executor.HazelcastManagedThread",
            "method": "executeRun",
            "file": "HazelcastManagedThread.java",
            "line": 64,
            "exact": false,
            "location": "hazelcast-3.8.6.jar!/",
            "version": "3.8.6"
          },
          {
            "class": "com.hazelcast.util.executor.HazelcastManagedThread",
            "method": "run",
            "file": "HazelcastManagedThread.java",
            "line": 80,
            "exact": false,
            "location": "hazelcast-3.8.6.jar!/",
            "version": "3.8.6"
          }
        ]
      },
      "extendedStackTrace": [
        {
          "class": "com.hazelcast.client.spi.impl.ClientInvocation",
          "method": "notifyException",
          "file": "ClientInvocation.java",
          "line": 203,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.spi.impl.ClientInvocationServiceSupport$CleanResourcesTask",
          "method": "notifyException",
          "file": "ClientInvocationServiceSupport.java",
          "line": 234,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.spi.impl.ClientInvocationServiceSupport$CleanResourcesTask",
          "method": "run",
          "file": "ClientInvocationServiceSupport.java",
          "line": 214,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "java.util.concurrent.Executors$RunnableAdapter",
          "method": "call",
          "file": "Executors.java",
          "line": 511,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "java.util.concurrent.FutureTask",
          "method": "runAndReset",
          "file": "FutureTask.java",
          "line": 308,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask",
          "method": "access$301",
          "file": "ScheduledThreadPoolExecutor.java",
          "line": 180,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask",
          "method": "run",
          "file": "ScheduledThreadPoolExecutor.java",
          "line": 294,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "com.hazelcast.util.executor.LoggingScheduledExecutor$LoggingDelegatingFuture",
          "method": "run",
          "file": "LoggingScheduledExecutor.java",
          "line": 140,
          "exact": false,
          "location": "hazelcast-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "java.util.concurrent.ThreadPoolExecutor",
          "method": "runWorker",
          "file": "ThreadPoolExecutor.java",
          "line": 1142,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
          "method": "run",
          "file": "ThreadPoolExecutor.java",
          "line": 617,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "java.lang.Thread",
          "method": "run",
          "file": "Thread.java",
          "line": 745,
          "exact": false,
          "location": "?",
          "version": "1.8.0_112"
        },
        {
          "class": "com.hazelcast.util.executor.HazelcastManagedThread",
          "method": "executeRun",
          "file": "HazelcastManagedThread.java",
          "line": 64,
          "exact": false,
          "location": "hazelcast-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.util.executor.HazelcastManagedThread",
          "method": "run",
          "file": "HazelcastManagedThread.java",
          "line": 80,
          "exact": false,
          "location": "hazelcast-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "------ submitted from ------",
          "line": -1,
          "exact": false,
          "location": "?",
          "version": "?"
        },
        {
          "class": "com.hazelcast.client.spi.impl.ClientInvocationFuture",
          "method": "resolveAndThrowIfException",
          "file": "ClientInvocationFuture.java",
          "line": 95,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.spi.impl.ClientInvocationFuture",
          "method": "resolveAndThrowIfException",
          "file": "ClientInvocationFuture.java",
          "line": 32,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.spi.impl.AbstractInvocationFuture",
          "method": "get",
          "file": "AbstractInvocationFuture.java",
          "line": 155,
          "exact": false,
          "location": "hazelcast-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.spi.ClientProxy",
          "method": "invokeOnPartition",
          "file": "ClientProxy.java",
          "line": 170,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.proxy.PartitionSpecificClientProxy",
          "method": "invokeOnPartition",
          "file": "PartitionSpecificClientProxy.java",
          "line": 47,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.proxy.ClientLockProxy",
          "method": "tryLock",
          "file": "ClientLockProxy.java",
          "line": 145,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "com.hazelcast.client.proxy.ClientLockProxy",
          "method": "tryLock",
          "file": "ClientLockProxy.java",
          "line": 135,
          "exact": false,
          "location": "hazelcast-client-3.8.6.jar!/",
          "version": "3.8.6"
        },
        {
          "class": "customized.LeaderInitiator$LeaderSelector",
          "method": "call",
          "file": "LeaderInitiator.java",
          "line": 251,
          "exact": true,
          "version": "?"
        }
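The two points above can be sketched as a loop body that re-checks the lock on every heartbeat and treats an unchecked exception from tryLock as a reason to retry rather than to exit. This is only a minimal sketch under stated assumptions: ClusterLock, ScriptedLock and HeartbeatLeaderSketch are hypothetical names introduced for illustration, and they are not the Hazelcast ILock API or the actual LeaderInitiator code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the distributed lock; not the Hazelcast ILock API. */
interface ClusterLock {
    boolean tryLock(); // may throw an unchecked exception if the target node dies
    void unlock();
}

/** Election loop body that neither sleeps forever nor exits on a failed request. */
final class HeartbeatLeaderSketch {
    final List<String> events = new ArrayList<>();
    private final ClusterLock lock;
    private boolean leader;

    HeartbeatLeaderSketch(ClusterLock lock) {
        this.lock = lock;
    }

    boolean isLeader() {
        return leader;
    }

    /** One heartbeat; a real initiator would call this in a loop with a short sleep. */
    void heartbeat() {
        try {
            boolean acquired = lock.tryLock();
            if (acquired && !leader) {
                leader = true;
                events.add("granted");
            } else if (acquired) {
                lock.unlock(); // already leader: release the extra reentrant hold
            } else if (leader) {
                revoke(); // somebody else holds the lock now
            }
        } catch (RuntimeException e) {
            // For example a timeout after the target node disconnected:
            // give up leadership if we had it, and let the next heartbeat retry.
            if (leader) {
                revoke();
            }
        }
    }

    private void revoke() {
        leader = false;
        events.add("revoked");
    }
}

/** Scripted lock for a demo: each entry is true, false, or null (meaning "throw"). */
final class ScriptedLock implements ClusterLock {
    private final Boolean[] script;
    private int next;
    int unlocks;

    ScriptedLock(Boolean... script) {
        this.script = script;
    }

    @Override
    public boolean tryLock() {
        Boolean result = script[next++];
        if (result == null) {
            throw new IllegalStateException("target disconnected");
        }
        return result;
    }

    @Override
    public void unlock() {
        unlocks++;
    }
}
```

Driving the sketch with new ScriptedLock(false, null, true, true, false) and five heartbeats shows the intended behaviour: the exception on the second call does not end the cycle, leadership is granted on the third, and it is revoked on the fifth when the lock can no longer be acquired.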
artembilan commented 6 years ago

Sounds reasonable, @philnate .

Any chance of a contribution from your side?

Otherwise I will look into this sometime toward the end of the week.

Thanks for bringing this up though!

artembilan commented 6 years ago

In the meantime, please consider using the LockRegistryLeaderInitiator based on the HazelcastLockRegistry while I look into a fix here in the org.springframework.integration.hazelcast.leader.LeaderInitiator.

I feel the solution should be similar to what we have there in the LockRegistryLeaderInitiator, though.

artembilan commented 6 years ago

@philnate ,

I have just pushed the fix to master. The artifact with version 1.0.1.BUILD-SNAPSHOT should be available shortly.

Please let me know how it works for you.

Thanks to your report, I see that the LockRegistryLeaderInitiator mentioned above has the same problem as the issue number 2 you describe. Indeed, when the target server is disconnected, we just leave the loop and exit without any new attempt to re-select the leader.

philnate commented 6 years ago

@artembilan sorry for the late reply, I finally managed to look into your changes. There's one thing I noticed that doesn't seem correct to me:

https://github.com/spring-projects/spring-integration-extensions/blob/f9c7cccf7572e70b8bf47d909047919299651903/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java#L289

There, if the lock was acquired and is already owned by the process, we unlock the lock but don't reset the locked boolean. The same happens in the LockRegistryLeaderInitiator. If I got it right, this means that each time the loop executes it successfully acquires the lock, detects that it should unlock it (I guess because the application's lock went stale), and unlocks it, but still keeps the state saying that it owns the lock. So in the next iteration it is again able to lock the lock, just to unlock it again.

Both the else if and the else branches should be identical except for the unlock part, with both calling handleRevoke() and setting locked=false. Or, if the process already owns the lock and was able to relock it, could it simply do nothing? Or is this to avoid increasing the lock count for reentrant locks?

I tested this with a one-node Hazelcast cluster and a one-node application cluster, where I restarted Hazelcast. The likelihood of this scenario should decrease with the number of application nodes, but it still persists if the whole Hazelcast cluster was down.

Maybe an additional check, after the unlock, that the process still owns the lock would solve it in a generic way.
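The ownership check suggested above could look roughly like the following. It is a minimal sketch, assuming a local java.util.concurrent.locks.ReentrantLock as a stand-in for the distributed lock (its isHeldByCurrentThread() plays the role of an "am I still the owner" query). OwnershipCheckSketch and its counters are hypothetical names, not the LeaderInitiator code.

```java
import java.util.concurrent.locks.ReentrantLock;

/** One loop iteration with an ownership check after releasing the extra hold. */
final class OwnershipCheckSketch {
    private final ReentrantLock lock; // local stand-in for the cluster lock
    private boolean locked;           // what the application believes
    int granted;
    int revoked;

    OwnershipCheckSketch(ReentrantLock lock) {
        this.lock = lock;
    }

    void iterate() {
        boolean acquired = lock.tryLock();
        if (!locked) {
            if (acquired) {
                locked = true; // first acquisition: we hold exactly one lock
                granted++;
            }
        } else if (acquired) {
            lock.unlock(); // intended to drop only the extra reentrant hold
            if (!lock.isHeldByCurrentThread()) {
                // The unlock released the last hold, so the lock state we
                // relied on was gone: reset our belief and report a revoke.
                locked = false;
                revoked++;
            }
        } else {
            locked = false; // somebody else owns the lock
            revoked++;
        }
    }
}
```

With this check, a lost lock state costs one revoke followed by a fresh grant on the next iteration, after which the loop is stable again instead of repeatedly locking and unlocking while still believing it is leader.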

artembilan commented 6 years ago

Or is this to avoid increasing the lock count for reentrant locks?

That's correct. The logic is exactly about reentrance. We get to that else if (acquired) { only if we have already locked once before and are already the leader. See where we set this.locked = true;.

The point is that when we unlock during yield(), we need to call unlock on the target native lock itself, not just decrease the reentrance count. In other words, everything we have here in the LeaderInitiator is about holding only the native lock, with no re-locks on each while cycle.

I think we don't need an extra check, because when we come back to tryLock() on the next iteration (pretty soon, after that Thread.sleep(LeaderInitiator.this.heartBeatMillis)), we will figure out that there is something wrong with the target native lock.

philnate commented 6 years ago

If the whole Hazelcast cluster is restarted and all state is lost, this doesn't hold true for Hazelcast. See this table:

| locked | acquired | lockCount | comment |
|--------|----------|-----------|---------|
| false | true | 0 -> 1 | lock acquired for the first time; native (non-reentrant) lock |
| true | true | 2 -> 1 | lock already held and acquired again; the reentrant hold is unlocked so that only the native lock is held |
| true | true | 1 -> 0 | after the cluster outage the remote state for the lock is gone; the app treats it as a reentrant lock, but for Hazelcast it is not |

As a result, the app holds the lock only for a short time, constantly flipping between granted and revoked. Though now that I think about it, this seems to be more of a corner case (for one-instance app clusters), as with multiple app instances another instance would likely take the lock, rectifying the bad state of the previous owner.
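The table can be replayed with a toy counter. This is a sketch under stated assumptions: ToyClusterLock and LockCountTrace are hypothetical names, the toy lock models a single client so tryLock always succeeds, and wipe() stands in for a full cluster restart that loses all lock state. Each row is printed as "locked before -> after tryLock -> after the iteration".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy counter standing in for the cluster-side state of one reentrant lock. */
final class ToyClusterLock {
    int lockCount;

    boolean tryLock() {
        lockCount++; // single client: every attempt succeeds
        return true;
    }

    void unlock() {
        lockCount--;
    }

    void wipe() {
        lockCount = 0; // cluster restarted, all lock state lost
    }
}

final class LockCountTrace {
    /** Runs four loop iterations; the cluster state is wiped before the third. */
    static List<String> run() {
        ToyClusterLock lock = new ToyClusterLock();
        boolean locked = false; // the application's belief
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            if (i == 2) {
                lock.wipe();
            }
            boolean wasLocked = locked;
            int before = lock.lockCount;
            boolean acquired = lock.tryLock();
            int afterTryLock = lock.lockCount;
            if (acquired && !locked) {
                locked = true;
            } else if (acquired) {
                lock.unlock(); // 'locked' is deliberately left untouched
            }
            rows.add(wasLocked + " " + before + "->" + afterTryLock + "->" + lock.lockCount);
        }
        return rows;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model, every iteration after the wipe ends with a count of zero while the application still believes it holds the lock, which is the third row of the table repeating indefinitely.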

artembilan commented 6 years ago

Sorry, I don't understand your third situation. From the LeaderInitiator perspective it is just a third iteration, and the state must be 2 -> 1, as in the second iteration. There is no difference, because we have just stepped into lock.tryLock() with a successful result. If you are saying that this is not the case for Hazelcast and it just increases the lockCount without checking the cluster state, then it must be fixed on the Hazelcast side.

We recently had a similar problem with the RedisLockRegistry:

Thread currentThread = Thread.currentThread();
if (currentThread.equals(this.thread)) {
    this.reLock++;
    return true;
}

But now we also go to Redis to update the entry, or fail because of a connectivity issue or TTL expiration.

Looks like the issue is somewhere here in the LockResourceImpl:

boolean lock(String owner, long threadId, long referenceId, long leaseTime, boolean transactional,
             boolean blockReads, boolean local) {
    if (lockCount == 0) {
        this.owner = owner;
        this.threadId = threadId;
        this.referenceId = referenceId;
        lockCount = 1;
        acquireTime = Clock.currentTimeMillis();
        setExpirationTime(leaseTime);
        this.transactional = transactional;
        this.blockReads = blockReads;
        this.local = local;
        return true;
    } else if (isLockedBy(owner, threadId)) {
        if (!transactional && !local && this.referenceId == referenceId) {
            return true;
        }
        this.referenceId = referenceId;
        lockCount++;
        setExpirationTime(leaseTime);
        this.transactional = transactional;
        this.blockReads = blockReads;
        this.local = local;
        return true;
    }
    return false;
}

This method is called from LockStoreImpl.lock(Data key, String caller, long threadId, long referenceId, long leaseTime) with transactional and local both false. Therefore, in the case of a re-lock we end up with a positive result for the if (!transactional && !local && this.referenceId == referenceId) {. From there we never reach lockCount++; and never update the TTL with setExpirationTime(leaseTime);.

Am I missing anything?

Maybe we should call some extra operation (e.g. extend the lease) after unlock in the mentioned block for reentrance?
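To make the branch in question easier to follow, here is a stripped-down model of the quoted method. It is a sketch only: LockResourceModel is a hypothetical class that keeps just the fields needed for the count, and whether a client's repeated tryLock actually reuses the same referenceId is an open assumption that this thread does not settle. The model simply shows both outcomes.

```java
/** Simplified model of the re-lock branch; not the Hazelcast class itself. */
final class LockResourceModel {
    String owner;
    long threadId;
    long referenceId;
    int lockCount;

    boolean lock(String owner, long threadId, long referenceId,
                 boolean transactional, boolean local) {
        if (lockCount == 0) {
            // First acquisition: record the owner and start counting.
            this.owner = owner;
            this.threadId = threadId;
            this.referenceId = referenceId;
            lockCount = 1;
            return true;
        } else if (owner.equals(this.owner) && threadId == this.threadId) {
            if (!transactional && !local && this.referenceId == referenceId) {
                // Same reference id: success is reported, the count is unchanged.
                return true;
            }
            // Different reference id: a genuine reentrant acquisition.
            this.referenceId = referenceId;
            lockCount++;
            return true;
        }
        return false; // held by another owner or thread
    }
}
```

In this model, two calls with the same owner, thread and referenceId leave lockCount at 1, while a third call with a new referenceId raises it to 2. So the early return only skips the increment when the reference id repeats.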

philnate commented 6 years ago

The third situation occurs when the Hazelcast cluster was down. In this situation, Hazelcast has no information left about granted locks; it's like a fresh start for Hazelcast. But the application is still running and was leader before the Hazelcast outage. At least on my machine, the request to Hazelcast seemed to stall until the cluster recovered, so the application never revoked the leader role. The application therefore thinks it is operating on a reentrant lock while, due to the Hazelcast outage, it is a native lock.

Even if tryLock correctly fails with some kind of exception stating that Hazelcast isn't available or that the tryLock timed out, the invocation of unlock might yield the very same exception, and then locked=false won't be set. Looking at Hazelcast's ClientLockProxy and the call chain, we can get any kind of unchecked exception during this invocation.

In my eyes, the corner case is that we lost the whole remote cluster with its locking information while the application is still running.

I could imagine the same kind of issue for Redis.

artembilan commented 6 years ago

Yeah... I think I know what you mean: https://jira.spring.io/browse/INT-4447.

We need to backport that fix from the LockRegistryLeaderInitiator.

Will fix soon.

Thanks

artembilan commented 6 years ago

@philnate ,

Thank you for your patience!

Please take a look at the latest fix on the matter.