nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0
2.51k stars 283 forks source link

Super cluster with gateways sending multiple copies #1083

Closed wilstoff closed 4 years ago

wilstoff commented 4 years ago

Currently we have a super cluster setup with 3 data centers each with 3 nodes each. Each data center cluster is configured as a cluster within the data center and has gateway configurations for the other data center nodes. When we have this configuration every stan publish is heard 3 times by subscribers. If we only have a single cluster with 3 nodes we get the expected behavior of 1 message per publish. These aren't replayed messages, but it seems like multiple clusters are trying to send the message to the subscriber at the same time. First off is this a valid configuration or are we using it incorrectly? What is the recommended configuration for a 3 data center 3 node cluster per data center? Do we have to do a full mesh cluster? Note we embed nats and it does not duplicate the message when we publish. e.g.

listen: hostA:4222
http_port: 8222
debug: True
trace: False
max_pending: 268435456
max_payload: 10000000
write_deadline: 5s
cluster: {
  listen: hostA:5222
  routes: [nats-route://hostA:5222, nats-route://hostB:5222, nats-route://hostC:5222]
}
streaming: {
  cluster_id: belvedere_streaming
  store: file
  dir: /var/cache/nats1
  store_limits: {
    ...
    }
  }
  cluster: {
      node_id: A
      peers: [B, C]
      log_path: /var/cache/nats_stream1
  }
}
gateway: {
  name: DATACENTER1
  listen: hostA:7222
  gateways: [{name: "DATACENTER2", urls: ["nats:/hostD:7222", "nats://hostE:7222", "nats://hostF:7222"]}, {name: "DATACENTER3", urls: ["nats://hostG:7222", "nats://hostH:7222", "nats://hostI:7222"]}]
  reject_unknown: True

}

may be related to https://github.com/nats-io/nats-streaming-server/issues/1054

kozlovic commented 4 years ago

You can't have same cluster ID not clustered together. Yes, this is same than https://github.com/nats-io/nats-streaming-server/issues/1054.

You either have a single cluster that spans all DCs, but I would not recommend that at all, or you have to have replication across each cluster.

wilstoff commented 4 years ago

So the solution to avoid duplication, but have the same cluster id on all 9 nodes (since messages/topics can go cross site) would be to use: https://github.com/nats-io/nats-replicator?

kozlovic commented 4 years ago

No, you can't have the same cluster ID. Since they are part of the same NATS network (super-cluster), you would not prevent messages to be handled by each Streaming cluster in each DC. They have to be a different cluster ID. To replicate data, then yes, some users use nats-replicator.

wilstoff commented 4 years ago

I'm sorry i'm not really grasping the solution for us right now.

Our requirements are: 1) any topic can be published or subscribed to from any data center 2) clients within a data center will connect to only the servers within that data center (we config this on startup) 3) each message isn't duplicated when received from subscribers (except in the event of replay after timeout)

What is the best path forward? We've tried changing the streaming -> cluster_id config to one per data center (to match how the actual clusters are setup) and that doesn't correctly forward stan messages between the data centers.

kozlovic commented 4 years ago

That may not be possible to do with NATS Streaming. Are the clusters supposed to be mirrors of each other? Or are they independent ones? You can have a replicator to replicate data from one cluster to the 2 others (and have similar setup in each DC) so the data gets replicated in every DC. You still need to use different cluster-id because otherwise you will have problems described previously. The clients from one cluster will therefore produce/consume to their only cluster - since it is based on the cluster id. The replicators are the ones that will have connections to 1 cluster and another connection to other cluster to push messages there.

wilstoff commented 4 years ago

To me this is supposed to work effectively like one giant cluster of 9 nodes, except where the clients of data center A, can only communicate with data center A (federation).

It is very much what is like described with the slides on https://www.cncf.io/wp-content/uploads/2019/09/NATS-CNCF-Webinar-Sep-2019.pdf about cluster of clusters. We may have assumed that would work for STAN as well.

