redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

Retry commands failed with `LOADING` against the master node #2488

Open barshaul opened 1 year ago

barshaul commented 1 year ago

Bug Report

Current Behavior

Lettuce filters out nodes in LOADING state by checking whether their master_repl_offset field is == 0. However, in the scenarios described below, a node's replOffset can be left at its -1L default. A loading replica with replOffset == -1 is therefore considered a valid read candidate, Lettuce keeps querying it, and the LOADING error is raised back to the user.
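
To make the failure mode concrete, here is a minimal sketch (my own simplification, not the actual Lettuce source) of how a check that only treats an offset of exactly 0 as "loading" lets an uninitialized offset of -1 pass:

```java
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

// Simplified sketch of the filtering behavior described above (not the actual Lettuce code).
// Only an offset of exactly 0 is treated as "still loading", so a replica whose offset was
// never initialized (-1) is still accepted as a read candidate and later answers with LOADING.
final class LoadingFilterSketch {

    static boolean isConsideredReadCandidate(RedisClusterNode replica) {
        return replica.getReplOffset() != 0; // -1 (unknown) passes this check
    }
}
```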

When is the replOffset not initialized properly?

Let's examine the getNodeSpecificViews function in DefaultClusterTopologyRefresh.java:

NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requestedInfo) {

        List<RedisClusterNodeSnapshot> allNodes = new ArrayList<>();

        Map<String, NodeTopologyView> self = new HashMap<>();

        Set<RedisURI> nodes = requestedTopology.nodes();

        List<NodeTopologyView> views = new ArrayList<>();
        for (RedisURI nodeUri : nodes) {

            try {
                NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedInfo);

                if (!nodeTopologyView.isAvailable()) {
                    continue;
                }

                RedisClusterNode node = nodeTopologyView.getOwnPartition();
                if (node.getUri() == null) {
                    node.setUri(nodeUri);
                } else {
                    node.addAlias(nodeUri);
                }

                self.put(node.getNodeId(), nodeTopologyView);

                List<RedisClusterNodeSnapshot> nodeWithStats = new ArrayList<>(nodeTopologyView.getPartitions().size());

                for (RedisClusterNode partition : nodeTopologyView.getPartitions()) {

                    if (validNode(partition)) {
                        nodeWithStats.add(new RedisClusterNodeSnapshot(partition));
                    }
                }

                allNodes.addAll(nodeWithStats);

                Partitions partitions = new Partitions();
                partitions.addAll(nodeWithStats);

                nodeTopologyView.setPartitions(partitions);

                views.add(nodeTopologyView);
            } catch (CompletionException e) {
                logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e));
            }
        }

        for (RedisClusterNodeSnapshot node : allNodes) {

            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

            NodeTopologyView view = self.get(node.getNodeId());

            node.setConnectedClients(view.getConnectedClients());
            node.setReplOffset(view.getReplicationOffset());
            node.setLatencyNs(view.getLatency());
        }

        for (NodeTopologyView view : views) {
            view.postProcessPartitions();
        }

        return new NodeTopologyViews(views);
    }

In the first for loop, if a specific node, let's call it X, is not part of nodes but is found in other nodes' nodeTopologyView.getPartitions() (e.g., when a new node is added to the cluster topology), then X will be present in allNodes but not in self. So, in the second loop we will skip this node and won't set its replOffset:

            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

Another scenario is when node Y is part of nodes but is currently loading. While a node is loading, the CLUSTER NODES command returns a LOADING error, so we skip adding this node to self in the first loop because nodeTopologyView.isAvailable() is false. However, the loading node is still present in other nodes' CLUSTER NODES output and is therefore added to allNodes. Again, we'll skip this node in the second loop and won't set its replOffset:

...
                # First loop
                if (!nodeTopologyView.isAvailable()) {
                    continue;
                }
...
            # Second loop
            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

In both cases, the resulting client topology will contain nodes with replOffset == -1.

Stack trace

I added logs to Lettuce to show the behavior; notice the 'is a read candidate, reploffset is = -1':

```
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:41788, nodeId='0f22044584019b4bcfe8b1384b0bc1220c96b26c', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177079000, configEpoch=3, replOffset=1190, flags=[MASTER], aliases=[], slot count=5461] is NOT a read candidate, reploffset is = 1190
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:22152, nodeId='505c3cbb1204788e851dffd515c3e4f50cb7ea0d', connected=true, slaveOf='a2beee0391b51f8812c24792adb8d4f92f788b79', pingSentTimestamp=0, pongReceivedTimestamp=1692177079000, configEpoch=1, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is NOT a read candidate, reploffset is = -1
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:402 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:20686, nodeId='eed0801c3df60c91f3e9dac72ce284317199a12e', connected=true, slaveOf='667d8a4028353e75b090c471eb7d1bc8511ea4fd', pingSentTimestamp=0, pongReceivedTimestamp=1692177081059, configEpoch=2, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is a read candidate, reploffset is = -1
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:43062, nodeId='f41fd8da7e8dc570aac137db7d89ef4131213d10', connected=true, slaveOf='0f22044584019b4bcfe8b1384b0bc1220c96b26c', pingSentTimestamp=0, pongReceivedTimestamp=1692177080056, configEpoch=3, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is NOT a read candidate, reploffset is = -1
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:44142, nodeId='a2beee0391b51f8812c24792adb8d4f92f788b79', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177077000, configEpoch=1, replOffset=1190, flags=[MYSELF, MASTER], aliases=[redis://127.0.0.1:44142], slot count=5461] is NOT a read candidate, reploffset is = 1190
2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:230 - read candidates selection = [RedisClusterNodeSnapshot [uri=redis://127.0.0.1:20686, nodeId='eed0801c3df60c91f3e9dac72ce284317199a12e', connected=true, slaveOf='667d8a4028353e75b090c471eb7d1bc8511ea4fd', pingSentTimestamp=0, pongReceivedTimestamp=1692177081059, configEpoch=2, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]], RedisClusterNodeSnapshot [uri=redis://127.0.0.1:18358, nodeId='667d8a4028353e75b090c471eb7d1bc8511ea4fd', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177080000, configEpoch=2, replOffset=1250, flags=[MASTER], aliases=[], slot count=5462]]
...
Aug 16, 2023 9:11:21 AM com.elasticache.DowntimeClients.BaseDowntimeClient exec
WARNING: recieved excption: io.lettuce.core.RedisLoadingException: LOADING Redis is loading the dataset in memory
```


Expected behavior/code

A loading node should not be considered as a read candidate.


Possible Solution

There are a couple of options:

  1. Initialize replOffset with 0
  2. In the function mentioned above, add:
            if (!self.containsKey(node.getNodeId())) {
                node.setReplOffset(0);
                continue;
            }
  3. Separate the topology view from the replication offset. CLUSTER NODES cannot be called while a node is loading, but INFO REPLICATION can. Even if we haven't got a topology view from a specific node, we can still set its master_repl_offset (see the sketch below).
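
For option 3, a minimal sketch of pulling master_repl_offset out of a raw INFO replication reply (the parsing helper is mine, not existing Lettuce code; error handling omitted):

```java
// Hypothetical helper for option 3: INFO replication keeps working while a node is
// loading, so its master_repl_offset line can be parsed even when CLUSTER NODES fails.
static long parseMasterReplOffset(String infoReplication) {
    for (String line : infoReplication.split("\r?\n")) {
        if (line.startsWith("master_repl_offset:")) {
            return Long.parseLong(line.substring("master_repl_offset:".length()).trim());
        }
    }
    return 0L; // field not present: treat the node as not yet replicating (consistent with option 1)
}
```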

Mitigations for users who are currently facing this issue:

1. Catch LOADING errors and route them back to the primary [preferred]

  public String execGet(RedisAdvancedClusterCommands<String, String> client, StatefulRedisClusterConnection<String, String> conn, String key) {
    try {
      return client.get(key);
    } catch (RedisLoadingException loadingError) {
        // The read candidate serving this slot is still loading: find the primary that owns the key's slot...
        int slot = SlotHash.getSlot(key);
        Optional<RedisClusterNode> master = conn.getPartitions().getPartitions().stream()
            .filter(redisClusterNode -> redisClusterNode.hasSlot(slot)).findFirst();
        // ...and retry the command directly against that primary via a node selection.
        NodeSelection<String, String> node = client
            .nodes(redisClusterNode -> redisClusterNode.getNodeId().equals(master.get().getNodeId()));
        Executions<String> result = node.commands().get(key);
        String strResult = result.get(master.get());
        return strResult;
    }
  }

Downsides:

With this mitigation we keep the loading replica in the client's topology view, so the client continues to treat this replica as a read candidate and route read requests to it. This means every command that fails with a LOADING error pays double latency: once for the round trip to the replica and once for the retry against the primary. However, it prevents a full downtime for read queries. Another downside is that all traffic that was routed to the replica is now routed to the primary, which can increase the load on the primary node. If there are other replicas, we would prefer to route the failed command to another replica, or at least to pick a node at random from the primary and the remaining replicas. However, AFAIK, Lettuce currently doesn't provide information about which node returned the LOADING error, so we have no way to select all nodes in the shard and exclude only the loading one. I'm still waiting for a response on the GitHub issue about whether that's possible.

2. Filter out replicas with replOffset == -1, while enabling periodic topology updates:

    final ClusterTopologyRefreshOptions topologyOptions = ClusterTopologyRefreshOptions.builder()
        .enableAllAdaptiveRefreshTriggers()
        .dynamicRefreshSources(true)
        .enablePeriodicRefresh(Duration.ofSeconds(60)) // also enables periodic refresh
        .build();
...
    ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
        .topologyRefreshOptions(topologyOptions)
        .socketOptions(socketOptions)
        .autoReconnect(true)
        .timeoutOptions(timeoutOptions)
        .nodeFilter(
            it -> !(it.is(RedisClusterNode.NodeFlag.FAIL)
                || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
                || it.is(RedisClusterNode.NodeFlag.NOADDR)
                || (it.is(RedisClusterNode.NodeFlag.REPLICA) && it.getReplOffset() == -1L)
                )) // node filter predicate: drop failed nodes and replicas with an unknown repl offset
        .validateClusterNodeMembership(false)
        .build();

Downsides:

This mitigation means that we'll filter out new replicas regardless of their loading state, and replaced replicas for the duration of their loading state. The new replicas will only be added back to the cluster topology in the next periodic topology update. Consequently, these replicas will remain inaccessible for read queries for a period up to the interval between periodic topology checks. Moreover, if an existing replica gets into a LOADING state, the client will continue to route requests to it until the next topology check removes it. If the cluster is large, setting the topology update interval too low may impact performance, as the length of the CLUSTER NODES output grows with the cluster size.

mp911de commented 1 year ago

Let's take a step back before jumping right into the code.

We default replOffset to -1 to indicate the absence of a replication offset. Lettuce can fetch topology details either from the initial seed nodes used to construct the cluster client, or contact all cluster nodes. In the latter case we can obtain all repl offsets; in the former we only have repl offsets for the seed nodes.
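
As I understand it, this corresponds to the dynamicRefreshSources setting; a small sketch contrasting the two modes, using the same builder API that appears earlier in this thread:

```java
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;

// Sketch of the two refresh modes described above. With dynamic refresh sources, all
// discovered cluster nodes are queried and their repl offsets become known; with static
// sources, only the initial seed nodes are queried, so other nodes keep the -1 default.
final class RefreshSourceModes {

    static ClusterTopologyRefreshOptions queryAllDiscoveredNodes() {
        return ClusterTopologyRefreshOptions.builder()
                .dynamicRefreshSources(true)   // contact every node found in the topology
                .build();
    }

    static ClusterTopologyRefreshOptions queryOnlySeedNodes() {
        return ClusterTopologyRefreshOptions.builder()
                .dynamicRefreshSources(false)  // contact only the initial seed URIs
                .build();
    }
}
```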

If we'd default to zero as the initial value, we would stop considering replicas as read candidates.

We considered switching to CLUSTER SHARDS, but that command stopped reporting the MYSELF node with Redis 7.2, so it isn't really an option.

I currently do not see a good way to filter nodes in LOADING state without querying all Redis Cluster nodes. Let me know if I missed something.

barshaul commented 1 year ago

I think the best option is to create new nodes with replOffset = 0 and change the topology refresh function to query master_repl_offset only on nodes whose replication offset is 0. So, on client init we'll need to query all nodes (or we can reduce it to replicas only), and in later topology refreshes we will query only the nodes that need it. I agree that we need some follow-up check to update the offset of replicas that finished loading, so this change could be limited to clients that enable periodic topology updates, or, if there's any other cron job that runs on all CME clients, it could be enabled for all of them (where do you measure which replicas have the lowest latency?).

Since this change means querying all nodes only on client init, and since "info replication" doesn't have a long output (compared to CLUSTER NODES on large clusters), I don't see an issue with doing so.
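
To make that concrete, a rough sketch of the selective re-query (the per-node INFO call is passed in as a placeholder function; this is not existing Lettuce internals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

// Sketch of the proposal above, not existing Lettuce code: during a refresh, only nodes
// whose replication offset is still unknown or zero are asked for INFO replication.
final class SelectiveOffsetRefreshSketch {

    static Map<RedisClusterNode, Long> refreshUnknownOffsets(Partitions partitions,
            ToLongFunction<RedisClusterNode> queryMasterReplOffset /* hypothetical per-node INFO call */) {

        Map<RedisClusterNode, Long> refreshed = new HashMap<>();
        for (RedisClusterNode node : partitions) {
            if (node.getReplOffset() <= 0) { // -1 (unknown) or 0 (still loading)
                refreshed.put(node, queryMasterReplOffset.applyAsLong(node));
            }
        }
        return refreshed;
    }
}
```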

mannitrkl2006 commented 1 year ago

Hello Mark,

Another way to determine the loading state for certain is to look at the Persistence section of the INFO command (tested on Redis 7.0). See below:

# Persistence
loading:1
async_loading:0
current_cow_peak:0

There is a field called loading which is 1 while a replica is loading. This could be used to set the LOADING flag for the node. Currently, the LOADING flag is supposed to be set from the CLUSTER SHARDS command, which you mentioned won't work in Redis 7.2.
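
For illustration, a small parsing sketch of that check (the helper is mine, not Lettuce API):

```java
// Hypothetical helper: read the loading flag from the Persistence section of INFO.
static boolean isLoading(String infoPersistence) {
    for (String line : infoPersistence.split("\r?\n")) {
        if (line.startsWith("loading:")) {
            return "1".equals(line.substring("loading:".length()).trim());
        }
    }
    return false;
}
```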

What do you think about it?

barshaul commented 1 year ago

@mannitrkl2006, the issue isn't how to find out whether a replica is loading - that can also be determined from the 'master_repl_offset' entry of the replication info, as is done now.

The issue Mark refers to is that querying INFO retrieves information only about the queried node, so it requires running the INFO command on all nodes. This is unlike CLUSTER SHARDS, which returns information about all nodes in the cluster.

However, as I said above, I don't really see a problem with querying all nodes in the client initialization.

mp911de commented 1 year ago

querying all nodes in the client initialization.

It involves a bit more than that. Nodes can be in loading state at initialization, and these nodes would require re-querying. Also, I'm not quite sure whether a node that crashes can get back into a loading state.

barshaul commented 1 year ago

But that's OK; we want to re-query loading nodes. This is why I said we need some periodic job that updates them (or an async task that waits for loading nodes to finish). What do you mean by 'node that crashes'? If a new server starts on this node, it will go into loading again. If the replication lag was too high, the node might be required to do a full sync again and will go into loading.

mp911de commented 1 year ago

That leaves us with the requirement to re-query all cluster nodes, and that is what we tried to eliminate in the first place. The main reason I'm reluctant to query all cluster nodes is several reports of projects that run huge clusters with 300+ nodes. Connecting to, querying, and disconnecting from each node from each application creates a lot of load.

barshaul commented 1 year ago

What about using the existing connections, or querying for the replication offset during the periodic topology check?

Also, as a user-side mitigation, is there a way of getting the cluster node that responded with the loading error? This would enable users to catch loading errors and retry their command by randomly selecting one of the partition's nodes while excluding the loading node. For example, this mitigation can be applied to retry the command on the primary node:

  public String execGet(RedisAdvancedClusterCommands<String, String> client, StatefulRedisClusterConnection<String, String> conn, String key) {
    try {
      return client.get(key);
    } catch (RedisLoadingException loadingError) {
        int slot = SlotHash.getSlot(key);
        Optional<RedisClusterNode> master = conn.getPartitions().getPartitions().stream()
            .filter(redisClusterNode -> redisClusterNode.hasSlot(slot)).findFirst();
        NodeSelection<String, String> node = client
            .nodes(redisClusterNode -> redisClusterNode.getNodeId().equals(master.get().getNodeId()));
        Executions<String> result = node.commands().get(key);
        String strResult = result.get(master.get());
        return strResult;
    }
  }

However, it forces all commands that fail with loading errors to be routed to the primary node. If we could get the failed node's ID, users could temporarily manage the read candidates on the application side by deselecting it and randomly choosing another node to query directly (not optimal, because it still doubles the latency due to re-querying, but better than facing continuous loading errors).
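
To illustrate, assuming the failing node were exposed (the failedNodeId parameter below is hypothetical; Lettuce does not currently provide it), retrying on a random non-loading member of the same shard could look roughly like this:

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.Executions;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

final class RetryOnShardMemberSketch {

    // Rough sketch: retry a GET on another node of the same shard, excluding the node that
    // answered with LOADING. `failedNodeId` is hypothetical -- Lettuce does not currently
    // expose which node produced the RedisLoadingException.
    static String retryExcludingLoadingNode(RedisAdvancedClusterCommands<String, String> client,
            StatefulRedisClusterConnection<String, String> conn, String key, String failedNodeId) {

        int slot = SlotHash.getSlot(key);

        // The primary of the slot plus its replicas form the shard.
        Optional<RedisClusterNode> master = conn.getPartitions().getPartitions().stream()
                .filter(node -> node.hasSlot(slot)).findFirst();

        List<RedisClusterNode> candidates = conn.getPartitions().getPartitions().stream()
                .filter(node -> node.getNodeId().equals(master.get().getNodeId())
                        || master.get().getNodeId().equals(node.getSlaveOf()))
                .filter(node -> !node.getNodeId().equals(failedNodeId)) // drop the loading node
                .collect(Collectors.toList());

        // Fall back to the primary if nothing else is left.
        RedisClusterNode target = candidates.isEmpty() ? master.get()
                : candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));

        Executions<String> result = client
                .nodes(node -> node.getNodeId().equals(target.getNodeId()))
                .commands().get(key);
        return result.get(target);
    }
}
```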

mp911de commented 1 year ago

What about using the existing connections

Existing connections may be busy/blocked so that could cause issues.

Also, as a user-side mitigation, is there a way of getting the cluster node that responded with the loading error?

I like this idea very much. I just have to make up my mind about the configuration needed to enable falling back on LOADING errors. Other than that, this seems like the best way forward. Thanks for this idea.

srgsanky commented 12 months ago

We considered switching to CLUSTER SHARDS but that command stopped reporting the MYSELF node with Redis 7.2 so that isn't an option really.

Was this intentional in 7.2? Can you provide any pointers?

What is the reason Lettuce uses CLUSTER NODES for topology refresh, as opposed to CLUSTER SLOTS (pre Redis 7.0) or CLUSTER SHARDS (Redis 7.0 and later)?

mp911de commented 12 months ago

Lettuce maintains a Partitions object that represents all nodes along with their slots. We select nodes for read and write operations based on that information, and we coordinate our connection pools based on node information. CLUSTER SLOTS covers only the slot-assignment aspect: nodes without slots are not mentioned there, although these could be used for Pub/Sub.

As for CLUSTER SHARDS, Redis 7.2 has removed the MYSELF node which is a real pity.

srgsanky commented 11 months ago

@mp911de can you please expand on

As for CLUSTER SHARDS, Redis 7.2 has removed the MYSELF node which is a real pity.

I tried issuing CLUSTER SHARDS on a 7.0 cluster and a 7.2 cluster, and I don't understand your comment. I have attached the output from two clusters, each with 5 shards and 2 replicas: cluster-shards-7.0.txt cluster-shards-7.2.txt

mp911de commented 11 months ago

I'm not quite sure what the difference is. However, I see the following outputs (smaller clusters):

7.2

127.0.0.1:7379> cluster nodes
27f88788f03a86296b7d860152f4ae24ee59c8c9 127.0.0.1:7379@17379 myself,master - 0 1696942023000 1 connected 0-11999
c2043458aa5646cee429fdd5e3c18220dddf2ce5 127.0.0.1:7380@17380 master - 1696942023658 1696942023567 0 connected 12000-16383
1c541b6daf98719769e6aacf338a7d81f108a180 127.0.0.1:7381@17381 slave 27f88788f03a86296b7d860152f4ae24ee59c8c9 1696942023658 1696942023572 1 connected
2c07344ffa94ede5ea57a2367f190af6144c1adb 127.0.0.1:7382@17382 slave c2043458aa5646cee429fdd5e3c18220dddf2ce5 1696942023658 1696942023576 0 connected
127.0.0.1:7379> cluster shards
1) 1) "slots"
   2) 1) "12000"
      2) "16383"
   3) "nodes"
   4) 1)  1) "id"
          2) "c2043458aa5646cee429fdd5e3c18220dddf2ce5"
          3) "port"
          4) (integer) 7380
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "master"
         11) "replication-offset"
         12) (integer) 28
         13) "health"
         14) "online"
      2)  1) "id"
          2) "2c07344ffa94ede5ea57a2367f190af6144c1adb"
          3) "port"
          4) (integer) 7382
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "replica"
         11) "replication-offset"
         12) (integer) 28
         13) "health"
         14) "online"
2) 1) "slots"
   2) 1) "0"
      2) "11999"
   3) "nodes"
   4) 1)  1) "id"
          2) "1c541b6daf98719769e6aacf338a7d81f108a180"
          3) "port"
          4) (integer) 7381
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "replica"
         11) "replication-offset"
         12) (integer) 28
         13) "health"
         14) "online"
127.0.0.1:7379> 

7.0

127.0.0.1:7379> cluster nodes
27f88788f03a86296b7d860152f4ae24ee59c8c9 127.0.0.1:7379@17379 myself,master - 0 1696941909000 1 connected 0-11999
1c541b6daf98719769e6aacf338a7d81f108a180 127.0.0.1:7381@17381 slave 27f88788f03a86296b7d860152f4ae24ee59c8c9 0 1696941909532 1 connected
2c07344ffa94ede5ea57a2367f190af6144c1adb 127.0.0.1:7382@17382 slave c2043458aa5646cee429fdd5e3c18220dddf2ce5 0 1696941909533 0 connected
c2043458aa5646cee429fdd5e3c18220dddf2ce5 127.0.0.1:7380@17380 master - 0 1696941909532 0 connected 12000-16383
127.0.0.1:7379> cluster shards
1) 1) "slots"
   2) 1) (integer) 0
      2) (integer) 11999
   3) "nodes"
   4) 1)  1) "id"
          2) "27f88788f03a86296b7d860152f4ae24ee59c8c9"
          3) "port"
          4) (integer) 7379
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "master"
         11) "replication-offset"
         12) (integer) 14
         13) "health"
         14) "online"
      2)  1) "id"
          2) "1c541b6daf98719769e6aacf338a7d81f108a180"
          3) "port"
          4) (integer) 7381
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "replica"
         11) "replication-offset"
         12) (integer) 14
         13) "health"
         14) "online"
2) 1) "slots"
   2) 1) (integer) 12000
      2) (integer) 16383
   3) "nodes"
   4) 1)  1) "id"
          2) "c2043458aa5646cee429fdd5e3c18220dddf2ce5"
          3) "port"
          4) (integer) 7380
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "master"
         11) "replication-offset"
         12) (integer) 14
         13) "health"
         14) "online"
      2)  1) "id"
          2) "2c07344ffa94ede5ea57a2367f190af6144c1adb"
          3) "port"
          4) (integer) 7382
          5) "ip"
          6) "127.0.0.1"
          7) "endpoint"
          8) "127.0.0.1"
          9) "role"
         10) "replica"
         11) "replication-offset"
         12) (integer) 14
         13) "health"
         14) "online"