etcd-io / jetcd

etcd java client
Apache License 2.0

While client closing, the election listener throws an exception #1409

Open sefamertkaya opened 1 month ago

sefamertkaya commented 1 month ago

Versions

Describe the bug

While the etcd client is closing, the onError method of the listener that I registered through the Election client's observe method is triggered. This only happens when the client is closing. When I set a breakpoint, I saw that it happens while the ConnectionManager is closing.

To Reproduce

Client:

this.client = Client.builder()
    .connectTimeout(Duration.ofSeconds(this.sessionTimeout))
    .endpoints(this.endpoints.toArray(new String[]{}))
    .build();

Observer:

this.election.observe(this.electionByteSequence, new ElectionListener());

ElectionListener

private static class ElectionListener implements Listener {

  @Override
  public void onNext(LeaderResponse response) {
    System.out.println("Leader lease id:" + response.getKv().getLease());
  }

  @Override
  public void onError(Throwable throwable) {
    throwable.printStackTrace();
  }

  @Override
  public void onCompleted() {
    System.out.println("Completed");
  }
}

Close

public void disconnect() {
  this.leaseKeepAliveClient.close();
  this.lease.revoke(this.leaseId);
  this.client.close();
}

Exception

io.etcd.jetcd.common.exception.EtcdException: Channel shutdownNow invoked
    at io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException(EtcdExceptionFactory.java:35)
    at io.etcd.jetcd.common.exception.EtcdExceptionFactory.fromStatus(EtcdExceptionFactory.java:83)
    at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:79)
    at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:74)
    at io.etcd.jetcd.impl.ElectionImpl.lambda$observe$3(ElectionImpl.java:119)
    at io.vertx.grpc.stub.StreamObserverReadStream.onError(StreamObserverReadStream.java:44)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:305)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:276)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:258)
    at io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:305)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:750)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:690)
    at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:840)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126)
    at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:835)
    at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55)
    at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1028)
    at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34)
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:691)
    at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:458)
    at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221)
    at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:441)
    at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278)
    at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
    at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
    at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:199)
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:444)
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:400)
    at io.grpc.netty.NettyClientHandler$6.visit(NettyClientHandler.java:791)
    at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.forEachActiveStream(DefaultHttp2Connection.java:1007)
    at io.netty.handler.codec.http2.DefaultHttp2Connection.forEachActiveStream(DefaultHttp2Connection.java:209)
    at io.grpc.netty.NettyClientHandler.forcefulClose(NettyClientHandler.java:782)
    at io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:347)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:895)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:863)
    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1010)
    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:296)
    at io.grpc.netty.NettyClientTransport$6.run(NettyClientTransport.java:341)
    at io.grpc.netty.WriteQueue$RunnableCommand.run(WriteQueue.java:176)
    at io.grpc.netty.WriteQueue.flush(WriteQueue.java:128)
    at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
    at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Expected behavior

I think the onCompleted method should be triggered when closing, rather than onError. Maybe I did something wrong. Is there a different order in which these should be closed?
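
Until the behavior is clarified, one possible application-side workaround is to wrap the listener so that an error arriving after shutdown has been requested is reported as a completion. The sketch below is only an illustration of that idea, not jetcd's behavior or a proposed fix: the `Listener` interface is a hypothetical stand-in with the same three callbacks as in the report (so the example runs without jetcd on the classpath), and `CloseAwareListener`, `markClosing`, `RecordingListener`, and `simulate` are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseAwareListenerSketch {

    // Hypothetical stand-in mirroring the three callbacks used in the report.
    interface Listener<T> {
        void onNext(T response);
        void onError(Throwable throwable);
        void onCompleted();
    }

    // Wrapper that turns an error received after markClosing() into onCompleted().
    static final class CloseAwareListener<T> implements Listener<T> {
        private final Listener<T> delegate;
        private final AtomicBoolean closing = new AtomicBoolean(false);
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        CloseAwareListener(Listener<T> delegate) {
            this.delegate = delegate;
        }

        // Assumption: the application calls this right before closing the client.
        void markClosing() {
            closing.set(true);
        }

        @Override
        public void onNext(T response) {
            if (!terminated.get()) {
                delegate.onNext(response);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // Deliver at most one terminal event to the delegate.
            if (!terminated.compareAndSet(false, true)) {
                return;
            }
            if (closing.get()) {
                delegate.onCompleted(); // shutdown was requested, treat as normal end
            } else {
                delegate.onError(throwable); // unexpected failure, pass it through
            }
        }

        @Override
        public void onCompleted() {
            if (terminated.compareAndSet(false, true)) {
                delegate.onCompleted();
            }
        }
    }

    // Test helper that records which callbacks were invoked, in order.
    static final class RecordingListener implements Listener<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onNext(String response) {
            events.add("next:" + response);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
        }

        @Override
        public void onCompleted() {
            events.add("completed");
        }
    }

    // Simulates a stream that delivers one value and then fails,
    // optionally after the application has announced it is closing.
    static List<String> simulate(boolean closeFirst) {
        RecordingListener recorder = new RecordingListener();
        CloseAwareListener<String> listener = new CloseAwareListener<>(recorder);
        listener.onNext("leader-1");
        if (closeFirst) {
            listener.markClosing();
        }
        listener.onError(new RuntimeException("Channel shutdownNow invoked"));
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));
        System.out.println(simulate(false));
    }
}
```

With a wrapper like this, the disconnect() method from the report would call markClosing() on the wrapper before this.client.close(). The trade-off is that a genuine failure that happens to occur during shutdown would also be reported as a completion.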

lburgazzoli commented 1 month ago

@sefamertkaya do you have any time to dig into this?