dotnet / dotNext

Next generation API for .NET
https://dotnet.github.io/dotNext/
MIT License

Dynamic cluster membership is not working as expected #135

Closed aley1 closed 1 year ago

aley1 commented 1 year ago

I am trying to get DotNext TCP Raft to work with a dynamic membership setup. When the member nodes are known in advance and added to the configuration at startup (e.g. 5 members, each started with a config file that provides the list of members and endpoints), everything works well.

However I am attempting to do this:

  1. Start 1st node in coldstart mode (no knowledge of other members) - this will become a Leader node
  2. Start a 2nd node, which sends a request to the 1st node to join the cluster. The 1st node executes AddMemberAsync - this works, the node joins the cluster. Membership count = 2
  3. Start 3rd node, 4th, 5th nodes using step 2. membership count = 5
  4. Nodes are upgraded to normal nodes via RevertToNormalModeAsync();
  5. Leader is 1st Node
  6. Kill 1st Node - expecting other nodes to take over as Leader

What I see is that the Leader is still shown as the 1st Node (after the 1st node is already offline); no election takes place. I've also tried step 1 without starting as a coldstart node, but then adding members does not work.

Please advise if I am missing something or if this is a bug. Thanks

sakno commented 1 year ago

The bug is confirmed, can be reproduced with the following test: https://github.com/dotnet/dotNext/blob/390e5d1b2d912f3d9260a6b32a9c7094150482fe/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs#L570-L647

The fix is already in the develop branch.

aley1 commented 1 year ago

I fetched your fix in develop branch and tried it out, but the problem is still there.

  1. Start 1st node as coldstart node. Output:
     New cluster leader is elected. Leader address is 127.0.0.1:3260
     Term of local cluster member is 1. Election timeout 00:00:00.2940000

  2. Start 2, 3, 4 nodes. 1st node adds them as members

  3. Member count = 4, leader = 1st node

  4. Kill 1st node

The other member nodes still show membership count = 4 and leader = null, and the following output:

info: DotNext.Net.Cluster.Consensus.Raft.Tcp.TcpServer[74029]
      Connection was reset by 127.0.0.1:49425
Consensus cannot be reached
Term of local cluster member is 1. Election timeout 00:00:00.1920000

sakno commented 1 year ago

Did you start 3 nodes with standby property set to true? The test described above reproduces exactly the same scenario as you mentioned.

sakno commented 1 year ago

Same test repeated for TCP transport here https://github.com/dotnet/dotNext/blob/45aec28ea5bc6f9e3cd12de71a3b8216b55f52b8/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Tcp/TcpTransportTests.cs#L241-L250

and here https://github.com/dotnet/dotNext/blob/45aec28ea5bc6f9e3cd12de71a3b8216b55f52b8/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TransportServices/TransportTestSuite.cs#L458-L504

It is successful as well. Try to modify a test, or write one based on your code, so that it reproduces the situation.

aley1 commented 1 year ago

I have created a test project to reproduce the issue. It has 2 files: Program.cs and Internal.cs

Program.cs

using System.Net;
using DotNext;
using DotNext.Net;
using DotNext.Net.Cluster;
using DotNext.Net.Cluster.Consensus.Raft;
using DotNext.Net.Cluster.Consensus.Raft.Membership;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using Microsoft.Extensions.Options;

bool isColdStartNode;
int[] otherNodePorts = new int[] { 3262, 3263, 3264};
var cluster = SetupRaftCluster(args, out isColdStartNode);
cluster.LeaderChanged += RaftTest.ClusterConfigurator.LeaderChanged;
await cluster.StartAsync(CancellationToken.None);

if (isColdStartNode)
{
    Console.WriteLine("0) Type the port number and press Enter to add a new cluster member");
}
else
{
    await cluster.RevertToNormalModeAsync();
}
Console.WriteLine("1) Type 'i' and press Enter to display current cluster state");
Console.WriteLine("2) Type 'x' and press Enter to exit");
string input = "";
while (input != "x")
{
    Console.WriteLine("Enter an command: ");
    input = Console.ReadLine();
    if (input == "i")
    {
        if (cluster.Leader != null)
        {
            Console.WriteLine("Current leader: " + cluster.Leader.EndPoint.ToString());
        }
        else
        {
            Console.WriteLine("Current leader: null");
        }
        Console.WriteLine("List of members: ");
        foreach (var item in cluster.Members)
        {
            Console.WriteLine(item.EndPoint.ToString());
        }
    }
    else
    {
        if (input != "x")
        {
            bool isPortNumber = false;
            int port = 0;
            try
            {
                port = Convert.ToInt32(input);
                isPortNumber = true;
            }
            catch
            {
                isPortNumber = false;
            }
            if (isPortNumber)
            {
                AddMember(cluster, port);
            }
        }
    }    
}

await cluster.StopAsync(CancellationToken.None);

static RaftCluster SetupRaftCluster(string [] args, out bool coldStart)
{
    int port = Convert.ToInt32(args[0]);
    coldStart = Convert.ToBoolean(args[1]);

    Console.WriteLine();
    Console.WriteLine("Endpoint: " + IPAddress.Loopback + ":" + port);
    Console.WriteLine("Is coldstart: " + coldStart.ToString());
    Console.WriteLine();

    var configuration = new RaftCluster.TcpConfiguration(new IPEndPoint(IPAddress.Loopback, port))
    {
        RequestTimeout = TimeSpan.FromMilliseconds(140),
        LowerElectionTimeout = 150,
        UpperElectionTimeout = 300,
        TransmissionBlockSize = 4096,
        ColdStart = coldStart,       
    };

    configuration.UseInMemoryConfigurationStorage();

    var loggerFactory = new LoggerFactory();
    var loggerOptions = new ConsoleLoggerOptions
    {
        LogToStandardErrorThreshold = LogLevel.Information
    };
    loggerFactory.AddProvider(new ConsoleLoggerProvider(new RaftTest.FakeOptionsMonitor<ConsoleLoggerOptions>(loggerOptions)));
    configuration.LoggerFactory = loggerFactory;

    var cluster = new RaftCluster(configuration);

    return cluster;
}

static void AddMember(RaftCluster cluster, int port)
{
    var ipe = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
    Console.WriteLine("Adding member > " + ipe.ToString());
    cluster.AddMemberAsync(ClusterMemberId.FromEndPoint(ipe), ipe);
}

Internal.cs

using DotNext.Threading;
using DotNext;
using DotNext.Net.Cluster;
using DotNext.Net.Cluster.Consensus.Raft;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics;

namespace RaftTest
{
    public sealed class FakeOptionsMonitor<T> : IOptionsMonitor<T>
    {
        private sealed class ChangeToken : Disposable
        {
        }

        public FakeOptionsMonitor(T value) => CurrentValue = value;

        public T CurrentValue { get; }

        T IOptionsMonitor<T>.Get(string name) => CurrentValue;

        IDisposable IOptionsMonitor<T>.OnChange(Action<T, string> listener) => new ChangeToken();
    }
    public sealed class ClusterConfigurator : IClusterMemberLifetime
    {
        public static void LeaderChanged(ICluster cluster, IClusterMember? leader)
        {

            Debug.Assert(cluster is IRaftCluster);
            var term = ((IRaftCluster)cluster).Term;
            var timeout = ((IRaftCluster)cluster).ElectionTimeout;

            Console.WriteLine(leader is null
                ? "Consensus cannot be reached"
                : $"New cluster leader is elected. Leader address is {leader.EndPoint}");
            Console.WriteLine($"Term of local cluster member is {term}. Election timeout {timeout}");
        }

        public void OnStart(IRaftCluster cluster, IDictionary<string, string> metadata)
        {
            cluster.LeaderChanged += LeaderChanged;
        }

        public void OnStop(IRaftCluster cluster)
        {
            cluster.LeaderChanged -= LeaderChanged;
        }
    }
}

  1. Started 4 nodes:
     RaftTest.exe 3261 true
     RaftTest.exe 3262 false
     RaftTest.exe 3263 false
     RaftTest.exe 3264 false

  2. On console for node 3261, added the other 3 nodes as members by entering their port numbers in cmd line

  3. Enter 'i' to check cluster leader and member info on all nodes

  4. Enter 'x' on node 3261 to stop the node

  5. 'Consensus cannot be reached' error appears on the other 3 nodes. Was expecting a node to be promoted to leader.

sakno commented 1 year ago

The result of AddMemberAsync is not awaited, although it is an asynchronous method.

The RevertToNormalModeAsync call is not correct when coldStart=false. The node can't work normally if it doesn't have its own address in the configuration. By default, a fresh node starts in standby mode. Reverting it to follower state without correct synchronization of the cluster configuration may lead to undefined behavior. I've added this check to the RevertToNormalModeAsync implementation: it now returns false if the local configuration does not contain the local node.

sakno commented 1 year ago

And one more thing, that is probably a source of the issue:

cluster.AddMemberAsync(ClusterMemberId.FromEndPoint(ipe), ipe);

This is incorrect. You're adding a member with an ID that doesn't exist in the cluster. You need to use the RaftCluster.LocalMemberId of the node that you want to add. The ID is generated randomly when RaftCluster is instantiated. This process is called announcing: it allows a fresh node to advertise its ID and address to the cluster manager. See the RaftCluster.NodeConfiguration.Announcer property, which can be used to set up an announcement callback. For instance, the fresh node can print its ID and address, and you can then enter a command on the leader side to add the announced member.
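To make both points concrete, here is a minimal sketch of how the AddMember helper from the repro could be changed. It is not taken from the library's samples. MembershipHelper and AddAnnouncedMemberAsync are hypothetical names, and the sketch assumes the joining node's LocalMemberId has already reached the leader somehow (printed and typed in, sent over an RPC, etc.). The only DotNext call used is the AddMemberAsync(id, address) call already shown above.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;
using DotNext.Net.Cluster;
using DotNext.Net.Cluster.Consensus.Raft;

// Hypothetical helper for the repro project; not part of DotNext.
static class MembershipHelper
{
    // announcedId must be the LocalMemberId reported by the joining node.
    // It must NOT be derived from the endpoint with ClusterMemberId.FromEndPoint.
    public static async Task AddAnnouncedMemberAsync(
        RaftCluster cluster, ClusterMemberId announcedId, int port)
    {
        var address = new IPEndPoint(IPAddress.Loopback, port);
        Console.WriteLine($"Adding member {announcedId} at {address}");

        // Await the call so the membership change has finished (or thrown)
        // before the console loop accepts the next command.
        await cluster.AddMemberAsync(announcedId, address);
    }
}
```

The console loop in Program.cs would then need to read both the announced ID and the port, and call the helper with await.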

sakno commented 1 year ago

This is the correct bootstrap code for the node:

if (isColdStartNode)
{
    Console.WriteLine("0) Type the port number and press Enter to add a new cluster member");
}
else
{
    // The $ prefix is required for the interpolation placeholders to be evaluated
    Console.WriteLine($"Node with id {cluster.LocalMemberId} and address {cluster.LocalMemberAddress} is announced");
    await cluster.Readiness;
    Console.WriteLine("Node has joined the cluster successfully");
}

Also, you need to add an ability to call AddMemberAsync inside of the loop using command-line args. Or, you can use Inter-Process Communication to promote a new member to the leader (works only when running all nodes on the same machine).

aley1 commented 1 year ago

The incorrect LocalMemberId passed during AddMemberAsync is indeed the cause of the issue. In the actual project, we have an RPC that can be used to announce the LocalMemberId and MemberAddress to the coldstart node, and based on testing it works now. Thank you for your help.