valkey-io / valkey

A flexible distributed key-value datastore that supports both caching and beyond caching workloads.
https://valkey.io

[NEW] Pub/Sub in Cluster Mode #546

Open hpatro opened 2 months ago

hpatro commented 2 months ago

The problem/use-case that the feature addresses

Pub/Sub in cluster mode uses the cluster bus to propagate messages across the cluster. Regular publish (https://valkey.io/commands/publish/) broadcasts the message to all nodes in the cluster, while sharded publish (https://valkey.io/commands/spublish/) broadcasts the message only within the shard the channel belongs to. Sharded pubsub greatly reduces the volume of message transfer; however, some challenges remain. The cluster bus is a convenient mechanism for propagating information, but using it for data propagation has two issues:

  1. Cluster health - If pubsub traffic is very high, the cluster bus gets overwhelmed; the cluster link buffer can grow until the link is disconnected. This leads to unnecessary failover(s) and an unhealthy cluster state.
  2. High metadata overhead - The standard cluster message packet carries a lot of cluster health/topology information that is irrelevant to the pubsub data being transferred.

The solution(s) which have been brought up so far:

  1. Improve SPUBLISH by using replication link https://github.com/valkey-io/valkey/pull/307
  2. Improve PUBLISH by allowing multiple messages to be broadcast in a single message https://github.com/valkey-io/valkey/issues/525
  3. Improve both PUBLISH/SPUBLISH by reducing the payload size of cluster message for pubsub data https://github.com/valkey-io/valkey/issues/525#issuecomment-2125354944

Option 3, with a provision to pack multiple messages, should be a generic solution applicable to both classic and sharded pubsub. I'm leaning towards it.

However, issue 1 described above can still occur due to the mixed usage of the cluster bus. Hence, I would like to propose an additional cluster bus mesh used only for data transfer purposes (maybe we can call it the "databus"). This would keep cluster gossip communication unaffected by pubsub commands. Over this databus, we could implement solution 3, a lightweight metadata format for transferring data/commands. We would also need properties similar to cluster link management on top of the databus for operational usage.

hpatro commented 2 months ago

@valkey-io/core-team Thoughts?

zuiderkwast commented 2 months ago

Hi Harkrishn. For the "databus", are you thinking of a separate port where all cluster nodes form another full mesh of connections? This port would also need to be distributed over the cluster bus using a ping extension then?

Since we're talking about changing the cluster bus to "v2", a Raft cluster of some sort, I think we shouldn't make too large changes to the existing cluster design.

I think a new lightweight cluster bus message is a small change that doesn't need any additional config. Just one bit in the clusterMsg to indicate the support for this.

Replication likewise: I think it's straightforward. We'll add a version field to the REPLCONF command, which can be used to detect whether pubsub can be sent this way or not. #414

To protect the cluster bus from starvation, it's not enough to handle pubsub in a separate connection. Even its own thread will not completely solve that problem. It still shares resources with other traffic, not least network bandwidth. (For this, we could consider https://en.wikipedia.org/wiki/Differentiated_services, or the ToS field in IPv4, to mark the IP packets as high prio.)

madolson commented 1 month ago

I think a new lightweight cluster bus message is a small change that doesn't need any additional config. Just one bit in the clusterMsg to indicate the support for this.

+1, this seems like the way. It'll be a bit annoying because now we'll have basically two cluster bus messages (the compact and full version). We'll also need to update a bunch of logic, since we can currently use pubsub messages as proxies for ping/pong messages; that's no longer true for the compact version, which is missing most of the payload. It seems like the right optimization to make though.

zuiderkwast commented 1 month ago

We'll also need to update a bunch of logic since we can use pubsub messages as proxies for ping/pong messages, which is no longer true since it's missing most of the payload.

@madolson Pubsub messages don't touch ping_sent or pong_received timestamps. They do touch the data_received timestamp though:

    /* Update the last time we saw any data from this node. We
     * use this in order to avoid detecting a timeout from a node that
     * is just sending a lot of data in the cluster bus, for instance
     * because of Pub/Sub. */
    if (sender) sender->data_received = now;

PingXie commented 1 month ago

+1 on the lighter version of clusterMsg.

+1 on not investing too much into a new "cluster infra", aka "data bus", just yet. We had a few offline chats but we haven't looked deep into moving the existing cluster bus off of the main thread, which I think would bring more bang for the buck.

The use of pubsub as proxy for ping/pong is an anti-pattern IMO. I am for removing this coupling.

madolson commented 1 month ago

@zuiderkwast Correct, we use it as part of the health check via data_delay https://github.com/valkey-io/valkey/blob/unstable/src/cluster_legacy.c#L4730. As long as we update that field in the new flow, we shouldn't be introducing any new failure modes.

madolson commented 1 month ago

The use of pubsub as proxy for ping/pong is an anti-pattern IMO. I am for removing this coupling.

Just to clarify: we aren't using it for ping/pong messages; we were using it as a proxy for a ping/pong message when we had one. If you have a high amount of pubsub traffic, it is what prevents unexpected failovers.

hpatro commented 1 month ago

Hi Harkrishn. For the "databus", you're thinking a separate port where all cluster nodes form another full mesh of connections? This port would also need to be distributed in the cluster bus using a ping extensions then?

Since we're talking about changing the cluster bus to "v2", a Raft cluster of some sort, I think we shouldn't make too large changes to the existing cluster design.

@zuiderkwast Thanks for the feedback.

I was thinking of a separate connection over the same port with a limited buffer size, where the connection can be dropped if the threshold is exceeded. I agree it would be a much larger change. A smaller payload size should go a long way towards helping with the issue; however, we shouldn't treat Cluster V1 as being in maintenance mode. Cluster V2 will take at least 2-3 major versions to be well adopted, and until then we should keep improving things for the benefit of users.

To protect the cluster bus from starvation, it's not enough to handle pubsub in a separate connection. Even its own thread will not completely solve that problem. It still shares resources with other traffic, not least network bandwidth. (For this, we could consider https://en.wikipedia.org/wiki/Differentiated_services, or the ToS field in IPv4, to mark the IP packets as high prio.)

This sounds interesting; I will learn more about it.