scylladb / java-driver

ScyllaDB Java Driver for ScyllaDB and Apache Cassandra, based on the DataStax Java Driver
Apache License 2.0

Calling `session.refreshSchema()` may hang #343

Open Bouncheck opened 2 weeks ago

Bouncheck commented 2 weeks ago

In some cases refreshSchema() appears to hang the calling thread without ever recovering. An easy way to trigger this is to call it right after the connection to the cluster has been lost. The refresh is not expected to succeed in that case, but it should not hang the thread.
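Until the hang is fixed, a caller can avoid blocking forever by waiting on the asynchronous variant with a timeout. The sketch below is only an illustration: `BoundedWait` and `awaitWithTimeout` are hypothetical names, not part of the driver, and a never-completing `CompletableFuture` stands in for the stuck refresh so the example runs without a cluster. It assumes the blocking `refreshSchema()` simply waits on the stage returned by `refreshSchemaAsync()`, so bounding the wait on that stage is enough to free the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {

  // Hypothetical helper (not a driver API): waits for the stage to complete,
  // but throws TimeoutException after the given timeout instead of blocking
  // the calling thread indefinitely.
  static <T> T awaitWithTimeout(CompletionStage<T> stage, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    return stage.toCompletableFuture().get(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for a schema refresh that never completes.
    CompletableFuture<String> neverCompletes = new CompletableFuture<>();
    try {
      awaitWithTimeout(neverCompletes, 100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The caller regains control here and can retry or report the failure.
      System.out.println("Gave up waiting for the refresh instead of hanging");
    }
  }
}
```

In the reproducer this would look like `awaitWithTimeout(session.refreshSchemaAsync(), 10, TimeUnit.SECONDS)` in place of `session.refreshSchema()`. This only protects the caller; it does not address whatever leaves the refresh incomplete inside the driver.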

Steps to reproduce

Create the following test method (a test is easiest because the reproducer uses the test-infra dependency):

  @Test
  public void reproducer() {
    DriverConfigLoader loader =
        new DefaultProgrammaticDriverConfigLoaderBuilder()
            .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
            .withStringList(
                TypedDriverOption.CONTACT_POINTS.getRawOption(),
                Collections.singletonList("127.0.1.1:9042"))
            .build();

    CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
    CqlSession session = null;
    Collection<Node> nodes;
    int counter = 0;
    while (counter < 2) {
      counter++;
      try (CcmBridge ccmBridge =
               CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
        ccmBridge.create();
        ccmBridge.start();
        if (session == null) {
          session = builder.build();
        }
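        // CLUSTER_WAIT_SECONDS is not defined in this snippet; it is assumed to be
        // a numeric constant declared in the enclosing test class.
        long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000L;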
        while (System.currentTimeMillis() < endTime) {
          try {
            nodes = session.getMetadata().getNodes().values();
            int upNodes = 0;
            for (Node node : nodes) {
              if (node.getUpSinceMillis() > 0) {
                upNodes++;
              }
            }
            if (upNodes == 3) {
              break;
            }
            System.out.println("Before refreshSchema call (n==" + counter + ")");
            session.refreshSchema();
            System.out.println("#   After refreshSchema call (n==" + counter + ")");
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            break;
          }
        }
      }
    }
  } 

Add the following logger to logback-test.xml to see the exception:

  <logger name="com.datastax.oss.driver.internal.core.metadata" level="DEBUG"/>

Then run the method.

Result

The test hangs at some point inside a refreshSchema() call. The following exception should be visible in the logs:

12:06:23.280 [main] INFO  c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, remove, --config-dir=/tmp/ccm447078114819940528]
12:06:23.586 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.587 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.588 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e))
12:06:23.588 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.2:9042, hostId=ef860132-71e5-4d28-b89b-77ce6003e37f, hashCode=7811d31e) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:23.594 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.595 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.595 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.3:9042, hostId=d6fd5558-7343-4368-9b90-9448a66e57a1, hashCode=2bd8a170) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(RECONNECTION_STARTED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.596 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.597 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Processing ChannelEvent(CLOSED, Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2))
12:06:23.597 [s0-admin-1] DEBUG c.d.o.d.i.c.m.NodeStateManager - [s0] Transitioning Node(endPoint=/127.0.1.1:9042, hostId=2e908145-3172-46e6-9206-a27583e1f867, hashCode=3a40b8e2) UP=>DOWN (because it was reconnecting and lost its last connection)
12:06:24.649 [main] INFO  c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, create, ccm_1, -i, 127.0.2., -n, 3:0, -v, release:6.0.1, --scylla, --config-dir=/tmp/ccm7441250503244669492]
12:06:25.089 [Exec Stream Pumper] INFO  c.d.o.d.api.testinfra.ccm.CcmBridge - ccmout> Current cluster is now: ccm_1
12:06:25.154 [main] INFO  c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, updateconf, auto_snapshot:false, --config-dir=/tmp/ccm7441250503244669492]
12:06:25.568 [main] INFO  c.d.o.d.api.testinfra.ccm.CcmBridge - Executing: [ccm, start, --wait-for-binary-proto, --wait-other-notice, --config-dir=/tmp/ccm7441250503244669492]
Before refreshSchema call (n==2)
12:06:32.124 [s0-admin-0] DEBUG c.d.o.d.i.c.metadata.MetadataManager - [s0] Starting schema refresh
12:06:32.124 [s0-admin-0] DEBUG c.d.o.d.i.c.m.SchemaAgreementChecker - [s0] Checking schema agreement
12:06:32.126 [s0-io-3] DEBUG c.d.o.d.i.c.m.SchemaAgreementChecker - [s0] Error while checking schema agreement, completing now (false)
java.util.concurrent.CompletionException: io.netty.channel.StacklessClosedChannelException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1102)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1034)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.setFinalError(AdminRequestHandler.java:206)
    at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.onWriteComplete(AdminRequestHandler.java:150)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:999)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:860)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:889)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1015)
    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:301)
    at com.datastax.oss.driver.internal.core.channel.DefaultWriteCoalescer$Flusher.runOnEventLoop(DefaultWriteCoalescer.java:102)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.StacklessClosedChannelException: null
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)