We can setup one giant cluster of 9 nodes, but are concerned about about how each client may automatically find other nodes. We setup the client to pass in only the 3 nodes in its data center, but with failover of lets say one node in its data center, will it try to connect to a node not in its data center, or not in the initial passed in list? Mostly talking from C#/C/Python clients.

Given this example with a 9 node cluster (no gateways): DataCenter DC-A (node1-3) DataCenter DC-B (node4-6) DataCenter DC-C (node7-9) Publisher P1 (connected to node 1) Consumer C1 (connected to node 4) Consumer C2 (connected to node 5) Consumer C3 (connected to node 4)

Would messages from P1 go to node1 -> C1 and node1 -> C2 and node1 -> C3 (duplicated) Or would messages go from P1 to node1 -> node4 -> C1/C3/node5 -> C2 ?

If we want to have de-duplication (across data centers) and fan out within data center what is the best configuration for STAN? To me at least from our testing NATS with gateways is doing this as we expect, but STAN is not.

kozlovic commented 4 years ago

We can setup one giant cluster of 9 nodes, but are concerned about about how each client may automatically find other nodes.

To me that is not the issue. The issue is having a cluster of 9 nodes across regions which won't work well with NATS Streaming clustering since it is RAFT based and RAFT protocol is chatty and sensitive to latency.

We setup the client to pass in only the 3 nodes in its data center, but with failover of lets say one node in its data center, will it try to connect to a node not in its data center, or not in the initial passed in list?

Even with core NATS, if you give a client the URL of 1 server in a cluster, it will get the list of the other members of THAT cluster (assuming you did not disable address advertisement) but will NOT learn about other clusters. So even in that case, a NATS client cannot fail over to a different cluster. If you do provide the URLs of all clusters, then there is a possibility that it connects to any server in the list (so any cluster). This is not what you want. There is currently no built-in mechanism to try all URLs of a given cluster and then fallback to a different cluster, etc..

Given this example with a 9 node cluster (no gateways): DataCenter DC-A (node1-3) DataCenter DC-B (node4-6) ... Would messages from P1 go to node1 -> C1 and node1 -> C2 and node1 -> C3 (duplicated) Or would messages go from P1 to node1 -> node4 -> C1/C3/node5 -> C2 ?

Again, I don't think it is a good idea to create a NATS Streaming cluster of 9 nodes across different regions. But if it was a single cluster, then produced message from any node will make it to the leader (single leader for the whole cluster), then would be RAFT replicated to all nodes in the cluster (leader+8) then ack'ed back to the publisher. For consumers, again, regardless where they are connected they would get the message sent from the leader. There would not be duplication of messages in this setup since it is a single cluster.

wilstoff commented 4 years ago

So we reset our configs and restarted the nodes, and now everything is working in our dev environment. Currently we have everything as the same cluster_id (all 9) and we have clusters and gateways like below. From your understanding if we do this, would you expect to get 3X messages on the subscribers always or sometimes (some race condition or something)?

We looked into the replicator route, but the configs weren't clear if they would allow wild cards to map topics. It seemed very hard coded which we are trying to avoid. If we were to go this route if a message can be published on topic T from data center A and listened to by B C, and another message by published from C and be replicated to B and A? How is the state of the topic managed then? Would each DC have the same order of messages or would they all have different states?

What is the difficulty with Nats streaming that prevents the kind of setup like this where for Nats it works like described in the presentation? Lastly you said it may not be possible to do what we want, what do you think is the best path forward?

