quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Error when using withTransaction in RedisDataSource in clustered mode #32361

Open stefanorg opened 1 year ago

stefanorg commented 1 year ago

Describe the bug

Hi all, I want to execute this code (basically a rate-limit algorithm), and everything works fine:


// redisDataSource is of type io.quarkus.redis.datasource.RedisDataSource

KeyCommands<String> keys = redisDataSource.key();
SortedSetCommands<String, Long> z = redisDataSource.sortedSet(String.class, Long.class);
// remove all expired elements (those outside the current window)
z.zremrangebyscore(_key, ScoreRange.from(0, interval));
// add this request
z.zadd(_key, now, now);
// count all requests
List<ScoredValue<Long>> scoredValues = z.zrangeWithScores(_key, 0, -1);
// set the key expiry
keys.expire(_key, INTERVAL_SECONDS);

Based on the documentation (https://quarkus.io/guides/redis-reference), I want to execute those commands in a transaction (using MULTI), so I changed the code to use withTransaction:

TransactionResult transactionResult = redisDataSource.withTransaction(tx -> {
    TransactionalKeyCommands<String> keys = tx.key();
    TransactionalSortedSetCommands<String, Long> z = tx.sortedSet(String.class, Long.class);
    // remove all expired elements (those outside the current window)
    z.zremrangebyscore(_key, ScoreRange.from(0, interval));
    // add this request
    z.zadd(_key, now, now);
    // count all requests
    z.zrangeWithScores(_key, 0, -1);
    // set the key expiry
    keys.expire(_key, INTERVAL_SECONDS);
});

But this way I get different kinds of exceptions when the code runs.

Sometimes this:

Request failed: java.lang.IllegalStateException: Unable to add command to the current transaction
        at io.quarkus.redis.runtime.datasource.AbstractTransactionalCommands.queuedOrDiscard(AbstractTransactionalCommands.java:20)
        at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:77)
        at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onItem(UniOnItemConsume.java:42)
        at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribe$1(AsyncResultUni.java:35)
        at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
        at io.vertx.redis.client.impl.RedisClusterConnection.lambda$send$7(RedisClusterConnection.java:330)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.redis.client.impl.RedisStandaloneConnection.handle(RedisStandaloneConnection.java:409)
        at io.vertx.redis.client.impl.RESPParser.handleResponse(RESPParser.java:316)
        at io.vertx.redis.client.impl.RESPParser.handleNumber(RESPParser.java:147)
        at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:91)
        at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:24)
        at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:100)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
        at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:414)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
        at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:158)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:390)
        at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

Sometimes I get ERR MULTI calls can not be nested:

2023-03-31 17:37:25,768 ERROR [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (executor-thread-0) Request failed: java.util.concurrent.CompletionException: ERR MULTI calls can not be nested
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:79)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
        at io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl.withTransaction(BlockingRedisDataSourceImpl.java:72)
        at io.quarkus.redis.datasource.RedisDataSource_16a9813a62a770a26823359040f690d9904a090a_Synthetic_ClientProxy.withTransaction(Unknown Source)
        at it.argosoft.poc.ratelimit.service.TokenBucketRateLimitingService.shouldAllow(TokenBucketRateLimitingService.java:58)
        at it.argosoft.poc.ratelimit.service.TokenBucketRateLimitingService_ClientProxy.shouldAllow(Unknown Source)
        at it.argosoft.poc.ratelimit.filters.ApiRateLimitFilter.filter(ApiRateLimitFilter.java:43)
        at org.jboss.resteasy.reactive.server.handlers.ResourceRequestFilterHandler.handle(ResourceRequestFilterHandler.java:48)
        at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:104)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)
        at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
        at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: ERR MULTI calls can not be nested

The same happens if I use withTransaction in the reactive variant, ReactiveRedisDataSource.

What am I doing wrong? Thanks

Expected behavior

No response

Actual behavior

No response

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

No response

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.16.5

Build tool (ie. output of mvnw --version or gradlew --version)

./mvnw --version
Warning: JAVA_HOME environment variable is not set.
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /home/scorallo/.m2/wrapper/dists/apache-maven-3.8.6-bin/67568434/apache-maven-3.8.6
Java version: 17.0.6, vendor: Amazon.com Inc., runtime: /home/scorallo/.jdks/corretto-17.0.6
Default locale: it_IT, platform encoding: UTF-8
OS name: "linux", version: "5.19.0-38-generic", arch: "amd64", family: "unix"

Additional information

No response

quarkus-bot[bot] commented 1 year ago

/cc @cescoffier (redis), @gsmet (redis), @machi1990 (redis)

cescoffier commented 1 year ago

Can you provide a reproducer?

Also, you cannot use withTransaction if you are already in one.

stefanorg commented 1 year ago

Yes, I can provide a reproducer. I also noticed that if I use Redis in simple (standalone) mode with Dev Services, everything works fine. The same code with Redis in cluster mode gives the errors I posted. Could this be the issue?

cescoffier commented 1 year ago

Hum, that's definitely the issue. We have a few issues around the clustered mode. @Ladicek is going to have a look soon.

stefanorg commented 1 year ago

OK, do you need me to upload the reproducer?

Ladicek commented 1 year ago

Please do upload the reproducer, that will be very useful. Thanks!

stefanorg commented 1 year ago

You can find the reproducer here: https://github.com/stefanorg/quarkus-rate-limit-poc

I've provided a docker-compose.yml to spin up a Redis cluster. Hope this helps! Thanks

ivivanov-bg commented 1 year ago

Is there any progress on this issue, or a workaround? I'm experiencing the same problem in cluster mode:

TransactionResult resetFileResult = redisDS.withTransaction(tx -> {
      tx.key().rename(OLD_KEY, NEW_KEY);
      tx.value(Long.class).getdel(SEQ_KEY);
  });

Produces:

java.util.concurrent.CompletionException: ERR EXEC without MULTI

Ladicek commented 1 year ago

Ooops, I totally forgot about this issue. It seems quite possible that this is caused by https://github.com/vert-x3/vertx-redis-client/issues/365, which was fixed recently. I'll try to reproduce and confirm.

Ladicek commented 1 year ago

OK, I was able to reproduce and upgrading to Vert.x 4.4.6 doesn't seem to help. So https://github.com/vert-x3/vertx-redis-client/issues/365 is not the cause of the problem here. I'll dig deeper.

Ladicek commented 1 year ago

So the crucial problem here is that the underlying Vert.x Redis client is not transaction-aware. It treats MULTI and EXEC as any other command. In the Redis cluster mode, this means that the client sends these commands to random nodes, because there's no key associated with them. This means:

  1. MULTI may end up on a different node than the following commands, meaning that those commands don't actually execute in a transaction.
  2. EXEC may end up on a different node than MULTI, which means the transaction is not ended properly (and EXEC fails).
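The two failure modes above can be reproduced with a toy model. The sketch below is not the Vert.x Redis client: the `Node` class is a hypothetical stand-in that only tracks whether one connection is inside MULTI, and the routing is hard-coded rather than random so the outcome is deterministic. It assumes, as described above, that the commands without a key land on a different node than the keyed ones.

```java
public class ClusterTxRouting {
    /** Hypothetical stand-in for one cluster node's view of a single client connection. */
    static final class Node {
        private boolean inMulti = false;

        String send(String command) {
            switch (command) {
                case "MULTI":
                    if (inMulti) {
                        return "ERR MULTI calls can not be nested";
                    }
                    inMulti = true;
                    return "OK";
                case "EXEC":
                    if (!inMulti) {
                        return "ERR EXEC without MULTI";
                    }
                    inMulti = false;
                    return "[results]";
                default:
                    // Any other command is queued only if this node saw MULTI first.
                    return inMulti ? "QUEUED" : "executed immediately";
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();

        // First transaction: MULTI is routed to node A, the keyed command and EXEC to node B.
        System.out.println(a.send("MULTI")); // OK
        System.out.println(b.send("ZADD"));  // executed immediately (not part of any transaction)
        System.out.println(b.send("EXEC"));  // ERR EXEC without MULTI

        // Node A never received EXEC, so a later MULTI routed to it is rejected.
        System.out.println(a.send("MULTI")); // ERR MULTI calls can not be nested
    }
}
```

In this model the second MULTI fails only because node A was left inside an unfinished transaction, which may explain why the errors reported in this issue vary from one request to the next.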

This requires substantial changes on the Vert.x Redis client side. I can see 3 ways of handling a Redis transaction in a cluster:

  1. Unsupported. Transaction commands (WATCH, UNWATCH, MULTI, EXEC, DISCARD) are detected and the command is rejected directly by the client.
  2. Single-node transactions. MULTI is queued and is only emitted when the first command with a key is executed. This first command binds the connection to the corresponding node of the Redis cluster. All subsequent commands are targeted to that node. If some of the subsequent commands have a key that belongs to another node, this would fail on the Redis side. If WATCH is used before MULTI, its key(s) determine to which node the connection is bound and the subsequent MULTI doesn't need to be queued. If WATCH keys belong to multiple nodes, that would fail just like today. This provides roughly the same guarantees as a normal Redis transaction with a standalone Redis instance.
  3. Multi-node transactions. MULTI is queued and is emitted for each node that is targeted by any of the subsequent commands. EXEC is submitted to all nodes that were targeted by any of the previously executed commands. WATCH would be distributed to all nodes to which its keys belong. This is unlike a normal Redis transaction, because it's in fact N independent Redis transactions, uncoordinated.

The 1st option would be fairly easy to implement. The 2nd and 3rd options, not so much.
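To make the 2nd option more concrete, here is a minimal sketch of the binding logic. It is not the Vert.x Redis client implementation. The class name, the `nodeOfKey` lookup and the `sent` log are hypothetical and exist only for illustration. One simplification: a key that belongs to another node is rejected on the client here, whereas the description above lets it fail on the Redis side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

public class SingleNodeTransaction {
    private final ToIntFunction<String> nodeOfKey;       // hypothetical key -> node index lookup
    private final List<String> sent = new ArrayList<>(); // log of what would go on the wire
    private boolean multiPending = false;
    private int boundNode = -1;                          // -1 means "not bound to a node yet"

    SingleNodeTransaction(ToIntFunction<String> nodeOfKey) {
        this.nodeOfKey = nodeOfKey;
    }

    /** WATCH has a key, so it can bind the connection before MULTI is issued. */
    void watch(String key) {
        bind(key);
        emit("WATCH " + key);
    }

    /** MULTI has no key: hold it back unless a WATCH already chose the node. */
    void multi() {
        if (boundNode == -1) {
            multiPending = true;
        } else {
            emit("MULTI");
        }
    }

    /** The first keyed command binds the node and flushes the queued MULTI. */
    void command(String name, String key) {
        bind(key);
        if (multiPending) {
            emit("MULTI");
            multiPending = false;
        }
        emit(name + " " + key);
    }

    /** EXEC follows MULTI to the bound node. */
    void exec() {
        if (boundNode == -1) {
            throw new IllegalStateException("transaction has no keyed command");
        }
        emit("EXEC");
    }

    List<String> sent() {
        return sent;
    }

    private void bind(String key) {
        int node = nodeOfKey.applyAsInt(key);
        if (boundNode == -1) {
            boundNode = node;
        } else if (node != boundNode) {
            throw new IllegalStateException(
                    "key " + key + " belongs to node " + node + ", transaction is bound to node " + boundNode);
        }
    }

    private void emit(String command) {
        sent.add("node" + boundNode + ": " + command);
    }

    public static void main(String[] args) {
        // Toy mapping: keys starting with "a" live on node 0, everything else on node 1.
        SingleNodeTransaction tx = new SingleNodeTransaction(key -> key.startsWith("a") ? 0 : 1);
        tx.multi();                  // nothing is sent yet
        tx.command("ZADD", "a:1");   // binds to node 0, sends MULTI and then ZADD
        tx.command("EXPIRE", "a:1");
        tx.exec();
        tx.sent().forEach(System.out::println);
    }
}
```

With the toy mapping in `main`, all four commands, including MULTI and EXEC, are logged against node 0.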

ivivanov-bg commented 1 year ago

For my particular case, the 2nd option is quite suitable. The keys in each transaction need to be bound to the same node anyway and, as you said, this replicates the behavior of non-cluster Redis. It also avoids ending up with multiple transactions and no way to discard them all if something goes wrong (as in the 3rd case).
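For reference, Redis Cluster assigns a key to one of 16384 hash slots using CRC16 of the key, and hash tags (the part between `{` and `}`) let an application force several keys into the same slot, and therefore onto the same node. The sketch below reimplements that calculation to illustrate the idea. The key names are hypothetical and are not the ones used in the snippets above.

```java
import java.nio.charset.StandardCharsets;

public class HashSlot {
    /** CRC16 as used by Redis Cluster (XMODEM variant: polynomial 0x1021, initial value 0). */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Hash slot of a key; only a non-empty {hash tag} is hashed when one is present. */
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        // Hypothetical key names sharing the hash tag "file".
        String oldKey = "{file}:old";
        String newKey = "{file}:new";
        String seqKey = "{file}:seq";
        boolean sameSlot = slot(oldKey) == slot(newKey) && slot(newKey) == slot(seqKey);
        System.out.println("all keys in the same slot: " + sameSlot); // true
    }
}
```

Keys that share a hash tag always map to the same slot, so a transaction that touches only such keys satisfies the single-node requirement of the 2nd option.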

MichaelKubovic commented 11 months ago

"All subsequent commands are targeted to that node."

In addition to targeting the same node, my experiments showed that you also need to target the same connection. The clustered client uses a connection pool, so IMO all commands in a transaction should reuse the same connection.

Ladicek commented 11 months ago

That is correct, the withTransaction method in Quarkus already works like that (it borrows a connection from the pool and runs all commands on it).