redis / jedis

Redis Java client

subscribe is not working when the client cache is enabled #4023

Open qingdaoheze opened 8 hours ago

qingdaoheze commented 8 hours ago

Expected behavior

subscribe should work normally and deliver published messages to the listener.

Actual behavior

The subscribe call blocks forever and never receives any message.

Steps to reproduce:

1. Create a UnifiedJedis with the client-side cache enabled.
2. Call subscribe.

Redis / Jedis Configuration

    private UnifiedJedis buildUnifiedJedis() {
        HostAndPort startNode = HostAndPort.from("localhost:6379");
        JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
                .resp3()
                .build();
        CacheConfig cacheConfig = getCacheConfig();
        Cache cache = CacheFactory.getCache(cacheConfig);

        return new UnifiedJedis(startNode, jedisClientConfig, cache);
    }

Jedis version:

5.2.0

Redis version:

7.4.1

Java version:

JDK 17

qingdaoheze commented 8 hours ago

Complete reproduction code:

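import org.junit.Test; // assumption: JUnit 4; use org.junit.jupiter.api.Test for JUnit 5
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheFactory;

public class JedisSubscribeWithClientCacheTest {
    private static final Logger log = LoggerFactory.getLogger(JedisSubscribeWithClientCacheTest.class);

    private UnifiedJedis buildUnifiedJedis() {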
        HostAndPort startNode = HostAndPort.from("localhost:6379");
        JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
                .resp3()
                .build();
        CacheConfig cacheConfig = CacheConfig.builder()
                .maxSize(10000)
                .build();
        Cache cache = CacheFactory.getCache(cacheConfig);

        return new UnifiedJedis(startNode, jedisClientConfig, cache);
    }

    @Test
    public void pubsubWithClientCache() {
        try (UnifiedJedis client = buildUnifiedJedis()) {
            JedisPubSub jedisPubSub = new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.info("onMessage. channel:{}, message:{}", channel, message);
                }

                @Override
                public void onPMessage(String pattern, String channel, String message) {
                    log.info("onPMessage. pattern:{}, channel:{}, message:{}", pattern, channel, message);
                }

                @Override
                public void onSubscribe(String channel, int subscribedChannels) {
                    log.info("onSubscribe. channel:{}, subscribedChannels:{}", channel, subscribedChannels);
                }

                @Override
                public void onUnsubscribe(String channel, int subscribedChannels) {
                    log.info("onUnsubscribe. channel:{}, subscribedChannels:{}", channel, subscribedChannels);
                }

                @Override
                public void onPUnsubscribe(String pattern, int subscribedChannels) {
                    log.info("onPUnsubscribe. pattern:{}, subscribedChannels:{}", pattern, subscribedChannels);
                }

                @Override
                public void onPSubscribe(String pattern, int subscribedChannels) {
                    log.info("onPSubscribe. pattern:{}, subscribedChannels:{}", pattern, subscribedChannels);
                }

                @Override
                public void onPong(String pattern) {
                    log.info("onPong. pattern:{}", pattern);
                }
            };
            // subscribe() blocks the calling thread for as long as the subscription is active.
            client.subscribe(jedisPubSub, "hello_world");
            System.in.read();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
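
To check whether the subscriber receives anything, a message has to be published to hello_world from a second connection while the test above is blocked. A minimal sketch of such a publisher follows; the class name HelloWorldPublisher and the message text are made up for illustration, and it assumes the same local Redis on localhost:6379. It deliberately uses a plain UnifiedJedis without a client-side cache.

```java
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.UnifiedJedis;

// Hypothetical helper, not part of the original report.
public class HelloWorldPublisher {
    public static void main(String[] args) {
        // Plain client without a client-side cache; host and port match the repro above.
        try (UnifiedJedis publisher = new UnifiedJedis(new HostAndPort("localhost", 6379))) {
            // PUBLISH returns the number of clients that received the message.
            long receivers = publisher.publish("hello_world", "test message");
            System.out.println("receivers: " + receivers);
        }
    }
}
```

If the subscription works, onMessage in the test should log the channel and message once the publisher has run.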

qingdaoheze commented 7 hours ago

The stack trace of the blocked thread is below:

   sun.nio.ch.Net.poll line: not available [native method]
   sun.nio.ch.NioSocketImpl.park line: 186 
   sun.nio.ch.NioSocketImpl.park line: 195 
   sun.nio.ch.NioSocketImpl.implRead line: 319 
   sun.nio.ch.NioSocketImpl.read line: 355 
   sun.nio.ch.NioSocketImpl$1.read line: 808 
   java.net.Socket$SocketInputStream.read line: 966 
   java.io.InputStream.read line: 218 
   redis.clients.jedis.util.RedisInputStream.ensureFill line: 256 
   redis.clients.jedis.util.RedisInputStream.peek line: 50 
   redis.clients.jedis.Protocol.readPushes line: 242 
   redis.clients.jedis.Protocol.read line: 226 
   redis.clients.jedis.csc.CacheConnection.protocolRead line: 49 
   redis.clients.jedis.Connection.readProtocolWithCheckingBroken line: 392 
   redis.clients.jedis.Connection.getUnflushedObject line: 349 
   redis.clients.jedis.JedisPubSubBase.process line: 115 
   redis.clients.jedis.JedisPubSubBase.proceed line: 92 
   redis.clients.jedis.UnifiedJedis.subscribe line: 3833
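
For anyone trying to confirm the hang, a dump like the one above can be taken with jstack or from inside the JVM. The sketch below is only an illustration using the standard library: a PipedInputStream that never receives data stands in for the Redis socket, and the class and method names (BlockedReadDump, dump, startBlockedReader) are hypothetical. It is not Jedis code and does not reproduce the bug itself.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class BlockedReadDump {

    /** Formats a thread's current stack in the "class.method line: N" style used above. */
    static String dump(Thread thread) {
        StringBuilder sb = new StringBuilder();
        for (StackTraceElement frame : thread.getStackTrace()) {
            sb.append(frame.getClassName()).append('.').append(frame.getMethodName())
              .append(" line: ").append(frame.getLineNumber()).append('\n');
        }
        return sb.toString();
    }

    /** Starts a daemon thread that blocks in read() on a pipe that never receives data. */
    static Thread startBlockedReader() throws IOException {
        PipedOutputStream neverWritten = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(neverWritten);
        Thread reader = new Thread(() -> {
            try {
                in.read(); // waits indefinitely: nothing is ever written to the pipe
            } catch (IOException ignored) {
                // not expected in this sketch
            }
        }, "blocked-reader");
        reader.setDaemon(true); // do not keep the JVM alive
        reader.start();
        return reader;
    }

    public static void main(String[] args) throws Exception {
        Thread reader = startBlockedReader();
        Thread.sleep(200); // give the reader time to reach the blocking read
        System.out.println(dump(reader));
    }
}
```

In the real test, the same dump helper could be pointed at the thread that called subscribe to see where it is parked.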