listen: hostA:4222
http_port: 8222
debug: True
trace: False
max_pending: 268435456
max_payload: 10000000
write_deadline: 5s
cluster: {
  listen: hostA:5222
  routes: [nats-route://hostA:5222, nats-route://hostB:5222, nats-route://hostC:5222]
}
streaming: {
  cluster_id: belvedere_streaming
  store: file
  dir: /var/cache/nats1
  store_limits: {
    ...
    }
  }
  cluster: {
      node_id: 001
      peers: [002, 003]
      log_path: /var/cache/nats_stream1
  }
}
gateway: {
  name: CHI
  listen: 10.10.161.146:7222
  gateways: [{name: "DC1", urls: ["nats://hostA:7222", "nats://hostB:7222", "nats://hostC:7222"]}, {name: "DC2", urls: ["nats://hostD:7223", "nats://hostE:7223", "nats://hostF:7223"]}, {name: "DC3", urls: ["nats://hostG:7224", "nats://hostH:7224", "nats://hostI:7224"]}]
  reject_unknown: True

}
kozlovic commented 4 years ago

I am sorry, but you are trying to make something work although I explained that it won't. If you have the same cluster_id across different NATS clusters, as long as those clusters see each other (connected with gateways), you will end-up with published messages being stored in may places, nodes being confused because they will have the same "peer" (based on the cluster and peer node name) multiple times, etc.. This is going to fail in a bad way.

Moreover, I see that you have in the above config "CHI" as the name of this cluster, but then 3 connections to other clusters named DC1, DC2 and DC3. I assume that this is just for illustration and one of this gateway is actually "CHI", otherwise we are now talking about 4 clusters...

If we were to go this route if a message can be published on topic T from data center A and listened to by B C, and another message by published from C and be replicated to B and A? How is the state of the topic managed then? Would each DC have the same order of messages or would they all have different states?

Different. Again, in that route each streaming cluster would have different cluster id, so see that as different systems.

What is the difficulty with Nats streaming that prevents the kind of setup like this where for Nats it works like described in the presentation?

NATS Streaming is RAFT based and forms a cluster of node that gets data replicated in this cluster. Core NATS does not have persistence, there is no RAFT, no consensus.

I just experimented with a small case: 2 nats servers connected through gateways and 3 nats streaming servers configured as a cluster of 3 on each side, but sharing the same cluster id. Enable cluster_raft_logging and you will see the mess that this is causing. In my case I was getting the message duplicated once (there is only 2 gateways).

Lastly you said it may not be possible to do what we want, what do you think is the best path forward?

Not sure. I can't think of a way to make this work, but maybe @wallyqs and/or @derekcollison have had done similar setups and can make it work.

wilstoff commented 4 years ago

Thanks for your help definitely appreciate it. Yes CHI is meant to be DC1 just forgot to update that part of the file.

Looks like we'll go the separate cluster_id route and then replicators between the stan clusters. After briefly looking at the code i could imagine some dynamic improvements for the replicator (assuming it has access to a nats streaming store). If it were to monitor the store for additions, it could dynamically spin up a STANtoSTAN replicator when it gets a new topic. Ideally in a 3 node cluster we'd have one of these replicators on each node, within a common queue group and durable subscriptions. We have a sort of Hub and spoke model, where DC2 connects to DC3 and DC1 directly so we'd need only 3 in DC1/DC3 and 6 in DC2. We'd have to be careful not to duplicate message back though. Does this seem reasonable to you? (i may be oversimplifying as i could see some difficulty in not having messaging routing backwards)

