redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

Connection remains in INACTIVE state for prolonged period of time. #2930

Open himanshu0791 opened 3 months ago

himanshu0791 commented 3 months ago

Bug Report

I noticed that some cached write connections returned by getWriteConnection(slot) in PooledClusterConnectionProvider.java remain in an inactive state indefinitely. This differs from getReadConnection(slot), where we check whether a connection is active before returning it to the caller.

Current Behavior

Connections remain in an inactive state, so all requests fail with "Currently not connected. Commands are rejected." based on the config rejectCommandsWhenInactive.

To rule out infrastructure issues in which a netty channel never becomes active and the connection stays marked as inactive, I ran the Jedis client in parallel, and it did not run into any connection-related issues. The goal was to see whether variables outside the client were affecting it. I tried versions 5.3.7 and 6.2.2, and both showed the same issue.

Possible Solution

I changed getWriteConnection(slot) to behave like the read path, and that effectively fixed the issue. Essentially, whenever a node connection is requested, I check its status and return a new one if it is marked as inactive. I understand this is somewhat inefficient, because connections that might have become active again are also closed by this fix. Below are the snippets of the code I modified.

From PooledClusterConnectionProvider.java

    private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int slot) {

        CompletableFuture<StatefulRedisConnection<K, V>> writer; // avoid races when reconfiguring partitions.
        synchronized (stateLock) {
            writer = writers[slot];
        }

        // writer instance is already completed. We can call join() without any blocking.
        if (writer == null || !writer.join().isOpen()) {
            synchronized (stateLock) {
                // reset slot connection so a new one can be saved. Connection if open, will be closed by
                // the underlying async connection provider.
                writers[slot] = null;
            }
            RedisClusterNode master = partitions.getMasterBySlot(slot);
            if (master == null) {
                clusterEventListener.onUncoveredSlot(slot);
                return Futures.failed(new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".",
                        partitions.clone()));
            }

            // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
            // host because the nodeId can be handled by a different host.
            RedisURI uri = master.getUri();
            ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort());

            ConnectionFuture<StatefulRedisConnection<K, V>> future = getConnectionAsync(key);

            return future.thenApply(connection -> {

                synchronized (stateLock) {
                    if (writers[slot] == null) {
                        writers[slot] = CompletableFuture.completedFuture(connection);
                    }
                }

                return connection;
            }).toCompletableFuture();
        }

        return writer;
    }

    protected ConnectionFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionKey key) {

        // Pass the liveness check and the close action so an inactive cached connection is replaced.
        ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectionProvider.getConnection(key,
                StatefulConnection::isOpen, StatefulConnection::close);
        CompletableFuture<StatefulRedisConnection<K, V>> result = new CompletableFuture<>();

        connectionFuture.handle((connection, throwable) -> {

            if (throwable != null) {

                result.completeExceptionally(
                        RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.bubble(throwable)));
            } else {
                result.complete(connection);
            }

            return null;
        });

        return ConnectionFuture.from(connectionFuture.getRemoteAddress(), result);
    }

    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionIntent connectionIntent,
           String host, int port) {

        try {
            beforeGetConnection(connectionIntent, host, port);

            return connectionProvider.getConnection(new ConnectionKey(connectionIntent, host, port),
                            StatefulConnection::isOpen, StatefulConnection::close)
                    .toCompletableFuture();
        } catch (RedisException e) {
            throw e;
        } catch (RuntimeException e) {
            throw new RedisException(e);
        }
    }

From AsyncConnectionProvider.java

    public F getConnection(K key, Predicate<T> isActive, Consumer<T> closeConnection) {
        return getSynchronizer(key, isActive, closeConnection).getConnection();
    }

    private Sync<K, T, F> getSynchronizer(K key, Predicate<T> isActive, Consumer<T> closeConnection) {

        if (closed) {
            throw new IllegalStateException("ConnectionProvider is already closed");
        }

        Sync<K, T, F> sync = connections.get(key);

        if (sync != null) {
            if (sync.isInProgress() || (sync.isComplete() && isActive.test(sync.connection))) {
                return sync;
            }
            // closing to avoid leak in case connection is deactivated and not closed.
            closeConnection.accept(sync.connection);
        }
        // reset key so a new connection can be cached.
        connections.remove(key);

        AtomicBoolean atomicBoolean = new AtomicBoolean();

        sync = connections.computeIfAbsent(key, connectionKey -> {

            Sync<K, T, F> createdSync = new Sync<>(key, connectionFactory.apply(key));

            if (closed) {
                createdSync.cancel();
            }

            return createdSync;
        });

        if (atomicBoolean.compareAndSet(false, true)) {

            sync.getConnection().whenComplete((c, t) -> {

                if (t != null) {
                    connections.remove(key);
                }
            });
        }

        return sync;
    }

    // New method added to Sync (called as sync.isInProgress() above)
    private boolean isInProgress() {
        return PHASE.get(this) == PHASE_IN_PROGRESS;
    }

    protected CompletableFuture<StatefulRedisConnection<K, V>> getConnection(RedisNodeDescription redisNodeDescription) {

        RedisURI uri = redisNodeDescription.getUri();

        return connectionProvider.getConnection(toConnectionKey(uri), StatefulConnection::isOpen, StatefulConnection::close).toCompletableFuture();
    }  
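
The idea behind the snippets above can be reduced to a small self-contained sketch: before handing out a cached connection, test whether it is still active, and if it is not, close it and cache a fresh one. The types FakeConnection and ValidatingCache are hypothetical stand-ins invented for illustration and are not Lettuce classes. The sketch is synchronous and leaves out the in-progress phase that Sync tracks, so it only approximates the proposed change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a cached connection; not a Lettuce type. */
final class FakeConnection {

    private volatile boolean open = true;
    final int id;

    FakeConnection(int id) {
        this.id = id;
    }

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}

/** Hypothetical cache that validates an entry before returning it. */
final class ValidatingCache<K, T> {

    private final Map<K, T> cache = new ConcurrentHashMap<>();
    private final Function<K, T> factory;

    ValidatingCache(Function<K, T> factory) {
        this.factory = factory;
    }

    T get(K key, Predicate<T> isActive, Consumer<T> closeConnection) {
        // compute() runs atomically per key, so check-and-replace cannot race with itself.
        return cache.compute(key, (k, existing) -> {
            if (existing != null && isActive.test(existing)) {
                return existing; // still usable, keep the cached entry
            }
            if (existing != null) {
                closeConnection.accept(existing); // avoid leaking the inactive entry
            }
            return factory.apply(k); // cache a replacement
        });
    }
}
```

Calling cache.get(key, FakeConnection::isOpen, FakeConnection::close) returns the same instance as long as it is open and a new instance once the cached one has been closed. The trade-off is the one mentioned earlier: a connection that would have recovered on its own is closed and replaced as well.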

Additional context

We baked this fix for over a month in an industry-standard codebase with QPS on the order of millions, without any issues. Logging showed multiple instances where this fix let a request go through instead of failing with the exception mentioned above.

I would like your thoughts on this and on whether it is something we can turn into a pull request.

himanshu0791 commented 3 months ago

@mp911de Appreciate your comment on this.