atomix / atomix

A Kubernetes toolkit for building distributed applications using cloud native principles
https://atomix.io
Apache License 2.0

Messages and Tasks only send from Leader Node #165

Closed renarj closed 7 years ago

renarj commented 8 years ago

I have written a small test program to explore Atomix. I am still learning, so perhaps this is intended behaviour rather than a bug. I start the program three times on a local machine, each instance bound to a different port. I see the cluster form successfully and a leader get elected. Then I join a group called 'jasdb-group' and see all nodes successfully join it.

After an arbitrary 20 seconds, each node does two things: it submits a task that I expect every node to receive, and it sends a direct message to each member using the 'connection.send()' method. However, I am seeing two unexpected behaviours:

  1. group.tasks().submit() only works when called on the leader node; in that case the follower nodes receive the task. Anything submitted on a follower node is never received anywhere. I expected all nodes to receive every task, even one submitted on a follower node. Is this a correct assumption?
  2. When I iterate over all members in the group and send a direct message using 'member.connection().send()', it disappears into a void and I don't see it arrive anywhere. What am I doing wrong here?

Here is the test program I am using:

public class AtomixTest {
    private static final Logger LOG = LoggerFactory.getLogger(AtomixTest.class);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String host = System.getProperty("host");
        int port = Integer.parseInt(System.getProperty("port"));

        Address address = new Address(host, port);
        List<Address> members = Arrays.asList(
                new Address("127.0.0.1", 5000),
                new Address("127.0.0.1", 5001),
                new Address("127.0.0.1", 5002)
        );
        Storage storage = new Storage(StorageLevel.MEMORY);
        Transport transport = new NettyTransport();

        LOG.info("Defining cluster");
        AtomixReplica replica = AtomixReplica.builder(address, members)
                .withStorage(storage)
                .withTransport(transport)
                .build();
        LOG.info("Cluster defined");

        CompletableFuture<Atomix> future = replica.start();
        Atomix atomix = Futures.getUnchecked(future);
        LOG.info("Cluster started");

        DistributedGroup jasdbGroup = atomix.getGroup("jasdb-group").get();

        jasdbGroup.onJoin(m -> {
            LOG.info("Member joined jasdb group: {} {}", m.id(), m.toString());
        });

        LOG.info("Joining jasdb group");
        jasdbGroup.join("member" + host + port).thenAccept(l -> {
            LOG.info("Joined group: {}", l);
            l.connection().onMessage("memberTopic", gm -> {
                LOG.info("Received a jasdb group message: {}", gm.body());
            });

            l.tasks().onTask(t -> {
                LOG.info("Received task: {}", t.task());
            });
        });

        LOG.info("Joining partition group");

        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.SECONDS);

        jasdbGroup.tasks().submit("Test message from node: " + host + ":" + port);

        jasdbGroup.members().forEach(m -> {
            m.connection().send("memberTopic", "Test message from: " + host + ":" + port);
        });

        LOG.info("Waiting for program end");
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MINUTES);
        LOG.info("Exiting program");
    }
}

kuujo commented 8 years ago

This is great feedback.

So, what you're seeing with the connection API seems to be a consequence of a misconfigured group. The direct messaging API creates direct connections between members of the group, so each node must define a server Address when creating the group. This is necessary since groups can be created on clients as well, so a server is just started in all cases.

This is from the Javadoc for DistributedGroup:

DistributedGroup.Options options = new DistributedGroup.Options()
  .withAddress(new Address("localhost", 6000));
DistributedGroup group = atomix.getGroup("message-group", options).get();

That said, we've decided to make a last-minute change to remove support for direct communication between group members in the next release candidate. While obviously useful, we believe that direct messaging between members of a group is outside the scope of Atomix and should be provided at a higher level, perhaps in a separate project in the future. A pull request will be opened for this change today (it's in the group-packaging branch). This doesn't mean DGroup cannot do messaging; it just means it will only support more expensive messaging through the Raft algorithm (which is what the task queue does now). The new messaging API will combine sync/async/request-reply messaging through Raft.

MessageProducer.Options options = new MessageProducer.Options()
  .withExecution(MessageProducer.Execution.SYNC)
  .withDelivery(MessageProducer.Delivery.RANDOM);
MessageProducer<String> producer = group.messages().producer("foo", options);
producer.send(...)

On the task issue, you're only seeing messages arrive to members on the Raft leader? One reason for this could simply be that serialization is failing. The current release candidate had some serialization issues that have since been fixed in master. The reason the Raft leader's group could receive tasks that the other nodes can't is that the Raft leader's state machine communicates with the local DGroup instance without serialization. Are you using the current release candidate or master? Another RC will likely be pushed tomorrow.
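To make the local-versus-remote difference concrete, here is a minimal sketch in plain Java. It does not use any Atomix classes: `Task`, `Member`, `deliverLocally` and `deliverRemotely` are hypothetical stand-ins, and JDK object serialization is only a substitute for whatever serializer is actually in use. It shows how the same payload can reach a local receiver and silently vanish on the serialized path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class LocalVsRemoteDelivery {
    /** Hypothetical payload that cannot be serialized (it does not implement Serializable). */
    static final class Task {
        final String text;
        Task(String text) { this.text = text; }
    }

    /** Hypothetical stand-in for a group member that records what it receives. */
    static final class Member {
        final String name;
        final List<Object> received = new ArrayList<>();
        Member(String name) { this.name = name; }
    }

    /** Local path: the object is handed over by reference, so nothing is serialized. */
    static void deliverLocally(Member target, Object task) {
        target.received.add(task);
    }

    /** Remote path: the object is serialized first; a failure is deliberately swallowed. */
    static void deliverRemotely(Member target, Object task) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task); // throws NotSerializableException for Task
            target.received.add(task);
        } catch (IOException e) {
            // Swallowed on purpose: no log line, so the sender sees no evidence of the failure.
        }
    }

    public static void main(String[] args) {
        Member local = new Member("leader");
        Member remote = new Member("follower");
        Task task = new Task("Test message");

        deliverLocally(local, task);
        deliverRemotely(remote, task);

        System.out.println("local received: " + local.received.size());   // prints "local received: 1"
        System.out.println("remote received: " + remote.received.size()); // prints "remote received: 0"
    }
}
```

If the real cause is of this kind, the symptom would match the report: delivery works wherever the sender and receiver share a process and fails without any error everywhere else.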

I haven't been able to reproduce this in master, but I'll keep trying in the meantime.

Here are the updated DGroup tests in case we've missed anything: https://github.com/atomix/atomix/blob/group-packaging/groups/src/test/java/io/atomix/group/DistributedGroupTest.java

kuujo commented 8 years ago

The real reason for removing the direct messaging API is that it doesn't fit the atomic nature of Atomix, and that's demonstrated by the need to configure a server address for messaging. It will likely be replaced at some point, but the focus is on existing APIs for the time being.

renarj commented 8 years ago

OK, I understand the direct messaging part; I only went there because I got unexpected behaviour and wanted to test other combinations. I am currently running directly on the master branch.

About the task issue, I still do not 100% get this. I do the following right now on each node:

DistributedGroup jasdbGroup = atomix.getGroup("jasdb-group").get();
jasdbGroup.tasks().submit("Test message from node: " + host + ":" + port);

However, what I am seeing is that all nodes only receive the task that was submitted directly on the leader (port 5000):

2016-04-05 10:09:05,664 INFO AtomixTest - Received task: Test message from node: 127.0.0.1:5000

All other tasks, submitted by the followers, are received by neither the leader nor the followers. Should I also assign an Address to the distributed group for this? I find that a bit unwieldy, as it means that for each group I more or less have to assign a 'leader' myself instead of the cluster figuring out the group leader on its own (which could be the Raft leader, I assume?).

renarj commented 7 years ago

This scenario is currently working on the latest Atomix release.

kuujo commented 7 years ago

There are a couple of possibilities here. First, there is a bug report that has yet to be fixed that relates to serialization in session events. Specifically, when serialization fails, the exception is gobbled up and there's no evidence that it failed. That can cause a message not to be delivered and can, I think, ultimately cause the client to hang if events are created but never delivered (since events use a reliable/sequentially consistent delivery algorithm). In that case, the reason they'd work on some nodes and not others is quite simply that if a local replica is sending an event then no serialization is needed.
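On the application side, one thing worth ruling out is a failure that is reported through a returned future but never inspected. The sketch below uses only the JDK: `submit` is a hypothetical stand-in, and it is an assumption that the real submit call returns a CompletableFuture the way `replica.start()` and `join()` do in the test program. It would not surface an exception that is swallowed inside the library itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class SurfaceAsyncFailures {
    /** Hypothetical stand-in for an asynchronous submit that may fail during serialization. */
    static CompletableFuture<Void> submit(Object task, boolean serializationFails) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (serializationFails) {
            future.completeExceptionally(new IllegalStateException("serialization failed"));
        } else {
            future.complete(null);
        }
        return future;
    }

    /** Attaches a completion callback and reports the outcome of an already completed future. */
    static String describe(CompletableFuture<Void> future) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        future.whenComplete((result, error) ->
                outcome.set(error == null ? "ok" : "failed: " + error.getMessage()));
        return outcome.get();
    }

    public static void main(String[] args) {
        // Fire and forget: the future is dropped, so the failure is never observed.
        submit("Test message", true);

        // Inspecting the future makes the same failure visible.
        System.out.println(describe(submit("Test message", true)));  // prints "failed: serialization failed"
        System.out.println(describe(submit("Test message", false))); // prints "ok"
    }
}
```

In the test program, the equivalent change would be to attach a callback to whatever the submit and send calls return and log the error there, rather than discarding the result.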

The other possibility is just that this is indeed a bug. The DistributedGroup resource is easily the most complex resource, and thus the messaging APIs are currently marked @Experimental until an FSM test framework can be developed. I don't feel comfortable with endorsing them until that's done. But if it is indeed a bug, it should be easy to reproduce with the code you've provided, which I'll try to do today.