If that setup seems reasonable and works, it would be great to be able to hook that into stan streaming code itself and take out the extra hops/client communication. It would lose absolute ordering (which i don't think is possible without raft or consensus cross data center), but i think that's acceptable.

kozlovic commented 4 years ago

@wilstoff

If it were to monitor the store for additions

Well right now there is no notification from NATS Streaming that channels are added or removed (if using max inactivity). You can use monitoring endpoints to get the list of channels and figure out that, but may not be ideal.

Ideally in a 3 node cluster we'd have one of these replicators on each node, within a common queue group and durable subscriptions. We have a sort of Hub and spoke model, where DC2 connects to DC3 and DC1 directly so we'd need only 3 in DC1/DC3 and 6 in DC2. We'd have to be careful not to duplicate message back though

That's the problem. When a message is say replicated from DC2 to DC1, but in DC1 you have a replicator that replicates to DC2 and DC3, then you would have to be careful to only replicate messages that originated from DC1. That means that you would need some kind of information in each message to include the origin of the cluster - as to suppress replication.

To be clear, often users that use nats replicator is in the context of replicating data from the active cluster to a disaster recovery one, not replicating back and forth from "active/live" clusters.

Does this seem reasonable to you?

See above. If really those are active clusters and they are meant to be global, then it really looks like this should be a single big cluster, but with the drawbacks that were previously mentioned: RAFT is chatty and sensitive to latency, which is therefore not ideal in WAN networks. If going this way, I would then reduce the cluster size to 3, one on each DC, but the other point is that Streaming cluster works in a way that only the leader handles client messages, which means that producers/consumers that are not in the DC where the leader is running would suffer from having messages sent/received from another DC.

Finally, not sure if you are aware, but we are working on a Streaming 2.0 if you will, that is called JetStream. It is in tech preview stage (single server) and the team is working on clustering at the moment. Depending on how far you are in your design, and what is your deadline, you could have a look at it.

wilstoff commented 4 years ago

I haven't looked at jetstream in detail, but plan on doing so once it has clustering support.

So we came to the same conclusion on having to add information about the origin cluster to the topic name, after looking at this article that describes the same issue but within the kafka world: https://www.altoros.com/blog/multi-cluster-deployment-options-for-apache-kafka-pros-and-cons/ With the Active-active configuration all we need to do is prepend the data center's cluster id to any topic published (which is easy because we wrapped the STAN and NATS api with our own). And then also change the subscription of any topic to subscribe to multiple topics (1 for for each data center prepended with cluster id).

We plan on then figuring out how to dynamically spin up the replicators (easy can watch the cache folder or monitoring endpoints), as well as spin down (harder because max_inactivity won't delete the topic if a replicator still has a subscription to it) when there are no messages.

Ideally if we could hide all of this within nats streaming it would be great. Basically have some kind of SuperCluster mode. This would hide all the topic differences and present to each subscriber/publisher the topic without clusterid prepended. It would also allow configuration to point the direction you want the replication to go. Currently we have a Hub and Spoke model which one data center is the central and the other point inward. But others may want a more direct connection depending on location of those data center.

Hub and spoke config

{
    ClusterId: "dc1"
    SuperClusterMode: on
    Replication: ["dc1->dc2"]
}
{
    ClusterId: "dc2"
    SuperClusterMode: on
    Replication: ["dc2->dc1", "dc2->dc3", "dc1->dc3", "dc3->dc1"]
}
{
    ClusterId: "dc3"
    SuperClusterMode: on
    Replication: ["dc3->dc2"]
}
kozlovic commented 4 years ago

Hub-and-spoke is not really meant for super-clusters. The design of gateways is for it to form a "full mesh". So if DC1 knows about DC2 and DC2 knows about DC3, then DC1 would try to connect to DC3. I understand that you use "reject unknown", but again, that was not the intent of this boolean: it was meant to prevent unconfigured GW to connect, not to "shape" a desired topology. For Hub-and-spoke, you may want to look at leafnodes instead.

If you had full mesh gateways, I would have each DC responsible to replicate to the 2 other clusters instead of having DC2 being responsible to replicate DC1 messages down to DC3 and vice-versa.

And there is no plan to add the replicator as part of the streaming server. Not to say that we won't be adding any new feature (outside of bug fixes) to NATS Streaming server, but most of our efforts will go to JetStream.

Anyway, I am closing this issue now since it is understood why the multiple messages came from.

wilstoff commented 4 years ago

Ah yes sorry i forgot to mention we'd remove the gateway configuration, and separate the nats server processes from the nats streaming processes. That way we can configure nats with gateways into a full mesh and perhaps nats streaming into a different model with no connection between clusters outside of these replicators. But that is a good point on how we are using gateways nats and we should revisit that configuration, although it is working for us now.