codypiersall / pynng

Python bindings for Nanomsg Next Generation.
https://pynng.readthedocs.io
MIT License
268 stars 58 forks source link

Subscriber don't receive a few first message or don't receive anything #42

Closed khssnv closed 5 years ago

khssnv commented 5 years ago

Hello.

I see pubsub always missing a few first messages or sometimes don't recieve messages at all. Please find my pub and sub scripts below with a log.

Is it possible to implement reliable message delivery?

I very like nng ideas (like surveyor/respondent and push/pull), hope it will grow with reliability enogh to use!

There where three publisher runs. At first subscriber recieves messages from 2nd, at second try it missed all 13 messages and at third try it starts recieving from 3rd one. In all of this I've started subscriber before publisher.

subscriber.py ```python import asyncio, pynng async def subscriber(): sub = pynng.Sub0(dial="tcp://localhost:5555") sub.subscribe(b"") while True: incoming = await sub.arecv() print(f"incoming: {incoming}") asyncio.get_event_loop().run_until_complete(subscriber()) ```
publisher.py ```python import asyncio, pynng async def publisher(): pub = pynng.Pub0(listen="tcp://localhost:5555") count = 1 while True: msg = str(count).encode("utf-8") print(f"outgoing: {msg}") pub.send(msg) count += 1 await asyncio.sleep(1) asyncio.get_event_loop().run_until_complete(publisher()) ```
$ python publisher.py ```console ╭─khassanov@32vd ~/Workspace/ ╰─$ python publisher.py outgoing: b'1' outgoing: b'2' outgoing: b'3' outgoing: b'4' KeyboardInterrupt ╭─khassanov@32vd ~/Workspace/ ╰─$ python publisher.py outgoing: b'1' outgoing: b'2' outgoing: b'3' outgoing: b'4' outgoing: b'5' outgoing: b'6' outgoing: b'7' outgoing: b'8' outgoing: b'9' outgoing: b'10' outgoing: b'11' outgoing: b'12' outgoing: b'13' KeyboardInterrupt ╭─khassanov@32vd ~/Workspace/ ╰─$ python publisher.py outgoing: b'1' outgoing: b'2' outgoing: b'3' outgoing: b'4' outgoing: b'5' KeyboardInterrupt ```
$ python subscriber.py ```console ╭─khassanov@32vd ~/Workspace/ ╰─$ python subscriber.py incoming: b'2' incoming: b'3' incoming: b'4' incoming: b'3' incoming: b'4' incoming: b'5' ```
codypiersall commented 5 years ago

Hi @khssnv,

Thanks for trying out the library and for opening an issue.

I think what is happening here is that some messages are dropped before the connection between the pub/sub sockets are actually established; that's why the first few messages are always dropped. If you have the publisher wait until sockets are connected to start publishing data, you should see better results. Something like this may do the trick:

new_publisher.py ```python3 import asyncio import time import pynng async def wait_for_connection(sock, timeout=60000): """ (Asynchronously) wait for a connection to establish on the socket. If a timeout does not occur within ``timeout`` ms, a ``pynng.Timeout`` exception is raised. """ start = time.monotonic() timeout_time = start + timeout / 1000 while time.monotonic() < timeout_time: if len(sock.pipes) > 0: return # a better implementation may set condition variables, etc. await asyncio.sleep(0.01) else: msg = 'Connection was not established within {} ms' raise pynng.Timeout(msg.format(timeout)) async def publisher(): pub = pynng.Pub0(listen="tcp://localhost:5555") count = 1 await wait_for_connection(pub) while True: msg = str(count).encode("utf-8") print(f"outgoing: {msg}") pub.send(msg) count += 1 await asyncio.sleep(1) asyncio.get_event_loop().run_until_complete(publisher()) ```

I use a synchronous version of wait_for_connection in my test suite. Maybe pynng should add synchronous and asynchronous versions of this function?

As to your other question:

Is it possible to implement reliable message delivery?

Hopefully what I showed above is Good Enough™. pub/sub is designed to be able to drop messages in nng; the problem is that a slow peer could otherwise DoS all other connections. For a lot of use cases the pub/sub handling is good enough, and there are some mitigations that can help, like setting a higher [recv_buffer_size on subscribers and send_buffer_size](https://pynng.readthedocs.io/en/v0.4.0/core.html#socket-attributes) on publishers.

khssnv commented 5 years ago

Hi @codypiersall, thank you for your answer! Yes, wait_for_connection seems useful. I understand here nng fits best for fast applications value fastest message exchange assuming there is a small probability of losses.