MetPX / sarracenia

https://MetPX.github.io/sarracenia

winnow + vip enhancement #913

Open reidsunderland opened 9 months ago

reidsunderland commented 9 months ago

When running a winnow on a cluster, you have to use a VIP to ensure that duplicates get suppressed.

If you don't use a VIP, each node will have a different nodupe cache, and duplicates could get through if the first fileA message goes to node1 and the second (duplicate) fileA message goes to node2.

Currently on v2 and sr3, the node with the VIP subscribes and posts messages and the other nodes do nothing. If the VIP switches to a different node, the nodupe cache on that node will be empty and there's a chance duplicate messages could be posted.
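
For context, a minimal sketch of what such a winnow configuration might look like, deployed identically on every node of the cluster (broker host, exchange names and the vip address are placeholders; option names follow the usual sr3 conventions):

```
# winnow config sketch: same file on every cluster node
broker amqps://feeder@broker.example.com/
exchange xs_input
post_broker amqps://feeder@broker.example.com/
post_exchange xwinnow_out
vip 192.168.1.100
nodupe_ttl 3600
```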

An improvement would be to make this work more like poll does now (a rough sketch follows below).

All nodes would need to use a unique queue subscribed to the source exchange.

Node with the VIP:

Nodes without the VIP:
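
A rough sketch of how the work could be split between nodes in such a poll-style scheme (this is an interpretation of the proposal; the helper names are made up, not sarracenia calls):

```python
# Sketch only: poll-style winnow behaviour on each cluster node.
# has_vip() and post() are hypothetical placeholders.
nodupe_cache = set()

def has_vip():
    return True                # placeholder: real code checks whether this node holds the vip

def post(msg_id):
    print("posting", msg_id)   # placeholder: real code re-publishes to the post exchange

def process(msg_id):
    new = msg_id not in nodupe_cache
    nodupe_cache.add(msg_id)   # every node keeps its own cache current
    if new and has_vip():
        post(msg_id)           # only the vip holder posts winnowed messages
    # nodes without the vip stay in sync but post nothing
```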

petersilva commented 9 months ago

This is the same as v2 behaviour... not a regression, but an opportunity for improvement.

petersilva commented 9 months ago

The current method sends duplicates when the vip changes owners; the idea/goal is to fix that.

The vip voting scheme can introduce significant periods (minutes) where the vip is either in the wrong place, or nowhere at all, while a transfer is in progress.

Method 1: separate queues, vip gates posting.

SWOT:

Method 2: poll style.

SWOT:

observations...

petersilva commented 9 months ago

Method 3: one queue with two bindings... then use the exchange to differentiate input from output.

This would address the weakness of both other methods: if a node is slowly dying, the failover node will start queueing up messages that it hasn't seen in the output queue, and when it gets the vip, it will catch up.

This works if the input and output exchanges are on the same broker, so that a single queue can have bindings to both.
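
A rough, non-sarracenia sketch of the idea using pika, with one queue bound to both exchanges and the delivery's exchange name used to tell a fresh source message apart from one already posted by the vip holder (broker, exchange, and queue names are made up, and the vip check is a placeholder):

```python
# Method 3 sketch: one queue, two bindings, differentiate by exchange.
import pika

seen = set()                         # stand-in for the nodupe cache

def have_vip():
    return True                      # placeholder vip check

conn = pika.BlockingConnection(pika.ConnectionParameters('broker.example.com'))
ch = conn.channel()
ch.queue_declare(queue='q_winnow_node1', durable=True)
ch.queue_bind(queue='q_winnow_node1', exchange='xs_input', routing_key='#')
ch.queue_bind(queue='q_winnow_node1', exchange='xwinnow_out', routing_key='#')

def on_message(channel, method, properties, body):
    if method.exchange == 'xwinnow_out':
        seen.add(body)               # already posted by the vip holder: just prime the cache
    elif body not in seen:
        seen.add(body)
        if have_vip():               # only the vip holder re-posts winnowed messages
            channel.basic_publish(exchange='xwinnow_out',
                                   routing_key=method.routing_key, body=body)
    channel.basic_ack(method.delivery_tag)

ch.basic_consume(queue='q_winnow_node1', on_message_callback=on_message)
ch.start_consuming()
```

The point of the second binding is that a standby node keeps seeing what the vip holder posts, so its cache stays warm even while it is not posting.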

petersilva commented 7 months ago

Method 4: a nodupe.sync class... gather() implements a second subscriber, with settings:

It is installed with callback_prepend and has two entry points: gather and after_accept.

Gather is a normal gather (like gather/message), but for every message gathered, you add a field: m["from_nodupe_sync_cache"] = True.

Then have an after_accept entry point that drops all messages with that field set, so the cache is primed.

This seems really easy to do... and kind of a general way to explore shared state caches.
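
A rough skeleton of what such a callback might look like, assuming sr3-style flowcb entry points (the class name and the second-subscriber setup are placeholders, not the actual nodupe.sync implementation, and entry point signatures vary between sr3 versions):

```python
# Method 4 sketch: prime the local nodupe cache from a second subscription.
# Intended to be installed with callback_prepend, per the proposal above.
from sarracenia.flowcb import FlowCB

class Sync(FlowCB):
    def __init__(self, options):
        super().__init__(options)
        self.subscriber = None   # placeholder: a second subscription to the post exchange

    def gather(self):
        # pull in whatever the other nodes have already posted, marking each
        # message so later callbacks know it exists only to prime the cache.
        messages = self.subscriber.newMessages() if self.subscriber else []
        for m in messages:
            m["from_nodupe_sync_cache"] = True
        return messages

    def after_accept(self, worklist):
        # the idea is that nodupe has already recorded these messages, so the
        # cache is primed; drop the flagged ones so they are never re-posted.
        still_incoming = []
        for m in worklist.incoming:
            if m.get("from_nodupe_sync_cache"):
                worklist.rejected.append(m)
            else:
                still_incoming.append(m)
        worklist.incoming = still_incoming
```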