ezmsg-org / ezmsg

Pure-Python DAG-based high-performance SHM-backed pub-sub and multi-processing pattern
https://ezmsg.readthedocs.io/en/latest/
MIT License

zmq_publisher is unrecoverable after a subscriber closes #110

Closed cboulay closed 6 months ago

cboulay commented 8 months ago

https://github.com/iscoe/ezmsg/blob/0e78703cdaad3f136fbde8b2df91aeadbf7bae37/extensions/ezmsg-zmq/src/ezmsg/zmq/units.py#L205-L213

In the snippet above, the publisher first waits for the socket to open, then publishes continuously until the socket closes. At that point the publisher returns and is never heard from again until the pipeline is restarted. Is this intentional?

I think a more flexible solution would be something like the following:

while True:
    if not self.socket_open:
        # No peer connected yet (or it has disconnected): wait, then re-check.
        await asyncio.sleep(POLL_TIME)

    if self.socket_open:
        poll_result = await self.STATE.socket.poll(
            self.SETTINGS.poll_time * 1000, zmq.POLLIN
        )
        if poll_result:
            ...
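To illustrate the idea outside of ezmsg, here is a minimal, self-contained sketch of a loop that idles while the socket is closed and resumes when it reopens. It assumes nothing about the real unit: `FakeSocket`, `Poller`, `push`, and the `POLL_TIME` value are hypothetical stand-ins invented for this example, and no real zmq socket is involved.

```python
import asyncio

POLL_TIME = 0.005  # seconds; assumed value for this sketch only


class FakeSocket:
    """Hypothetical stand-in for an async socket; not part of ezmsg or pyzmq."""

    def __init__(self):
        self._messages = []

    def push(self, msg):
        self._messages.append(msg)

    async def poll(self, timeout_ms):
        # Return right away if data is waiting, otherwise wait out the timeout.
        if not self._messages:
            await asyncio.sleep(timeout_ms / 1000)
        return bool(self._messages)

    async def recv(self):
        return self._messages.pop(0)


class Poller:
    """Hypothetical unit: keeps looping across disconnects instead of returning."""

    def __init__(self, socket):
        self.socket = socket
        self.socket_open = False
        self.received = []

    async def poll_forever(self):
        while True:
            if not self.socket_open:
                # Idle while the peer is away, then re-check the flag.
                await asyncio.sleep(POLL_TIME)
                continue
            if await self.socket.poll(POLL_TIME * 1000):
                self.received.append(await self.socket.recv())


async def demo():
    sock = FakeSocket()
    poller = Poller(sock)
    task = asyncio.create_task(poller.poll_forever())

    sock.push(b"a")
    poller.socket_open = True   # peer connects
    await asyncio.sleep(0.1)
    before = list(poller.received)

    poller.socket_open = False  # peer disconnects
    await asyncio.sleep(0.1)
    sock.push(b"b")             # arrives while the loop is idling
    await asyncio.sleep(0.1)
    during = list(poller.received)

    poller.socket_open = True   # peer reconnects; the same loop picks up again
    await asyncio.sleep(0.1)
    after = list(poller.received)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return before, during, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is only that the loop survives a closed socket. In the real unit, `socket_open` would be driven by the socket monitor rather than set by hand.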

cboulay commented 8 months ago

Actually, after looking over socket_monitor, this does seem to be intentional: if a socket was open and then closes, the ZMQPollerUnit should shut down. What's the use case? Why do the ezmsg pipeline's zmq publishers care what the subscribers are doing?

If, in addition to the change above, I comment out the break lines in socket_monitor, then this works the way I originally expected: clients can disconnect and reconnect.
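As a rough sketch of what "no break on disconnect" means for a monitor loop, the example below tracks connection state from a queue of events and keeps listening after a disconnect. Everything in it is hypothetical: the event names, `MonitoredUnit`, and the choice to stop only when the monitor itself stops are assumptions for illustration, not the actual socket_monitor code or real zmq event constants.

```python
import asyncio

# Hypothetical event names; a real monitor would report zmq event constants.
CONNECTED, DISCONNECTED, STOPPED = "connected", "disconnected", "stopped"


class MonitoredUnit:
    """Hypothetical stand-in for a unit that watches its socket's events."""

    def __init__(self):
        self.events = asyncio.Queue()
        self.socket_open = False
        self.history = []  # socket_open after each handled event

    async def socket_monitor(self):
        while True:
            event = await self.events.get()
            if event == STOPPED:
                # Assumption: only a stopped monitor ends the loop.
                break
            if event == CONNECTED:
                self.socket_open = True
            elif event == DISCONNECTED:
                # No break here: record the state and wait for a reconnect.
                self.socket_open = False
            self.history.append(self.socket_open)


async def demo():
    unit = MonitoredUnit()
    for event in (CONNECTED, DISCONNECTED, CONNECTED, STOPPED):
        unit.events.put_nowait(event)
    await unit.socket_monitor()
    return unit.history, unit.socket_open


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the disconnect branch only updating the flag, a polling loop that checks `socket_open` can resume once the client comes back.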

griffinmilsap commented 7 months ago

I agree with your read, @cboulay. Although I think @pperanich might have more to say about this, I believe some of the early work we supported with this Unit facilitated a connection between an external data sink (in a Docker container) and our ezmsg system. It was a one-to-one connection, and this code path was never really evaluated.

I believe your commit above to your working branch is pretty innocuous, and I don't believe we're currently running any systems with zmq_publisher that would be directly affected by this change. I'd be happy to accept this change in a PR in pretty short order.