redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License
5.42k stars 979 forks source link

SimpleBatcher apparent deadlock #2196

Open desert-tear opened 2 years ago

desert-tear commented 2 years ago

I am using version 6.2.0.RELEASE. I would like to ask about batch execution using asynchronous commands, as described in the wiki.

// Command interface from the report: each call carries a trailing CommandBatching
// argument and hands back a RedisFuture for the result.
// NOTE(review): @BatchSize(50) presumably sets the queue size that triggers an
// automatic flush - confirm against the Lettuce batching documentation.
@BatchSize(50)
interface StringCommands extends Commands {
    RedisFuture<String> get(String key, CommandBatching batching);
}

// Usage sketch: the first call is submitted with CommandBatching.queue(), the second with
// CommandBatching.flush(). Presumably queue() only enqueues and flush() sends the pending
// batch - confirm against the Lettuce batching documentation.
StringCommands commands = …
commands.get("key1", CommandBatching.queue());
commands.get("key2", CommandBatching.flush()); 

If the code above runs on multiple threads, RedisFuture.get() sometimes blocks forever; some commands are actually never sent to the Redis server. It might be caused by io.lettuce.core.dynamic.SimpleBatcher#flush(boolean), specifically the loop "while (flushing.compareAndSet(false, true))".

Thank you!

mp911de commented 2 years ago

Can you provide a reproducer that can recreate the problem?

desert-tear commented 2 years ago
// redis server 6.2
// lettuce 6.2.0.RELEASE
class 1:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.batch.CommandBatching;

/**
 * Command interface used by the reproducer. It declares a single {@code pfcount} method that
 * takes a key plus a {@link CommandBatching} argument and returns a {@link RedisFuture} of
 * {@code Long}.
 */
public interface BatchCommands extends Commands {
    RedisFuture<Long> pfcount(String key, CommandBatching batching);
}

class 2:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.batch.CommandBatching;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Submits batched {@code pfcount} calls through a {@link BatchCommands} instance and blocks
 * until all of their futures have completed.
 */
public class BatchOperations {
    private final BatchCommands batchCommands;

    /**
     * @param batchCommands command interface instance used to submit the {@code pfcount} calls
     */
    public BatchOperations(BatchCommands batchCommands) {
        this.batchCommands = batchCommands;
    }

    /**
     * Runs {@code pfcount} for every key. All keys except the last are submitted with
     * {@code CommandBatching.queue()}; the last key is submitted with
     * {@code CommandBatching.flush()}. The method then waits for every future.
     *
     * @param keys keys to count; may be empty
     * @return one result per key, in the order the keys were given; an empty list when no
     *         keys are supplied
     * @throws RuntimeException if the waiting thread is interrupted or a command fails
     */
    public List<Long> mpfcount(String... keys) {
        // Without this guard an empty call would index keys[-1] below.
        if (keys == null || keys.length == 0) {
            return new ArrayList<>();
        }
        List<RedisFuture<Long>> futures = new ArrayList<>(keys.length);
        int last = keys.length - 1;
        for (int i = 0; i < last; i++) {
            futures.add(batchCommands.pfcount(keys[i], CommandBatching.queue()));
        }
        // The final key is the one submitted with flush().
        futures.add(batchCommands.pfcount(keys[last], CommandBatching.flush()));
        return this.get(futures);
    }

    /**
     * Waits for each future in order and collects the results.
     *
     * @param futures futures to wait on
     * @param <T> result type of the futures
     * @return the results, in the same order as {@code futures}
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting
     */
    private <T> List<T> get(List<RedisFuture<T>> futures) {
        List<T> results = new ArrayList<>(futures.size());
        try {
            for (RedisFuture<T> future : futures) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        return results;
    }
}

class 3:
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.dynamic.RedisCommandFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Reproducer driver: two worker threads share one {@link BatchOperations} instance and each
 * issues a batched {@code pfcount} over its own set of 30 keys.
 */
public class MainTest {
    /** Number of concurrent workers submitting batches. */
    private static final int THREAD_COUNT = 2;

    /** Number of keys each worker requests in one batch. */
    private static final int KEY_COUNT = 30;

    /** How long to wait for the workers before reporting a stall. */
    private static final long TIMEOUT_SECONDS = 30;

    public static void main(String[] args) {
        RedisClusterClient redisClient = RedisClusterClient.create(RedisURI.create("host", 6380));
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        try {
            RedisCommandFactory redisCommandFactory = new RedisCommandFactory(redisClient.connect());
            BatchCommands batchCommands = redisCommandFactory.getCommands(BatchCommands.class);
            BatchOperations batchOperations = new BatchOperations(batchCommands);

            for (int i = 0; i < THREAD_COUNT; i++) {
                int worker = i;
                threadPool.execute(() -> {
                    List<String> keys = new ArrayList<>(KEY_COUNT);
                    for (int j = 1; j <= KEY_COUNT; j++) {
                        keys.add("key" + j + worker);
                    }
                    List<Long> values = batchOperations.mpfcount(keys.toArray(new String[0]));
                    System.out.println(String.format("params:%s, values:%s", keys, values));
                });
            }

            threadPool.shutdown();
            // Bound the wait so a stalled batch is reported instead of hanging silently.
            if (!threadPool.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("Workers did not finish within " + TIMEOUT_SECONDS
                        + "s; batched commands appear to be stuck");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Interrupt any worker still blocked on a future, then release the client's resources.
            threadPool.shutdownNow();
            redisClient.shutdown();
        }
    }
}