etcd-io / jetcd

etcd java client
Apache License 2.0
1.08k stars 314 forks source link

Jetcd watcher is not able to reconnect when etcd leader goes down or when etcd cluster loses its quorum and comes back #1352

Open deekshith-n opened 2 months ago

deekshith-n commented 2 months ago

I am using jetcd library for connecting to etcd in java 8. I was trying a reconnecting mechanism whenever etcd goes down. We have 3 etcd pod cluster where we follow leader follower mechanism. When the etcd pod goes down where watcher is connected, the listener throws the exception asynchronously and there i am calling the same function to retry connecting watcher. The code works fine when a follower goes down that is watch is able to reconnect to available etcd pods. But when the leader etcd pod goes down or the etcd cluster loses quorum, the function keeps on retrying but is never able to reconnect. Please let me know how to fix this issue. Please find the code below. public void watchAndListen(HandlerWrapper<JsonObject> handler) { Watch.Listener listen = Watch.listener(watchHandler(handler), throwable -> { System.out.println("Exception in watch"+ throwable.getCause()); if (throwable instanceof EtcdException) { // Retry mechanism watchAndListen(handler); } }); Watch watchClient = etcdClient.getWatchClient(); watchClient.watch(storeKey, listen); }

To Reproduce Run the etcd cluster. Delete the leader pod. See the watchAndListen keeps on retrying.

Expected behavior Watcher should be able to reconnect to the etcd pods which are alive in every scenario.

Additional context I tried different approach. I tried closing the client and recreating new one. This fixed the issue. However it was throwing RejectedExecutionException when i closed the client.

Error in this case:

2022-05-14 18:40:32.813 ERROR 9972 --- [ault-executor-3] io.grpc.internal.SerializingExecutor     : Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@1c23d1fc

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@346ee392 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@1037e922[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_222]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) [na:1.8.0_222]
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[na:1.8.0_222]
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[na:1.8.0_222]
    at com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator.schedule(MoreExecutors.java:548) ~[guava-20.0.jar:na]
    at io.etcd.jetcd.WatchImpl$WatcherImpl.reschedule(WatchImpl.java:297) ~[jetcd-core-0.5.7.jar:na]
    at io.etcd.jetcd.WatchImpl$WatcherImpl.handleError(WatchImpl.java:286) ~[jetcd-core-0.5.7.jar:na]
    at io.etcd.jetcd.WatchImpl$WatcherImpl.onError(WatchImpl.java:269) ~[jetcd-core-0.5.7.jar:na]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) ~[grpc-stub-1.37.0.jar:1.37.0]
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.37.0.jar:1.37.0]
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.37.0.jar:1.37.0]
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.37.0.jar:1.37.0]
    at io.etcd.jetcd.ClientConnectionManager$AuthTokenInterceptor$1$1.onClose(ClientConnectionManager.java:395) ~[jetcd-core-0.5.7.jar:na]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553) ~[grpc-core-1.37.0.jar:1.37.0]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68) ~[grpc-core-1.37.0.jar:1.37.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739) ~[grpc-core-1.37.0.jar:1.37.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718) ~[grpc-core-1.37.0.jar:1.37.0]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.37.0.jar:1.37.0]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.37.0.jar:1.37.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_222]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_222]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
lburgazzoli commented 2 months ago

I don't know if this of any help, but I would recommend to verify if the same behavior exists with the latest code

deekshith-n commented 2 months ago

Hey @lburgazzoli , Thanks this solved the issue. Is there any way we can know whether the watch connection is successful. I know the listener throws exception when there is a connection issue. But is there a way to get successful status from watcher/listener when the connection is successful?

lburgazzoli commented 2 months ago

this is a little bit tricky because as today, the underlying implementation creates an individual stream, for each watcher, but in the future I would love to be able to use a single stream so the concept of a connection is not really something that would make much sense.

Eventually this is something that can be done in general but I don't have much time.

Maybe it would be useful to know when an actual subscription actually succeed, for that I would really appreciated it you can do some research and provide a PR.

giri-vsr commented 2 months ago

@deekshith-n Maybe You use option WithCreatedNotify which will send you watchCreated event once it is connected. https://github.com/etcd-io/jetcd/pull/1187

deekshith-n commented 2 months ago

@giri-vsr Thanks for the suggestion. But i use jetcd version 0.7.5 where the above option you have mentioned is not available. But anyway i am able to add retry mechanism when watcher loses it connection when listener throws exception(I close the old watcher and create new one). But sometimes the watcher cannot reconnect when etcd leader pod goes down(Even if i use round robin as load balancer policy while creating client). Any solution for this issue?

deekshith-n commented 1 month ago

Hello @lburgazzoli , is there a way to get periodic notification as WatchResponse just to track the etcd revision? I know watchOption has something called withProgressNotify() but using that i am not getting any periodic response from it. Is there any code sample how to use it? Please suggest if there is a way.

lburgazzoli commented 1 month ago

I don't have time nowadays to digg into the issue so I woukd recommend to try to debug a little bit the code and provide a Pr with a reproducer so I can take a look