etcd-io / jetcd

etcd java client
Apache License 2.0

Watcher stops watching after a few changes #1379

Open nazmul-prince opened 3 days ago

nazmul-prince commented 3 days ago

Versions

Describe the bug I'm initializing an etcd client as follows: this.etcdClient = Client.builder().endpoints(hosts).executorService(taskExecutor).build(); and then, only on startup, initializing a watcher on a key prefix, e.g. "/messages", as follows:

        Consumer<Throwable> onError = e -> {
            // Pass the throwable to the logger so the cause and stack trace are not lost.
            log.error("watch error", e);
        };
        log.info("getting watcher client");
        Watch watchClient = etcdClient.getWatchClient();
        watcher = watchClient.watch(ByteSequence.from(key.getBytes(StandardCharsets.UTF_8)),
                watchOption,
                consumer,
                onError);
        log.info("started watching");

Here is my consumer:

        final Consumer<WatchResponse> consumer = watchResponse -> {
            boolean anyMatch = watchResponse.getEvents()
                    .stream()
                    .anyMatch(watchEvent -> Objects.equals(watchEvent.getEventType(), WatchEvent.EventType.PUT)
                            || Objects.equals(watchEvent.getEventType(), WatchEvent.EventType.DELETE));

            if(anyMatch) {
                log.info("reload messages");
                CompletableFuture.runAsync(() -> {
                    reloadMessages();
                }, etcdLongBlockingThreadPoolTaskExecutor);
            }
        };
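Because the consumer above submits a full reload for every matching watch event, a burst of changes can queue several overlapping reloads. The following is a minimal sketch, using only the JDK, of one way to coalesce such requests so that at most one reload runs at a time and a burst collapses into a single follow-up run. `CoalescingReloader` and `requestReload` are hypothetical names introduced for illustration; they are not part of jetcd, and this is not presented as the cause of or fix for the behavior reported here.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper (not a jetcd class): coalesces reload requests.
final class CoalescingReloader {
    private final Runnable reload;      // e.g. this::reloadMessages
    private final Executor executor;    // e.g. the blocking task executor
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean dirty = new AtomicBoolean(false);

    CoalescingReloader(Runnable reload, Executor executor) {
        this.reload = reload;
        this.executor = executor;
    }

    // Cheap and non-blocking, so it is safe to call from a watch callback.
    void requestReload() {
        dirty.set(true);
        trySchedule();
    }

    private void trySchedule() {
        // Only one drain task may be scheduled or running at any time.
        if (running.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        try {
            // Each pass serves every request that arrived before it started.
            while (dirty.getAndSet(false)) {
                reload.run();
            }
        } finally {
            running.set(false);
            // A request may have arrived after the last check; pick it up.
            if (dirty.get()) {
                trySchedule();
            }
        }
    }
}
```

With such a helper, the consumer would call `requestReload()` in place of `CompletableFuture.runAsync(...)`; whether that is appropriate depends on how expensive `reloadMessages()` is in the application.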

Now, after starting the watcher, it successfully picks up a few changes (around 10 - 15) and then stops watching. I'm getting the following error in onError: io.etcd.jetcd.common.exception.EtcdException: gRPC message exceeds maximum size 4194304: 6619136

Then I increased the limit as follows: this.etcdClient = Client.builder().endpoints(hosts).maxInboundMessageSize(8 * 1024 * 1024).executorService(taskExecutor).build(); But nothing changed: the watcher still stops watching after 10 - 15 changes. Interestingly, it no longer shows any error. Note: I have only 4-6 keys under the /messages prefix. I've also checked watcher.isClose() at that point, and it returns false.
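For reference, the numbers in the error message can be checked with simple arithmetic. The sketch below assumes only that a message is accepted when its size does not exceed the configured inbound limit; `InboundLimitCheck` and `fits` are hypothetical names used for illustration. It shows why the size error disappears once the limit is raised to 8 MiB.

```java
// Hypothetical helper for illustration; not part of jetcd or gRPC.
final class InboundLimitCheck {
    // Limit reported in the error message: 4 * 1024 * 1024 = 4194304 bytes.
    static final long ORIGINAL_LIMIT = 4L * 1024 * 1024;
    // Limit passed to maxInboundMessageSize: 8 * 1024 * 1024 = 8388608 bytes.
    static final long RAISED_LIMIT = 8L * 1024 * 1024;

    // A message fits when its size does not exceed the limit.
    static boolean fits(long messageBytes, long limitBytes) {
        return messageBytes <= limitBytes;
    }

    public static void main(String[] args) {
        long reported = 6_619_136L; // message size from the EtcdException
        System.out.println(fits(reported, ORIGINAL_LIMIT)); // prints "false"
        System.out.println(fits(reported, RAISED_LIMIT));   // prints "true"
    }
}
```

The reported message is roughly 6.3 MiB, so it exceeds the 4 MiB limit but fits under 8 MiB. A larger message would still be rejected under the raised limit.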

To Reproduce Just set up a watcher and then watch for 10 - 15 changes.

Expected behavior The watcher should work properly once it is initiated.

Any help will be greatly appreciated, as it's a blocker for our application right now. Thanks in advance.

nazmul-prince commented 2 days ago

This is actually working. The issue was that the logs stopped being printed to the console after a certain time of watching changes.

nazmul-prince commented 2 days ago

I'm reopening this issue because I get the following error from time to time when I reload all the key values from etcd after watcher.watch reports a key change:

java.util.concurrent.CompletionException: io.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[na:na]
    at dev.failsafe.spi.FailsafeFuture.completeResult(FailsafeFuture.java:101) ~[failsafe-3.3.2.jar:3.3.2]
    at dev.failsafe.AsyncExecutionImpl.complete(AsyncExecutionImpl.java:153) ~[failsafe-3.3.2.jar:3.3.2]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[na:na]
    at dev.failsafe.internal.RetryPolicyExecutor.lambda$handleAsync$5(RetryPolicyExecutor.java:155) ~[failsafe-3.3.2.jar:3.3.2]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[na:na]
    at dev.failsafe.internal.RetryPolicyExecutor.lambda$handleAsync$6(RetryPolicyExecutor.java:150) ~[failsafe-3.3.2.jar:3.3.2]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[na:na]
    at dev.failsafe.Functions.lambda$toAsync$5(Functions.java:202) ~[failsafe-3.3.2.jar:3.3.2]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[na:na]
    at dev.failsafe.Functions.lambda$getPromiseOfStage$2(Functions.java:152) ~[failsafe-3.3.2.jar:3.3.2]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[na:na]
    at io.vertx.core.Future.lambda$toCompletionStage$3(Future.java:603) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.core.impl.future.FutureImpl$4.onFailure(FutureImpl.java:188) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:81) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:278) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.grpc.stub.ClientCalls$1.onError(ClientCalls.java:92) ~[vertx-grpc-4.5.8.jar:4.5.8]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:308) ~[vertx-grpc-4.5.8.jar:4.5.8]
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261) ~[vertx-core-4.5.8.jar:4.5.8]
    at io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:308) ~[vertx-grpc-4.5.8.jar:4.5.8]
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:736) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:680) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:843) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94) ~[grpc-api-1.64.0.jar:1.64.0]
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126) ~[grpc-api-1.64.0.jar:1.64.0]
    at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:838) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1041) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:708) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:465) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:439) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:404) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:276) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:32) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.MessageDeframer.closeWhenComplete(MessageDeframer.java:192) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:232) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:448) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:404) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.AbstractClientStream$TransportState.inboundTrailersReceived(AbstractClientStream.java:387) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.internal.Http2ClientStreamTransportState.transportTrailersReceived(Http2ClientStreamTransportState.java:185) ~[grpc-core-1.64.0.jar:1.64.0]
    at io.grpc.netty.NettyClientStream$TransportState.transportHeadersReceived(NettyClientStream.java:348) ~[grpc-netty-1.64.0.jar:1.64.0]
    at io.grpc.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:383) ~[grpc-netty-1.64.0.jar:1.64.0]
    at io.grpc.netty.NettyClientHandler.access$1300(NettyClientHandler.java:95) ~[grpc-netty-1.64.0.jar:1.64.0]
    at io.grpc.netty.NettyClientHandler$FrameListener.onHeadersRead(NettyClientHandler.java:941) ~[grpc-netty-1.64.0.jar:1.64.0]
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:435) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:350) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:54) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:475) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:483) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:247) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:164) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:39) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:186) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:391) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:451) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
    at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.64.0.jar:1.64.0]
    ... 69 common frames omitted

After some time this error goes away, and then it comes back again later. @lburgazzoli do you have any idea? Thanks in advance.