redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License

ClusterTopologyChangedEvent Timing Issue #2860

Open ojh3636 opened 1 month ago

ojh3636 commented 1 month ago

Bug Report

The ClusterTopologyChangedEvent is published before the cluster's partitions are updated, which can cause unexpected behavior in application logic. Specifically, we use the topology changed event to notify our publisher to open a new connection when the node it was publishing to changes from master to replica. Because of the timing issue, when the publisher then performs its node selection (which filters for master nodes only), it still sees the old partitions, so the node it picks as the new master is actually a replica. The selection is never corrected afterwards, because no further ClusterTopologyChangedEvent is published once the partitions have been updated.

Current Behavior

io.lettuce.core.cluster.RedisClusterClient

    public CompletionStage<Void> refreshPartitionsAsync() {

        List<RedisURI> sources = new ArrayList<>();

        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();
        for (RedisURI redisURI : topologyRefreshSource) {
            sources.add(redisURI);
        }

        EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));

        if (partitions == null) {
            return initializePartitions().thenAccept(Partitions::updateCache)
                    .whenComplete((unused, throwable) -> event.record());
        }

        return loadPartitionsAsync().thenAccept(loadedPartitions -> {

            if (TopologyComparators.isChanged(getPartitions(), loadedPartitions)) {

                logger.debug("Using a new cluster topology");

                List<RedisClusterNode> before = new ArrayList<>(getPartitions());
                List<RedisClusterNode> after = new ArrayList<>(loadedPartitions);

                getResources().eventBus().publish(new ClusterTopologyChangedEvent(before, after));
            }

            this.partitions.reload(loadedPartitions.getPartitions());
            updatePartitionsInConnections();
        }).whenComplete((unused, throwable) -> event.record());
    }

Environment

Possible Solution

Publish the ClusterTopologyChangedEvent only after the partition map has been reloaded, so that listeners querying the client's partitions see the new topology.
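
To illustrate why the ordering matters, here is a minimal, self-contained sketch. It does not use Lettuce at all: `Client`, `subscribe`, `refresh`, `getMasters`, and `selectedMaster` are hypothetical stand-ins for the client, its event bus, and our publisher's node selection. It also assumes listeners run synchronously inside the publish call, which may not match how the real event bus delivers events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class TopologyEventOrdering {

    /** Hypothetical stand-in for a cluster client holding a view of the master nodes. */
    static final class Client {

        private List<String> masters = List.of("node-a"); // current (soon stale) view
        private final List<Consumer<List<String>>> listeners = new ArrayList<>();

        void subscribe(Consumer<List<String>> listener) {
            listeners.add(listener);
        }

        List<String> getMasters() {
            return masters;
        }

        /** Applies a freshly loaded topology; the flag selects the publish order. */
        void refresh(List<String> loadedMasters, boolean publishAfterReload) {
            boolean changed = !masters.equals(loadedMasters);
            if (changed && !publishAfterReload) {
                publish(loadedMasters); // reported behavior: event fires before the update
            }
            masters = loadedMasters;    // the "reload" step
            if (changed && publishAfterReload) {
                publish(loadedMasters); // proposed behavior: event fires after the update
            }
        }

        private void publish(List<String> after) {
            // Assumption: listeners are invoked synchronously, in the caller's thread.
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(after);
            }
        }
    }

    /** Returns the master a listener selects when it re-reads the client's view on the event. */
    static String selectedMaster(boolean publishAfterReload) {
        Client client = new Client();
        String[] selected = new String[1];
        // Like the publisher in this report, the listener ignores the event payload
        // and performs its own node selection against the client's current view.
        client.subscribe(after -> selected[0] = client.getMasters().get(0));
        client.refresh(List.of("node-b"), publishAfterReload);
        return selected[0];
    }

    public static void main(String[] args) {
        System.out.println("publish before reload: " + selectedMaster(false));
        System.out.println("publish after reload:  " + selectedMaster(true));
    }
}
```

With the event published first, the listener selects the old master (`node-a`). With the event published after the reload, it selects the new one (`node-b`). In `refreshPartitionsAsync()` this would roughly correspond to moving the `eventBus().publish(...)` call below `this.partitions.reload(...)`, while still capturing the `before` list ahead of the reload.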

Additional context