zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

TIPC M:N Pub/Sub multicast not working #3080

Closed: antonymayi closed this issue 6 years ago

antonymayi commented 6 years ago

Issue description

The TIPC transport is capable of multicasting, so one would expect it to allow operating a pub-sub topology in an M:N configuration. I am able to get 1:N pub-sub working, but when I add more publishers, only messages from the first one are received.

I must admit it is not entirely clear to me how the TIPC service address/range should be used in this case (even after reading http://tipc.sourceforge.net/messaging.html). I only get some results when subscribing to a TIPC service address (tipc://{1234,0}) instead of a range as the doc suggests (which results in a zmq error).

Can anyone please explain how this should work (or confirm there is some issue)?

Environment

Minimal test code / Steps to reproduce the issue

The problem can be demonstrated with two processes on a single host, each acting as both publisher and subscriber and both using the same multicast address:

$ tipc node set address 1.1.1
$ tipc bearer enable media eth device lo

Then run the following script on the same host in multiple (>= 2) instances. The script periodically publishes its PID and prints any message received on the subscribed endpoint:

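import zmq
import time
import os
import threading

c = zmq.Context()

# Publisher: bind to the TIPC service range {type=1234, lower=0, upper=0}
pub = c.socket(zmq.PUB)
pub.bind('tipc://{1234,0,0}')

# Subscriber: accept all topics, connect to the service address {type=1234, instance=0}
sub = c.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.connect('tipc://{1234,0}')

# Payload: this process's PID
msg = str(os.getpid()).encode('utf-8')

# Print every received message; the SUB socket is only used in this thread
def recv():
    while True:
        print(sub.recv())

threading.Thread(target=recv, daemon=True).start()

# Publish the PID once per second from the main thread
while True:
    time.sleep(1)
    pub.send(msg)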

What's the actual result?

Each process is printing (receiving) only its own PID.

What's the expected result?

Each process is printing (receiving) not only its own PID but also the PIDs of all other processes (since they are all expected to be publishing and subscribing to the same multicast service).

antonymayi commented 6 years ago

BTW, I also tried with a 4-node cluster, making the publishers (2) and subscribers (2) truly network-distributed, but with a similar result (only one publisher working).

eponsko commented 6 years ago

Hi, I think in this case what you should actually get is load balancing of the subscribers between the two publishers (I'm on my phone, so I can't verify). What should happen is that the two publishers bind to the same service and each subscriber almost randomly chooses one of them (with a preference for one on the same machine). You could verify this by moving the subscribing socket into its own script.

What should also happen is failover between the two publishers, i.e. if you kill one of them, the connected subscribers should automatically connect to the live one.
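The split suggested above can be sketched as follows, assuming pyzmq and the same service address as the original repro. Run the publisher half of the original script (the PUB socket and its send loop, without the SUB part) in two or more processes, and call run_subscriber() in one or more others. The names ENDPOINT, sender_changed() and run_subscriber() are hypothetical. If the load-balancing explanation holds, each subscriber should report a single publisher PID, then switch to another PID when that publisher is killed.

```python
ENDPOINT = "tipc://{1234,0}"  # service address from the original repro


def sender_changed(last_pid, raw):
    """Return (changed, pid) for a message carrying a publisher's PID."""
    pid = raw.decode("utf-8")
    return pid != last_pid, pid


def run_subscriber(endpoint=ENDPOINT):
    """Subscriber-only process: report which publisher it is attached to."""
    import zmq  # local import so sender_changed() is usable without pyzmq

    sub = zmq.Context.instance().socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect(endpoint)
    last_pid = None
    while True:
        changed, last_pid = sender_changed(last_pid, sub.recv())
        if changed:
            # Printed once at start-up and again whenever the sending
            # publisher changes (e.g. after a failover).
            print("receiving from publisher PID", last_pid, flush=True)
```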

To get an M:N setup, you'll need to use the TIPC topology service to monitor which port addresses (the <z.c.n:6373728> ones) are providing the service (1234 in your case) and connect to those instead of the service number. Topology tracking is described a bit at http://tipc.sourceforge.net/programming.html#service_tracking; there should be some code examples on SourceForge too.

Cheers Pontus

antonymayi commented 6 years ago

Hi Pontus,

thanks for your help (again). You are right about the load balancing: when I decouple the subscribers into separate processes, I can demonstrate the publisher failover as you described it.

This all agrees with the TIPC features documented at http://tipc.sourceforge.net/messaging.html, which describes this scenario as:

Within a group there are four different transmission modes available. ... Anycast when the sender indicates a service address as destination. If there is more than one matching destination, one will be selected by round-robin... ...

(There is a slight difference between their scenario and mine: my service is the publisher, while in theirs it is a receiver, but I assume this is analogous.) What I am actually after, though, is the very next documented feature:

Multicast when the sender indicates a service range as destination. If there is more than one matching destination, all of them will receive a copy of the sent message.

My understanding is that if I subscribed to a service range instead of just a service address, it would use all of the publishers at the same time instead of picking a single one in round-robin fashion. But I was never able to receive any message when connecting to a service range (tipc://{1234,0,0} in my case) instead of the address (tipc://{1234,0}), so I am not sure how to implement the multicast they are talking about.

The concept of the topology service monitoring the individual publishers and letting subscribers connect to their port addresses is powerful, and I'll give it some more thought, but two things worry me: a) Is this achievable using the ZeroMQ API, without needing the TIPC API directly? Otherwise it cannot be used from languages that have no TIPC API. b) Is this really what TIPC calls multicast (as per the reference I quoted)? Isn't it rather an application-level full mesh of unicasts, with each subscriber responsible for tracking the publishers and establishing a channel with each of them individually?

Thanks, Antony.

eponsko commented 6 years ago

I just noticed they have updated the TIPC website; it looks much better now, though it could perhaps be a bit clearer about the different modes TIPC supports. This page goes into more detail; in particular, have a look at sections 1.3.X, which describe the UDP-like SOCK_DGRAM/SOCK_RDM, the TCP-like SOCK_STREAM/SOCK_SEQPACKET, and the new group messaging feature.

The first text you refer to (Within a group there...) is discussing the Communication Group feature of TIPC, which is new in kernel 4.14. I believe it is capable of doing more or less what you expect it to do, but I haven't had much time to play with this feature yet, so I can't say for sure. It is, however, not used by ZeroMQ, which only supports SOCK_STREAM TIPC sockets. It would be nice if support could be added to ZeroMQ, but I'm not sure how the ZMTP protocol used by ZeroMQ could handle it, since it's designed for 1-to-1 connections.

You could also get the M:N behavior using TIPC sockets directly, without the Communication Groups support, but you'd have to use either SOCK_DGRAM or SOCK_RDM, which have neither connections nor the flow control supported for groups. I've been planning to add support for SOCK_DGRAM TIPC to ZeroMQ (basically copying the existing UDP support) but haven't had any time for that either.
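A minimal sketch of that connectionless approach, using Python's standard socket module directly (it exposes AF_TIPC and SOCK_RDM on Linux builds) rather than ZeroMQ. It assumes the TIPC node and bearer are configured as in the repro; the instance range 0..99 and all helper names are hypothetical. Every process binds an RDM socket to the same service range and sends to that range, so each message is delivered to all bound members, including the sender itself.

```python
import socket
import time

# Value from <linux/tipc.h>; fallback for Python builds without TIPC constants.
TIPC_ADDR_NAMESEQ = getattr(socket, "TIPC_ADDR_NAMESEQ", 1)

SERVICE_TYPE = 1234     # service type from the issue
LOWER, UPPER = 0, 99    # hypothetical instance range shared by all members


def group_address(service_type=SERVICE_TYPE, lower=LOWER, upper=UPPER):
    """Service-range address tuple in the form Python's socket module expects."""
    return (TIPC_ADDR_NAMESEQ, service_type, lower, upper)


def open_member():
    """An RDM (reliable datagram) socket bound to the shared service range."""
    sock = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
    sock.bind(group_address())
    return sock


def run(payload, period=1.0):
    """Multicast `payload` every `period` seconds and print what arrives."""
    sock = open_member()
    while True:
        # Sending to a service range delivers a copy to every socket whose
        # bound range overlaps it, including this one.
        sock.sendto(payload, group_address())
        deadline = time.monotonic() + period
        while (remaining := deadline - time.monotonic()) > 0:
            sock.settimeout(remaining)
            try:
                data, sender = sock.recvfrom(65535)
            except socket.timeout:
                break
            print(sender, data)
```

In each process, call for example run(str(os.getpid()).encode('utf-8')). Unlike the ZeroMQ setup, there are no connections to track, and, as noted above, no flow control.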

Regarding the topology service: a) It should be possible. The topology service is accessed by connecting to it over a local TIPC socket, sending messages to (un)subscribe to notifications, and receiving updates. I've only used it from C, but doing it from e.g. Python shouldn't be a problem, since the TIPC socket API is the same BSD socket API used for UDP, TCP, etc. You'd need to construct and parse the messages, but that shouldn't be too complicated. In this C demo code you can see how the message structures are laid out.

b) It's not what TIPC calls multicast for datagrams or groups, but it is for connections, which is what ZeroMQ uses. Each subscriber does indeed have to add and remove connections to the publishers; however, the tracking is handled by TIPC and is accessible through the topology service.
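Constructing and parsing those messages from Python could look roughly like the sketch below. It assumes the struct layouts and constant values of <linux/tipc.h> (tipc_subscr is 28 bytes and tipc_event 48 bytes, in host byte order) and that libzmq accepts port IDs in the tipc://<z.c.n:ref> form mentioned earlier; all function and class names are hypothetical. PublisherTracker covers point b): it connects the SUB socket to each publisher port reported as published and disconnects when that port is withdrawn.

```python
import socket
import struct

# Constants from <linux/tipc.h>, hard-coded so the helpers also load on
# Python builds that do not export socket.TIPC_* (e.g. non-Linux).
TIPC_ADDR_NAME = 2
TIPC_TOP_SRV = 1                 # type and instance of the topology service
TIPC_SUB_PORTS = 0x01            # one event per matching publication
TIPC_WAIT_FOREVER = 0xFFFFFFFF
TIPC_PUBLISHED, TIPC_WITHDRAWN = 1, 2

# struct tipc_subscr: seq{type, lower, upper}, timeout, filter, usr_handle[8]
SUBSCR = struct.Struct("=IIIII8s")           # 28 bytes, host byte order
# struct tipc_event: event, found_lower, found_upper, port{ref, node}, subscr
EVENT = struct.Struct("=IIIII" "IIIII8s")    # 48 bytes


def pack_subscription(service_type, lower=0, upper=0xFFFFFFFF):
    """Subscribe to every publication of service_type within [lower, upper]."""
    return SUBSCR.pack(service_type, lower, upper,
                       TIPC_WAIT_FOREVER, TIPC_SUB_PORTS, b"zmqsub")


def parse_event(data):
    """Return (event, port_ref, port_node) from one tipc_event message."""
    event, _lower, _upper, ref, node = EVENT.unpack(data)[:5]
    return event, ref, node


def port_endpoint(node, ref):
    """Format a port ID as a tipc://<z.c.n:ref> endpoint (z:8, c:12, n:12 bits)."""
    return "tipc://<%d.%d.%d:%d>" % (node >> 24, (node >> 12) & 0xFFF,
                                     node & 0xFFF, ref)


def open_topology_subscription(service_type):
    """Connect to the local topology service and register one subscription."""
    top = socket.socket(socket.AF_TIPC, socket.SOCK_SEQPACKET)
    top.connect((TIPC_ADDR_NAME, TIPC_TOP_SRV, TIPC_TOP_SRV, 0))
    top.send(pack_subscription(service_type))
    return top


class PublisherTracker:
    """Keep a SUB socket connected to every publisher port currently bound
    to the service. `sub` only needs connect() and disconnect()."""

    def __init__(self, sub):
        self.sub = sub
        self.endpoints = set()

    def handle(self, data):
        event, ref, node = parse_event(data)
        endpoint = port_endpoint(node, ref)
        if event == TIPC_PUBLISHED and endpoint not in self.endpoints:
            self.sub.connect(endpoint)
            self.endpoints.add(endpoint)
        elif event == TIPC_WITHDRAWN and endpoint in self.endpoints:
            self.sub.disconnect(endpoint)
            self.endpoints.remove(endpoint)
```

A possible loop: top = open_topology_subscription(1234), then pass each top.recv(EVENT.size) to tracker.handle(). When the subscription is registered, the topology service also reports the publications that already exist, so the tracker starts from the current set of publishers. pyzmq sockets are not thread-safe, so handle() should run in the thread that reads from the SUB socket; zmq.Poller can poll the plain topology socket alongside it, since it accepts any object with a fileno() method.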

If you know how many publishers you will have, you could also use different instance numbers and get the same functionality without the topology service. For example, if you bind publisher 1 to tipc://{1234,0,0} and publisher 2 to tipc://{1234,1,1}, you could probably do something like:

sub.connect('tipc://{1234,0}')
sub.connect('tipc://{1234,1}')

or sub.connect(['tipc://{1234,0}','tipc://{1234,1}']) (I'm not sure how the Python API does it)
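A sketch of this fixed-instance variant, assuming each publisher is given a distinct index (for example on the command line; that mechanism is hypothetical, as are the helper names). In pyzmq, Socket.connect() takes a single endpoint per call, so connecting to several publishers is simply a loop.

```python
SERVICE_TYPE = 1234  # service type from the issue


def publisher_bind_address(index, service_type=SERVICE_TYPE):
    """Publisher `index` binds its own one-instance range {type, i, i}."""
    return "tipc://{%d,%d,%d}" % (service_type, index, index)


def subscriber_connect_addresses(count, service_type=SERVICE_TYPE):
    """Service addresses {type, i} of the `count` known publishers."""
    return ["tipc://{%d,%d}" % (service_type, i) for i in range(count)]


def connect_all(sub, addresses):
    """Connect one SUB socket to every publisher, one endpoint per call."""
    for addr in addresses:
        sub.connect(addr)
```

Publisher i would call pub.bind(publisher_bind_address(i)), and every subscriber connect_all(sub, subscriber_connect_addresses(2)).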

Cheers, Pontus

antonymayi commented 6 years ago

Very comprehensive, thanks.

I'll go through the TIPC Programmer's Guide to understand the different modes better, but after a quick read-through, what looks to me like the perfect match for ZeroMQ pub-sub is this Datagram Messaging. The fact that it has no connections or flow control fits the philosophy of scalability over reliability, which, AFAIK, is what ZeroMQ pub-sub is based on.

So if you ever find the time to integrate SOCK_DGRAM TIPC with ZeroMQ, I would consider it ZeroMQ's best multicast implementation (my biased view, obviously). No pressure, though ;-).

Thanks for the alternative workarounds, I'll give it a try.

Cheers, a.

eponsko commented 6 years ago

Yeah, it would be quite nice. I'll try to allocate some time for that. However, if that's the pattern you are looking for, you could consider ZeroMQ's UDP multicast support, which should be able to do it right now.

Cheers

antonymayi commented 6 years ago

My understanding is that UDP works only with the new RADIO/DISH sockets from the draft API, which is not stable or released yet (i.e. in Python these sockets don't work with event loops (#2941), which is unfortunately a blocker for my use case).