EventStore / EventStore-Client-Dotnet

Dotnet Client SDK for the Event Store gRPC Client API written in C#

Automatic reconnection to new leader #263

Closed smatsson closed 11 months ago

smatsson commented 11 months ago

Hi! We have run into an odd case where the client does not fall back to the new leader when the current leader node is restarted. I'm not really sure if this is due to the server, the client or our code, so any help would be appreciated.

We have a cluster running ES 23.6.0. The cluster consists of three Windows machines: one leader and two followers. EventStore.Client.Grpc.Streams 23.1.0 is used as the client. When applying Windows updates, we start with a follower and wait for ES to be completely up and running before moving on to the next node. This means waiting for sync with the other nodes and waiting for index rebuilding + chunk verification.

Restarting each follower seems to work just as it should. Once we restart the leader node, some requests to our API fail with an "RpcException" indicating that the leader node cannot be connected to, does not respond etc. This seems reasonable, as the leader node machine is now restarting. I'm curious why the client does not reconnect to the new leader instead of retrying over and over again against the old, now down, leader. I would assume that since we feed the client the complete connection string (containing all nodes), it would change to another node when it determines that the current node is down? We previously ran ES 5.x for many years and never had any similar issues.

Connection string looks like this:

esdb://{UserName}:{Password}@192.168.129.51:2113,192.168.129.52:2113,192.168.129.53:2113?tls=true&tlsVerifyCert=false

Our code looks like this. The EventStore class is injected as a singleton.

public class EventStore
{
    private readonly ILogger _logger;
    private readonly global::EventStore.Client.EventStoreClient? _client;
    public EventStore(EventStoreConfiguration configuration, Logger logger)
    {
        _logger = logger.ForContext(GetType());
        var eventStoreClientSettings = EventStoreClientSettings.Create(configuration.ConnectionString);
        eventStoreClientSettings.DefaultDeadline = TimeSpan.FromSeconds(20.0);
        _client = new EventStoreClient(eventStoreClientSettings);
    }
    public async Task Append(StreamId id, int version, ReadOnlyCollection<DomainEvent> events)
    {
        var expectedRevision = version == 0
            ? StreamRevision.None
            : new StreamRevision((ulong)(version - 1));

        var eventsToSave = events
                           .Select(ToEventData)
                           .ToArray();
        try
        {
            await AppendLocal();
        }
        catch (Exception re) when (re is RpcException or InvalidOperationException or NotLeaderException)
        {
            try
            {
                _logger.Information(re, "GrpcException: Retry Append...");
                await Task.Delay(2_000);
                await AppendLocal();
                _logger.Information(re, "GrpcException: Retry Append Successful");
            }
            catch (WrongExpectedVersionException ve)
            {
                _logger.Information(ve, "GrpcException: Retry Append failed, WrongExpectedVersion. Actual {Count}, Expected {MessageCount}", ve.ActualVersion, ve.ExpectedVersion);
                throw re;
            }
        }
        async Task AppendLocal()
        {
            await _client.AppendToStreamAsync(id.ToString(), expectedRevision, eventsToSave);
        }
    }
}

We can see in our logs that "GrpcException: Retry Append..." was logged and that both the first call and the retry fail for similar reasons. For example:

Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="Server Is Not Ready")
   at async Task<bool> EventStore.Client.Interceptors.TypedExceptionInterceptor+AsyncStreamReader<TResponse>.MoveNext(CancellationToken cancellationToken)
   at async IAsyncEnumerable<T> EventStore.Client.AsyncStreamReaderExtensions.ReadAllAsync<T>(IAsyncStreamReader<T> reader, CancellationToken cancellationToken)+MoveNext()
   at async EventStore.Client.EventStoreClient+ReadStreamResult(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request, EventStoreClientSettings settings, TimeSpan? deadline, UserCredentials userCredentials, CancellationToken cancellationToken)+PumpMessages(?) x 2

Followed by on retry:

Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="Error starting gRPC call. HttpRequestException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. (192.168.129.51:2113) SocketException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.", DebugException="System.Net.Http.HttpRequestException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. (192.168.129.51:2113)
 ---> System.Net.Sockets.SocketException (10060): A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
   at System.Net.Sockets.Socket.<ConnectAsync>g__WaitForConnectWithCancellation|281_0(AwaitableSocketAsyncEventArgs saea, ValueTask connectTask, CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.ConnectToTcpHostAsync(String host, Int32 port, HttpRequestMessage initialRequest, Boolean async, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at System.Net.Http.HttpConnectionPool.ConnectToTcpHostAsync(String host, Int32 port, HttpRequestMessage initialRequest, Boolean async, CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.AddHttp2ConnectionAsync(QueueItem queueItem)
   at System.Threading.Tasks.TaskCompletionSourceWithCancellation`1.WaitWithCancellationAsync(CancellationToken cancellationToken)
   at System.Net.Http.HttpConnectionPool.HttpConnectionWaiter`1.WaitForConnectionAsync(Boolean async, CancellationToken requestCancellationToken)
   at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)
   at System.Net.Http.DiagnosticsHandler.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
   at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
   at System.Net.Http.HttpClient.<SendAsync>g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
   at Grpc.Net.Client.Internal.GrpcCall`2.RunCall(HttpRequestMessage request, Nullable`1 timeout)")
   at async Task<bool> EventStore.Client.Interceptors.TypedExceptionInterceptor+AsyncStreamReader<TResponse>.MoveNext(CancellationToken cancellationToken)
   at async IAsyncEnumerable<T> EventStore.Client.AsyncStreamReaderExtensions.ReadAllAsync<T>(IAsyncStreamReader<T> reader, CancellationToken cancellationToken)+MoveNext()
   at async EventStore.Client.EventStoreClient+ReadStreamResult(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request, EventStoreClientSettings settings, TimeSpan? deadline, UserCredentials userCredentials, CancellationToken cancellationToken)+PumpMessages(?) x 2

In one or two entries we also got the following on a retry after Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="Server not ready"):

EventStore.Client.WrongExpectedVersionException: Append failed due to WrongExpectedVersion. Stream: checkout-e845b701-1589-46e2-ae9a-8a697e9ee552, Expected version: 8, Actual version: 12
   at IWriteResult EventStore.Client.WriteResultExtensions.OptionallyThrowWrongExpectedVersionException(IWriteResult writeResult, EventStoreClientOperationOptions options)
   at async Task<IWriteResult> EventStore.Client.EventStoreClient.AppendToStreamAsync(string streamName, StreamRevision expectedRevision, IEnumerable<EventData> eventData, Action<EventStoreClientOperationOptions> configureOperationOptions, TimeSpan? deadline, UserCredentials userCredentials, CancellationToken cancellationToken)

As pointed out in the beginning, I'm not sure if this is an issue with ES or our code. Any help is greatly appreciated 🙂

timothycoleman commented 11 months ago

Hi @smatsson, retrying the operation should result in the connection being established to the new leader and succeeding. Locally I sometimes have to retry twice (offhand I'm actually not certain why that is) - but are you seeing it continue to fail after lots of retries?

Something that has changed since v5 is that the v5 clients could be configured to automatically issue lots of retries, but in the new gRPC clients the approach has been to leave retries up to the user. It's possible that we might refine this in the future, though.

smatsson commented 11 months ago

Hi @timothycoleman and thanks for the reply!

From a user perspective I assumed that the client would do automatic retries since we feed it all the nodes but on the other hand I can definitely see the complexity that this brings to the client code. Doing our own retries is OK with us 👍

From our logs it seems that our retry does not connect to the new leader. I can only guess that this is because the exception isn't treated as a "node is down" kind of exception, so potentially an additional retry here would solve the issue? Are there any other inputs to how the client determines which node to connect to that we need to take into consideration?

timothycoleman commented 11 months ago

I looked a bit more carefully into why it might fail twice. Essentially, when the client loses the connection to the leader, it starts picking nodes from the connection string to ask which nodes are in the cluster and which one is the leader. It can be the case that the node it picks hasn't yet realised that the leader has gone down, resulting in the client attempting to connect to the old leader again. I expect we can refine this process, but it does stabilize on the new leader.

Since you're using the client as a singleton (which is good), you should find that either the retry or subsequent calls to your Append(...) method start succeeding. If you're seeing that the subsequent calls to Append start succeeding, then it's likely that just retrying some more would allow the failed write to succeed. If, however, the subsequent calls to Append all continue to fail, then there is something going wrong that we haven't identified yet.

Reconnection is triggered on a NotLeaderException and also on an RpcException with StatusCode Unavailable, so it looks like it should be reconnecting.

smatsson commented 11 months ago

Thank you very much for investigating further. I put together a small console app to test how the reconnection occurs when doing retries and, as you pointed out, it seems to reconnect to the new leader once the cluster is done with leader election. This can of course vary depending on network latency between the nodes etc., but in my test the new leader is connected to sooner or later (most often sooner). In our real code we opted to do three retries at delays of 500 ms, 2 sec and 5 sec.
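
For readers landing here, a minimal sketch of what such a retry schedule could look like. This is not the code from our application: `RetryPolicy`, `ExecuteAsync`, `DefaultDelays` and the `isTransient` predicate are hypothetical names made up for illustration, and the helper deliberately does not reference any EventStore types so that it stands on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of EventStore.Client.
public static class RetryPolicy
{
    // Delay schedule mentioned above: 500 ms, 2 s and 5 s.
    public static readonly IReadOnlyList<TimeSpan> DefaultDelays = new[]
    {
        TimeSpan.FromMilliseconds(500),
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(5),
    };

    // Runs the operation once, then retries once per entry in 'delays'
    // as long as 'isTransient' accepts the thrown exception.
    public static async Task ExecuteAsync(
        Func<Task> operation,
        Func<Exception, bool> isTransient,
        IReadOnlyList<TimeSpan> delays)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await operation();
                return;
            }
            // When the retries are used up or the exception is not
            // transient, the filter is false and the exception
            // propagates to the caller with its original stack trace.
            catch (Exception ex) when (attempt < delays.Count && isTransient(ex))
            {
                await Task.Delay(delays[attempt]);
            }
        }
    }
}
```

In an Append method like the one in the first post, the call would presumably be something like `await RetryPolicy.ExecuteAsync(AppendLocal, IsTransient, RetryPolicy.DefaultDelays)`, where `IsTransient` (also a hypothetical name) accepts the two cases mentioned earlier in this thread as triggering a reconnect: a NotLeaderException, or an RpcException whose StatusCode is Unavailable.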

A friendly request would be to put in the docs somewhere that we need to do our own retries as this wasn't clear.

Once again, thank you for your help and time!

timothycoleman commented 11 months ago

You're welcome! And thanks for the feedback - we're actually just beginning a review of the documentation in general, so I'll make sure this gets passed on.