msemys / esjc

EventStore Java Client
MIT License

CatchUpSubscription reconnect failure #29

Closed dpolivaev closed 5 years ago

dpolivaev commented 7 years ago

Consider the following test program:

import java.util.concurrent.TimeoutException;

import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreBuilder;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.SubscriptionDropReason;

public class EsTest {
    static public void main(String... strings) throws InterruptedException, TimeoutException {
        final EventStore eventstore = EventStoreBuilder.newBuilder()
                .singleNodeAddress("127.0.0.1", 1113)
                .userCredentials("admin", "changeit")
                .build();

        final CatchUpSubscriptionListener catchUpSubscriptionListener = new CatchUpSubscriptionListener() {
            @Override
            public void onLiveProcessingStarted(CatchUpSubscription subscription) {
                System.out.println("Live processing started!");
            }

            @Override
            public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
                System.out.println(event.originalEvent().eventType);
            }

            @Override
            public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
                System.out.println("Subscription closed: " + reason);
                eventstore.subscribeToStreamFrom("foo", 0L, this);

            }
        };
        CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 0L, catchUpSubscriptionListener);
        Thread.sleep(100000);
        catchupSubscription.close();
    }
}

If the EventStore server (v4.0.2) is stopped and restarted three times while the program is running, it produces the following output:

Live processing started!
Subscription closed: ConnectionClosed
Live processing started!
Live processing started!
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Live processing started!
Live processing started!
Live processing started!
Live processing started!
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed

This happens because the subscription listener is resubscribed automatically by CatchUpSubscription.reconnectionHook after the connection is re-established, in addition to the manual resubscription in onClose. As the output shows, the number of subscriptions doubles each time.
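The doubling can be modelled without a server. The following is a minimal sketch that follows the explanation above and assumes that, on each restart, every live subscription is restored once automatically and additionally creates one new subscription from its onClose callback. The class and method names are hypothetical and are not part of esjc.

```java
// Hypothetical stand-alone model; none of these names come from esjc.
class SubscriptionDoublingDemo {

    // Returns the number of active subscriptions after the given number of
    // server restarts, starting from a single subscription.
    static int activeSubscriptionsAfter(int restarts, boolean resubscribeInOnClose) {
        int active = 1;
        for (int i = 0; i < restarts; i++) {
            // Assumption: each existing subscription is restored once on reconnect.
            int automatic = active;
            // Each onClose call that resubscribes manually adds one more subscription.
            int manual = resubscribeInOnClose ? active : 0;
            active = automatic + manual;
        }
        return active;
    }

    public static void main(String[] args) {
        for (int restarts = 0; restarts <= 3; restarts++) {
            System.out.println(restarts + " restart(s): "
                    + activeSubscriptionsAfter(restarts, true) + " with manual resubscribe, "
                    + activeSubscriptionsAfter(restarts, false) + " without");
        }
    }
}
```

Under this model the count goes 1, 2, 4, which matches the number of "Live processing started!" lines in each group of the output above.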

Unfortunately, if the line eventstore.subscribeToStreamFrom("foo", 0L, this); is removed from onClose, the client does not reconnect at all. After the EventStore server is stopped for the first time and the client prints Subscription closed: ConnectionClosed, no further text is written to the console when the server is restarted.

This behaviour does not change even if I add

                .maxOperationRetries(-1)
                .maxReconnections(-1)

to client configuration.

I have not found any way to make the client restore the connection correctly (that is, exactly once) after the EventStore server is restarted. Could you please help me fix it?

EventStore client version used: 2.0.0. EventStore server version used: 4.0.2. The problem is always reproducible on both Linux and Windows.

dpolivaev commented 7 years ago

The issue could be related to https://github.com/msemys/esjc/issues/28 , but I am not sure about it.

msemys commented 7 years ago

Catch-up subscriptions already have a resubscribe feature (they automatically resubscribe when the client reconnects), so you don't need to resubscribe manually when the client connection is closed.

It should be enough to update your listener code with:

...
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
  System.out.println("Subscription closed: " + reason);

  if (reason != SubscriptionDropReason.ConnectionClosed) {
    System.out.println("Resubscribing...");
    eventstore.subscribeToStreamFrom("foo", 0L, this);
  }
}
...

But my recommendation would be to improve the subscription close handling this way:

...
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
  System.out.println("Subscription closed: " + reason);

  if ((reason != SubscriptionDropReason.ConnectionClosed &&
       reason != SubscriptionDropReason.UserInitiated) ||
       (exception instanceof SubscriptionBufferOverflowException)) {
    System.out.println("Resubscribing...");
    eventstore.subscribeToStreamFrom("foo", subscription.lastProcessedEventNumber(), this);
  }
}
...

Now the subscription will be resubscribed if the drop reason is neither a closed connection nor user-initiated (e.g. catchupSubscription.close()), or if it was closed due to a buffer overflow exception. The subscription will be resubscribed from the last processed position.

dpolivaev commented 7 years ago

Thank you very much for your response.

Unfortunately, the automatic resubscription does not work at all. It fails. It does not even try to reconnect. It was exactly my point.

The client does not reconnect to the server if the line eventstore.subscribeToStreamFrom("foo", 0L, this); is either removed or enclosed by any of the if-conditions you suggested.

msemys commented 7 years ago

I see... something is wrong with either the client or server v4. I've just tested (es single-node):

I will look into this issue further.

dpolivaev commented 7 years ago

Thank you for checking it.

I think I have found a workaround: I added this code to the main method:

        eventstore.addListener(new EventStoreListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(event.getClass().getSimpleName());
                if(event instanceof ClientDisconnected) 
                    eventstore.connect();
            }
        });

Alternatively, the following could be called from onClose:

                if (reason != SubscriptionDropReason.UserInitiated) {
                    eventstore.connect();
                }

But I think the first one is better, because we would not want connect to be called from every listener, would we?

Do you think you can fix it within a couple of days or rather weeks? Could you confirm one of the suggested workarounds to use until the problem is fixed?

dpolivaev commented 7 years ago

Could you also tell me if exception instanceof SubscriptionBufferOverflowException from your above code snippet can occur when reason == SubscriptionDropReason.ConnectionClosed or reason == SubscriptionDropReason.UserInitiated ? If not, we do not need this condition.
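The question can be narrowed down by enumerating the condition. The following is a small sketch with stand-in types: the enum lists only the drop reasons named in this thread, and the boolean overflow stands for exception instanceof SubscriptionBufferOverflowException. It shows only where the two variants of the condition differ, not whether those combinations actually occur in esjc.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; the real ones live in com.github.msemys.esjc.
class ResubscribeConditionDemo {

    // Only the reasons named in this thread, not the full esjc enum.
    enum DropReason { ConnectionClosed, UserInitiated, CatchUpError }

    // The condition from the recommended onClose, including the overflow check.
    static boolean withOverflowClause(DropReason reason, boolean overflow) {
        return (reason != DropReason.ConnectionClosed && reason != DropReason.UserInitiated) || overflow;
    }

    // The same condition with the overflow check removed.
    static boolean withoutOverflowClause(DropReason reason) {
        return reason != DropReason.ConnectionClosed && reason != DropReason.UserInitiated;
    }

    // Lists every (reason, overflow) combination where the two conditions disagree.
    static List<String> disagreements() {
        List<String> result = new ArrayList<>();
        for (DropReason reason : DropReason.values()) {
            for (boolean overflow : new boolean[] {false, true}) {
                if (withOverflowClause(reason, overflow) != withoutOverflowClause(reason)) {
                    result.add(reason + " + overflow=" + overflow);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        disagreements().forEach(System.out::println);
    }
}
```

The two variants disagree only when the overflow exception arrives together with ConnectionClosed or UserInitiated, so the extra clause matters exactly in the cases the question asks about.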

msemys commented 7 years ago

Due to lack of time, I don't think the fix will be ready in a couple of days. The workaround will probably work most of the time, but not always.

dpolivaev commented 7 years ago

We were going to use the client in a business system that must always work and recover.

Why will the workaround fail and when? Do you need my help to fix the problem? Should I try to investigate it?

dpolivaev commented 7 years ago

esjc 1.8.1 + es v3.8.1 and esjc 1.8.1 + es v3.9.4 do not work either: there is no automatic reconnect when the server is restarted. Which configuration worked for you?

msemys commented 7 years ago

You are right: after running the same test more times (esjc v1.8.1 + es v3.9.2), it sometimes ends with a client disconnect. The issue exists in esjc v1 as well.

As a temporary workaround, remove the channel error handling: https://github.com/msemys/esjc/blob/v1.8.x/src/main/java/com/github/msemys/esjc/EventStoreTcp.java#L139

dpolivaev commented 7 years ago

I have tested your suggestion against the code for v2.0.0: I commented out the line .whenChannelError(EventStoreTcp.this::onChannelError). Unfortunately, it does not work reliably: after some number of EventStore server restarts (usually fewer than 40), the subscription connection is lost and is not restored.

dpolivaev commented 7 years ago

Could you please explain why the workaround suggested by me is not safe in your opinion? It seemed to work, but I need to be absolutely sure about it. Could you please help us and give me your recommendation how to handle the problem?

eventstore.addListener(new EventStoreListener() {
    @Override
    public void onEvent(Event event) {
        System.out.println(event.getClass().getSimpleName());
        if (event instanceof ClientDisconnected) {
            eventstore.connect();
        }
    }
});

jezovuk commented 6 years ago

Any news regarding this?

@dpolivaev how did you resolve this in the end? With explicit eventstore.connect() in EventStoreListener / CatchUpSubscriptionListener#onClose() or some other way?

If I understand correctly, commenting-out the .whenChannelError(EventStoreTcp.this::onChannelError) line in EventStoreTcp implies re-building esjc? Does it have any up/down-sides compared to above listener-based approach?

jezovuk commented 6 years ago

Thought I should share our final solution, in case somebody stumbles upon this issue. Basically, we went with an EventStoreListener implementation very similar to what @dpolivaev suggested. That takes care of re-establishing the connection. In addition, we have a CatchUpSubscriptionListener for all our CatchUpSubscription instances, with onClose similar to this:

@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
    if (reason == SubscriptionDropReason.UserInitiated &&
            !(exception instanceof SubscriptionBufferOverflowException)) {
        logger.trace("Subscription to '{}' dropped (user initiated).", subscription.streamId, exception);
    }
    else if (reason == SubscriptionDropReason.ConnectionClosed) {
        logger.info("Subscription to '{}' dropped (connection closed). Automatic resubscribe upon reconnect is expected.",
                    subscription.streamId,
                    exception);
    }
    else {
        logger.error("Subscription to '{}' dropped unexpectedly! Reason: {}. Manual recovery required.",
                     subscription.streamId,
                     reason,
                     exception);
        // ... shut-down related part of the system, clean-up etc ...
    }
}

So, basically, we reconnect upon each client disconnect (except on application shut-down). We then rely on that reconnect (and built-in resubscribe in CatchUpSubscription) for recovering subscriptions in case of closed connection. We do not attempt to recover subscriptions from various overflows etc. (never happened thus far, and probably useless without dynamic reconfiguration).
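The "reconnect on each disconnect, except on application shut-down" rule could look roughly like the sketch below. It uses stand-in types rather than the esjc classes, and the shut-down flag is an assumption about one way to implement the exception; the actual implementation is not shown in this thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in types for illustration only; they mirror the shape of the calls used
// in this thread (a listener reacting to a disconnect, a client with connect()),
// but they are not the library's classes.
class ReconnectOnDisconnectDemo {

    interface Client { void connect(); }

    // Reconnects on every disconnect unless the application is shutting down.
    static final class ReconnectingListener {
        private final Client client;
        private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

        ReconnectingListener(Client client) { this.client = client; }

        // Hypothetical hook: call this before closing the client on shut-down.
        void markShuttingDown() { shuttingDown.set(true); }

        // Would be invoked from EventStoreListener.onEvent for a ClientDisconnected event.
        void onClientDisconnected() {
            if (!shuttingDown.get()) {
                client.connect();
            }
        }
    }

    // Scenario: two disconnects while running, then one during shut-down.
    // Returns how many times connect() was called.
    static int connectCallsInScenario() {
        AtomicInteger connects = new AtomicInteger();
        ReconnectingListener listener = new ReconnectingListener(connects::incrementAndGet);
        listener.onClientDisconnected();
        listener.onClientDisconnected();
        listener.markShuttingDown();
        listener.onClientDisconnected();
        return connects.get();
    }

    public static void main(String[] args) {
        System.out.println("connect() calls: " + connectCallsInScenario());
    }
}
```

Keeping the flag in the listener means the disconnect caused by closing the client at shut-down does not trigger another connect() call.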

The only downside that I'm aware of is that all errors during the catch-up phase (until live processing starts) close the subscription with reason SubscriptionDropReason.CatchUpError, thus masking the real drop reason. In those cases, we abandon the resubscribe attempt and turn off the related part of the system (requiring manual intervention). Large catch-up processing batches are super-rare and always supervised / controlled, so this works for us.

This approach solves #28 as well.

HTH

msemys commented 5 years ago

fixed in #44