redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

Race condition when using pipelining in cluster mode #1111

Open volodymyrpavlenko opened 5 years ago

volodymyrpavlenko commented 5 years ago

Bug Report

Current Behavior

When using Lettuce in cluster mode with pipelining (connection.setAutoFlushCommands(false)), there seems to be a race condition between adding commands to the write buffer and flushing commands to the output.

The race condition can be triggered when the Lettuce client is not yet connected to the node where the command should be routed. In this case, the response future from a GET call can be returned before the command is actually added to the command buffer.

If connection.flushCommands() is invoked before the command is added to the buffer, then it will not be sent, and thus the client future will never complete (unless we invoke connection.flushCommands() again).

The race condition seems to be due to the following lines: https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L104 https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L129

Input Code

```java
import java.util.concurrent.TimeUnit;

import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;

public class TestRaceCondition {

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            test();
            System.out.println("Finished iteration " + i);
        }
    }

    private static void test() {
        RedisClusterClient redisClusterClient = RedisClusterClient.create("redis://localhost:7008/");
        final StatefulRedisClusterConnection connection = redisClusterClient.connect();
        // connection.setReadFrom(ReadFrom.SLAVE); // Uncommenting this line will make the test fail much more often
        connection.setAutoFlushCommands(false);
        final RedisAdvancedClusterAsyncCommands async = connection.async();

        /*
        127.0.0.1:7000> cluster keyslot bar
        (integer) 5061 (node1)
        127.0.0.1:7000> cluster keyslot baz2
        (integer) 10594 (node2)
        127.0.0.1:7000> cluster keyslot foo
        (integer) 12182 (node3)
        */

        final RedisFuture future1 = async.get("bar");
        final RedisFuture future2 = async.get("baz2");
        final RedisFuture future3 = async.get("foo");

        connection.flushCommands();

        if (!LettuceFutures.awaitAll(10, TimeUnit.SECONDS, future1, future2, future3)) {
            connection.flushCommands();
            if (LettuceFutures.awaitAll(1, TimeUnit.SECONDS, future1, future2, future3)) {
                throw new IllegalStateException("Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().");
            } else {
                throw new IllegalStateException("Commands didn't finish in 10 seconds.");
            }
        }

        connection.close();
    }
}
```
Output

```
Finished iteration 0
Finished iteration 1
Finished iteration 2
Finished iteration 3
Finished iteration 4
Finished iteration 5
Finished iteration 6
Finished iteration 7
Finished iteration 8
Finished iteration 9
Finished iteration 10
Finished iteration 11
Finished iteration 12
Finished iteration 13
Exception in thread "main" java.lang.IllegalStateException: Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.test(TestRaceCondition.java:47)
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.main(TestRaceCondition.java:13)
```

Expected behavior/code

All futures eventually complete with only one invocation of flushCommands()

Environment

Possible Solution

Is it possible to separate the creation of a Connection object (and its command buffer) from the actual network connectivity? This would allow commands to be stored in the buffer before the future is returned to the client.

Additional context

From the code, it looks like MasterSlaveChannelWriter would have the same issue, though I have not validated this.

mp911de commented 5 years ago

The code behaves as designed. Connections to cluster nodes are established asynchronously. Flushing commands without having all cluster connections established can lead to a state where a connection is still being created while flushCommands() is called. The flush signal does not reach the connection that is being created, and that is why you see some commands not executed.

Changing the auto-flush state requires special attention, and even more so when using Redis Cluster or Master/Slave. Connections to nodes need to be fully initialized before the auto-flush state can be altered.

volodymyrpavlenko commented 5 years ago

Hi Mark, thanks for your reply!

1. Is there a way to know whether the connection has already been established? It seems to be quite encapsulated inside the Lettuce code and is not exposed to the application.

2. What would happen if the connection is broken for any reason and the Lettuce client needs to reconnect while the program is running, for example in the following case:

   1. the application successfully checks that the client is connected (how?)
   2. the network connection is interrupted, the client disconnects
   3. the client starts reconnecting
   4. a command is sent
   5. flush
   6. the connection is established

   This would lead to the command not being flushed.

3. Even if I wait for the connection to be established, there is still a race condition (although a less likely one) if I flush after the connection was established but before the command was written.

mp911de commented 5 years ago

You can iterate over the cluster partitions via StatefulRedisClusterConnection.getPartitions() and fetch the connection for each node via StatefulRedisClusterConnection.getConnection(…) or StatefulRedisClusterConnection.getConnectionAsync(…).

There's no other way to introspect connected nodes. Regarding the connected/disconnected state, you can inspect it by calling StatefulRedisConnection.isOpen() and decide on that basis whether you can send the bulk.
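
For illustration, something along these lines (an untested sketch; the URI, the String codec, and the plain System.out logging are just placeholders) pre-establishes the node connections via getConnectionAsync(…) and checks isOpen() before switching to manual flushing. As noted further down in the thread, this narrows the window for the race but does not remove it entirely:

```java
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WarmUpBeforeManualFlush {

    public static void main(String[] args) {
        RedisClusterClient client = RedisClusterClient.create("redis://localhost:7008/");
        StatefulRedisClusterConnection<String, String> connection = client.connect();

        // Force a connection to every known cluster node up front; by default
        // node connections are only established on demand.
        List<CompletableFuture<?>> warmup = new ArrayList<>();
        for (RedisClusterNode node : connection.getPartitions()) {
            warmup.add(connection.getConnectionAsync(node.getNodeId()).toCompletableFuture());
        }
        CompletableFuture.allOf(warmup.toArray(new CompletableFuture[0])).join();

        // Inspect the per-node connection state before trusting a single flush.
        for (RedisClusterNode node : connection.getPartitions()) {
            System.out.println(node.getNodeId() + " open: "
                    + connection.getConnection(node.getNodeId()).isOpen());
        }

        // Only switch to manual flushing once the node connections exist.
        connection.setAutoFlushCommands(false);
        // ... queue commands via connection.async(), then connection.flushCommands()

        connection.close();
        client.shutdown();
    }
}
```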

Does that help?

volodymyrpavlenko commented 5 years ago

I am not sure this will actually work for our approach, because: 1) even if the connection is open before sending the batch, the race condition still exists:

mp911de commented 5 years ago

Therefore, setAutoFlushCommands(…) should really only be used in standalone scenarios where you have full control over the connection, and not in use cases where connections might be shared.

Closing this ticket as we effectively cannot do anything here.

volodymyrpavlenko commented 5 years ago

In this case, it'd be great to update the documentation, since it took us a lot of time to find the cause of the race condition.

If pipelining should not be used in Master/Slave or Cluster mode, why not remove the method altogether to avoid confusing developers in the future?

mp911de commented 5 years ago

Controlling command flushing (e.g. when running a standalone main or something like that) can improve performance by a factor of 10. For this to work properly, you need either a standalone connection or a properly warmed-up connection. It is intended for bulk loading.
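
To illustrate the intended bulk-loading use on a plain standalone connection (an untested sketch; the URI, key naming, and batch size are arbitrary):

```java
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class StandaloneBulkLoad {

    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection = client.connect();
        RedisAsyncCommands<String, String> async = connection.async();

        // Disable auto-flush so commands are buffered instead of written one by one.
        connection.setAutoFlushCommands(false);

        List<RedisFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            futures.add(async.set("key:" + i, "value:" + i));
        }

        // One explicit flush writes the whole batch to the single connection.
        connection.flushCommands();
        LettuceFutures.awaitAll(30, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));

        connection.setAutoFlushCommands(true);
        connection.close();
        client.shutdown();
    }
}
```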

Updating the docs is a good suggestion, we should do that.

volodymyrpavlenko commented 5 years ago

Just to give you more context, we have a system with very high throughput that is also latency-critical. We need to fetch batches of randomly spread keys (total volume peaks at 10M items/sec).

Since Redis Cluster does not support cross-slot MGET, and our keys are randomly distributed, we thought about using pipelining to batch the GETs.

We'd like to use Cluster for ease of node discovery, failover, and sharding.

Do you have a better solution to this problem?

mp911de commented 5 years ago

Have you tried using plain MGET calls through Lettuce's API? The Redis Cluster implementation splits the keys into command chunks according to the slot hash. There's an ongoing discussion at #1116 about a similar topic. To improve time to first response, you could use the streaming MGET API. But really, I suggest benchmarking first before adding more complexity.
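
A rough sketch of both variants (untested; the URI and keys are just placeholders taken from the reproduction above):

```java
import io.lettuce.core.KeyValue;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.output.KeyValueStreamingChannel;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ClusterMultiGet {

    public static void main(String[] args) {
        RedisClusterClient client = RedisClusterClient.create("redis://localhost:7008/");
        StatefulRedisClusterConnection<String, String> connection = client.connect();
        RedisAdvancedClusterAsyncCommands<String, String> async = connection.async();

        // Plain MGET: the cluster API splits the keys by slot and issues one
        // MGET per node chunk; the caller sees a single future.
        RedisFuture<List<KeyValue<String, String>>> values = async.mget("bar", "baz2", "foo");

        // Streaming MGET: key/value pairs are pushed as each chunk answers,
        // which improves time to first response for large key sets.
        KeyValueStreamingChannel<String, String> channel =
                (key, value) -> System.out.println(key + " -> " + value);
        RedisFuture<Long> count = async.mget(channel, "bar", "baz2", "foo");

        LettuceFutures.awaitAll(10, TimeUnit.SECONDS, values, count);

        connection.close();
        client.shutdown();
    }
}
```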

If Lettuce cannot satisfy your requirements, netty has a built-in Redis codec that gives you way more control over the Redis protocol. Did you try whether the Jedis Redis client would be an option?

volodymyrpavlenko commented 5 years ago

MGET would be split by slot, and splitting 10-100 sized multigets over 16384 slots will simply transform them into single gets.

We tried this before, and performance was very poor. We found that running pipelines instead of MGETs in cluster mode is a great optimization.

We didn't try Jedis since it's fully synchronous, and I don't think that a synchronous approach would work with our load of ~110k reqs/sec on a single client.

We looked into the Redisson client, which supports pipelining in cluster mode, and we are testing it now.
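
For comparison, batching individual async GETs on the cluster connection without touching the auto-flush state looks roughly like this (untested sketch; the URI and keys are illustrative). The async API still writes each command without waiting for the previous response, so the requests are interleaved on the wire, only without the single explicit flush that triggers the race discussed in this issue:

```java
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ClusterBatchedGets {

    public static void main(String[] args) {
        RedisClusterClient client = RedisClusterClient.create("redis://localhost:7008/");
        StatefulRedisClusterConnection<String, String> connection = client.connect();
        RedisAdvancedClusterAsyncCommands<String, String> async = connection.async();

        // Auto-flush stays enabled: each command is written immediately, and the
        // async API does not block waiting for responses between commands.
        List<String> keys = Arrays.asList("bar", "baz2", "foo"); // illustrative batch
        List<RedisFuture<String>> futures = new ArrayList<>(keys.size());
        for (String key : keys) {
            futures.add(async.get(key));
        }

        // Await the whole batch once all GETs are in flight.
        LettuceFutures.awaitAll(10, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));

        connection.close();
        client.shutdown();
    }
}
```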

nwpuqyj commented 1 month ago

> MGET would be split by slot, and splitting 10-100 sized multigets over 16384 slots will simply transform them into single gets.
>
> We tried this before, and performance was very poor. We found that running pipelines instead of MGETs in cluster mode is a great optimization.
>
> We didn't try Jedis since it's fully synchronous, and I don't think that a synchronous approach would work with our load of ~110k reqs/sec on a single client.
>
> We looked into the Redisson client, which supports pipelining in cluster mode, and we are testing it now.

@volodymyrpavlenko Hello friend, have you solved this scenario with Redisson or in some other way? We are facing the same problem! In Redis Cluster mode there is a race condition in the pipeline operation for batch queries of random keys (random slots). Manually flushing commands often causes pipeline query timeouts (some commands may never be sent, so the future will never complete…).

tishun commented 1 month ago

I'd like to revisit this scenario, as it seems to be something interesting to support, if possible.