redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

Command interface - issues with async array results #2612

Closed nits3392 closed 2 weeks ago

nits3392 commented 9 months ago

Bug Report

Current Behavior

Using Redis in Docker on localhost. Issuing a command by extending the Commands interface:

@Command("BF.INSERT :filter CAPACITY :capacity ERROR :error ITEMS :items ")
    Flux<Boolean>  insert(@Param("filter") String filter, @Param("capacity") long capacity,
                                @Param("error") double error, @Param("item") String[] items);

The failure occurs when I consume the result of the issued command:

Flux<Boolean> res = bloomCommand.insert("myBloomFilter", 10000, 0.01, idsArray);

Collecting this Flux or converting it to a future throws the following exception:

Errors java.util.concurrent.CompletionException: 
   java.lang.UnsupportedOperationException: 
     io.lettuce.core.output.BooleanListOutput does not support set(boolean)

Versions: Redis 7.2.x, Lettuce Core 6.3.1, Spring Boot 2.5.12

However, when I use a Mono for a single-item insert:

Mono<Boolean>  insert(@Param("filter") String filter, @Param("capacity") long capacity,
                                @Param("error") double error, @Param("item") String item);

the response is decoded correctly.

I also tried dispatch, but for that to work I had to override the set(boolean) method in a custom output. By default it also throws the same exception.

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
                .addKey("myBloomFilter")
                .add("CAPACITY").add(10000)
                .add("ERROR").add(0.01)
                .add("ITEMS").add("item1").add("item2").add("item3");

class BFInsert implements ProtocolKeyword {
    @Override
    public byte[] getBytes() {
        return "BF.INSERT".getBytes();
    }

    @Override
    public String name() {
        return "hello";
    }
}
RedisCommands<String, String> syncCommands = redisClient.connect().sync();
List<Boolean> response = syncCommands.dispatch(new BFInsert(),
        new BooleanListOutput(StringCodec.UTF8),
        args);

System.out.println("Response: " + response);

class BooleanListOutput extends CommandOutput<String, String, List<Boolean>> {
    private final List<Boolean> booleans = new ArrayList<>();

    public BooleanListOutput(RedisCodec<String, String> codec) {
        super(codec, new ArrayList<>());
    }

    @Override
    public void set(boolean value) {
        booleans.add(value);
    }

    @Override
    public void complete(int depth) {
        this.output = booleans;
    }
}
Stack trace

```java
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.UnsupportedOperationException: io.lettuce.core.output.BooleanListOutput does not support set(boolean)
Caused by: java.lang.UnsupportedOperationException: io.lettuce.core.output.BooleanListOutput does not support set(boolean)
	at io.lettuce.core.output.CommandOutput.set(CommandOutput.java:129)
	at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:758)
	at io.lettuce.core.protocol.RedisStateMachine.handleBoolean(RedisStateMachine.java:419)
	at io.lettuce.core.protocol.RedisStateMachine$State$Type.handle(RedisStateMachine.java:205)
	at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:339)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:300)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:840)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:791)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:765)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:657)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:597)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
```
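The trace suggests the decoder received a native boolean reply (`handleBoolean`) and forwarded it to a `set(boolean)` method that the output class does not override, so the base-class default threw. The following is a simplified, stand-alone model of that dispatch, not Lettuce's actual classes: `SketchOutput`, `IntegerOnlyBooleanList`, `TolerantBooleanList`, and `decode` are hypothetical names invented for illustration, and the assumption that the stock output only handles integer replies is inferred from the exception message rather than confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output base class: every typed setter
// rejects the value unless a subclass overrides it.
abstract class SketchOutput<T> {
    protected final T output;

    SketchOutput(T initial) {
        this.output = initial;
    }

    void set(long value) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support set(long)");
    }

    void set(boolean value) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support set(boolean)");
    }

    T get() {
        return output;
    }
}

// Assumed shape of the failing output: it only understands integer replies (1 / 0).
class IntegerOnlyBooleanList extends SketchOutput<List<Boolean>> {
    IntegerOnlyBooleanList() {
        super(new ArrayList<>());
    }

    @Override
    void set(long value) {
        output.add(value == 1);
    }
}

// Same output with set(boolean) overridden, mirroring the custom class in the report.
class TolerantBooleanList extends IntegerOnlyBooleanList {
    @Override
    void set(boolean value) {
        output.add(value);
    }
}

final class BooleanOutputSketch {
    // Toy decoder: routes each reply element to the setter matching its wire type.
    static List<Boolean> decode(SketchOutput<List<Boolean>> out, Object... reply) {
        for (Object element : reply) {
            if (element instanceof Boolean) {
                out.set(((Boolean) element).booleanValue());
            } else {
                out.set(((Number) element).longValue());
            }
        }
        return out.get();
    }

    public static void main(String[] args) {
        // Integer replies decode fine with the integer-only output.
        System.out.println(decode(new IntegerOnlyBooleanList(), 1L, 0L, 1L));
        // Boolean replies need the overridden set(boolean).
        System.out.println(decode(new TolerantBooleanList(), true, false, true));
        try {
            decode(new IntegerOnlyBooleanList(), true, false);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the same output class works or fails depending only on the wire type of the reply, which would be consistent with the single-item `Mono<Boolean>` variant decoding correctly through a different output.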

Input Code

```java
RedisCommandFactory commandFactory = new RedisCommandFactory(redisClient.connect());
bloomCommand = commandFactory.getCommands(BloomCommandReactive.class);

@Command("BF.INSERT :filter CAPACITY :capacity ERROR :error ITEMS :items ")
Flux<Boolean> insert(@Param("filter") String filter, @Param("capacity") long capacity,
                     @Param("error") double error, @Param("item") String[] items);

// when I convert the result of any command issued:
Flux<Boolean> res = bloomCommand.insert("myBloomFilter", 10000, 0.01, idsArray);

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
        .addKey("myBloomFilter")
        .add("CAPACITY").add(10000)
        .add("ERROR").add(0.01)
        .add("ITEMS").add("item1").add("item2").add("item3");

class BFInsert implements ProtocolKeyword {
    @Override
    public byte[] getBytes() {
        return "BF.INSERT".getBytes();
    }

    @Override
    public String name() {
        return "Nits";
    }
}

List<Boolean> response = syncCommands.dispatch(new BFInsert(),
        new BooleanListOutput(StringCodec.UTF8),
        args);

System.out.println("Response: " + response);

class BooleanListOutput extends CommandOutput<String, String, List<Boolean>> {
    private final List<Boolean> booleans = new ArrayList<>();

    public BooleanListOutput(RedisCodec<String, String> codec) {
        super(codec, new ArrayList<>());
    }

    @Override
    public void set(boolean value) {
        booleans.add(value);
    }

    @Override
    public void complete(int depth) {
        this.output = booleans;
    }
}
```

Expected behavior/code

Environment

Possible Solution

Additional context

nits3392 commented 3 months ago

Hi @tishun - Any update on this? Is there a possible fix or root cause?

tishun commented 3 months ago

Hey @nits3392, as of right now I unfortunately cannot commit to a timeline, as we are overloaded with a couple of urgent requests.

chetanmeh commented 2 months ago

A temporary workaround for this issue is to use ProtocolVersion.RESP2 with ClientOptions until BooleanListOutput is fixed.
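For readers who want to try that workaround, a minimal configuration sketch follows. It assumes Lettuce 6.x on the classpath and a Redis instance at `redis://localhost:6379`; the URI and the class name `Resp2Workaround` are placeholders rather than details from this thread. The options need to be set before the connection is opened, since they apply to connections created afterwards.

```java
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.protocol.ProtocolVersion;

final class Resp2Workaround {
    public static void main(String[] args) {
        // Placeholder URI: adjust to your environment.
        RedisClient redisClient = RedisClient.create("redis://localhost:6379");

        // Pin the protocol to RESP2 instead of letting the client negotiate RESP3.
        redisClient.setOptions(ClientOptions.builder()
                .protocolVersion(ProtocolVersion.RESP2)
                .build());

        // Connections opened from here on use RESP2.
        try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
            System.out.println(connection.sync().ping());
        } finally {
            redisClient.shutdown();
        }
    }
}
```

Pinning RESP2 affects every command on that client, so this is probably best treated as a stopgap; any feature that relies on RESP3 would not be available on connections created this way.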