codypiersall / pynng

Python bindings for Nanomsg Next Generation.
https://pynng.readthedocs.io
MIT License

Messages being dropped or how to send reliably #41

Open chaoflow opened 5 years ago

chaoflow commented 5 years ago

I'd like to use polyamorous Pair1 over unix domain sockets and make use of flow control on the unix domain socket, i.e. if the receivers cannot process messages fast enough, the sender should not be allowed to put further messages into the socket. So far I have failed to achieve this; instead, messages are simply dropped.

As I understand it, send_buffer_size and recv_buffer_size apply within nng code and do not influence buffering in the kernel.

from pynng import Pair1

address = f'ipc://nng.socket'
with Pair1(polyamorous=True, send_buffer_size=1, recv_buffer_size=1, recv_max_size=0) as s0, \
     Pair1(polyamorous=True, send_buffer_size=1, recv_buffer_size=1, recv_max_size=0) as s1:
    s0.listen(address)
    s1.dial(address)
    send_idx = recv_idx = 0
    while True:
        print(f'SEND {send_idx}')
        s1.send(str(send_idx).encode('ascii'))
        print(f'SEND {send_idx} DATA')
        s1.send(1 * 2 ** 20 * b'0')
        send_idx += 1

        print(f'SEND {send_idx}')
        s1.send(str(send_idx).encode('ascii'))
        print(f'SEND {send_idx} DATA')
        s1.send(1 * 2 ** 20 * b'0')
        send_idx += 1

        print(f'RECV {recv_idx}')
        data = s0.recv()
        assert len(data) != 1 * 2 ** 20 and len(data) == len(str(recv_idx)), \
            (len(data), data[0], recv_idx)
        assert data.decode('ascii') == str(recv_idx), (data, recv_idx)

        print(f'RECV {recv_idx} DATA')
        print(f'RECV {recv_idx}')
        data = s0.recv()
        assert len(data) == 1 * 2 ** 20
        recv_idx += 1

Given that I'm sending twice as much as I'm receiving, I'd expect this to block at some point during send. Instead messages are dropped:

...
RECV 4 DATA
RECV 4
SEND 10
SEND 10 DATA
SEND 11
SEND 11 DATA
RECV 5
Traceback (most recent call last):
  File "asyn.py", line 27, in <module>
    (len(data), data[0], recv_idx)
AssertionError: (1048576, 48, 5)

I tried to use select.select() to determine whether a socket is ready for sending. However, the socket never seems to become ready, i.e. the call to select does not return.

import select
from pynng import Pair1

address = f'ipc://nng.socket'
with Pair1(polyamorous=True, send_buffer_size=1, recv_buffer_size=1, recv_max_size=0) as s0, \
     Pair1(polyamorous=True, send_buffer_size=1, recv_buffer_size=1, recv_max_size=0) as s1:
    s0.listen(address)
    s1.dial(address)
    send_idx = recv_idx = 0
    while True:
        print(f'SEND {send_idx}')
        select.select([], [s1.send_fd], [])  # wait for socket to be ready
        s1.send(str(send_idx).encode('ascii'))
        print(f'SEND {send_idx} DATA')

What is the correct way to send messages reliably, and is there a way to make use of unix domain socket / tcp socket flow control?
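For comparison, here is a minimal sketch of the kernel-level flow control the question refers to, using a plain Unix domain socket pair from the standard library and no nng at all. It assumes a POSIX system; the helper name fill_until_blocked and the chunk size are made up for illustration. On a raw socket, the write list of select.select reflects whether the kernel send buffer has room.

```python
import select
import socket


def fill_until_blocked(sock, chunk=b"0" * 4096):
    """Send chunks on a non-blocking socket until the kernel refuses more.

    Returns the number of bytes the kernel accepted.
    """
    sent = 0
    while True:
        try:
            sent += sock.send(chunk)
        except BlockingIOError:
            return sent


# A connected pair of Unix domain stream sockets (POSIX only).
writer, reader = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
writer.setblocking(False)
reader.setblocking(False)

accepted = fill_until_blocked(writer)

# Nothing has been read yet, so select does not report the writer as writable.
_, writable_when_full, _ = select.select([], [writer], [], 0)

# Read everything that is pending; this frees space in the kernel buffers.
drained = 0
while True:
    try:
        drained += len(reader.recv(65536))
    except BlockingIOError:
        break

# With the buffers empty again, the writer is reported as writable.
_, writable_after_drain, _ = select.select([], [writer], [], 0)

print(f"accepted={accepted} drained={drained}")
print(f"writable when full: {bool(writable_when_full)}")
print(f"writable after drain: {bool(writable_after_drain)}")

writer.close()
reader.close()
```

A blocking socket would simply stall in send() at the point where this sketch gets BlockingIOError, which is the behaviour expected from the sender above.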

codypiersall commented 5 years ago

Thanks for the issue report @chaoflow. Keep 'em coming!

I can't dig into this right now but I hope to in the next few days. At a cursory glance it looks like the select should work.

If you don't actually need polyamorous mode, you can use Pair0 which allows backpressure as expected.

Your understanding of send_buffer_size and recv_buffer_size is correct. There's a decent chance that increasing the send buffer size could result in fewer dropped messages, but there's also a chance that it would only serve to increase latency.

chaoflow commented 5 years ago

@codypiersall I can confirm that Pair0 blocks upon synchronous send and continues once the other end has read sufficient messages.

The setup I'm aiming for has multiple clients connecting to one server, with each client having multiple channels that support backpressure on the server side. It should work over unix domain sockets and tcp/ip with asynchronous calls. So far I have tested with synchronous calls, expecting synchronous and asynchronous calls to behave similarly (see below for an async example). Apart from messages exceeding recv_max_size, I would not expect messages to be lost without an error being raised, at least not over unix domain sockets or tcp/ip.

With asyncio and plain sockets the above is achieved by the following:

import asyncio
import struct
from itertools import count

async def server():
    conn_count = count()

    async def handle_client(reader, writer):
        conn_idx = next(conn_count)
        msg_idx = 0
        while True:
            writer.write(struct.pack('QQ', conn_idx, msg_idx))
            await writer.drain()
            msg_idx += 1

    await asyncio.start_unix_server(handle_client, '/tmp/asock')

async def client(client_idx):
    reader, writer = await asyncio.open_unix_connection('/tmp/asock')
    previous_msg_idx = -1
    while True:
        data = await reader.readexactly(16)
        conn_idx, msg_idx = struct.unpack('QQ', data)
        assert msg_idx - previous_msg_idx == 1, (msg_idx, previous_msg_idx)
        previous_msg_idx = msg_idx
        if client_idx == 0:
            print(f'CLIENT{client_idx} RECV {msg_idx}')
            await asyncio.sleep(1)
        elif msg_idx % 100000 == 0:
            print(f'CLIENT{client_idx} RECV {msg_idx}')

async def main():
    tasks = []
    tasks.append(server())
    tasks.append(client(0))
    tasks.append(client(1))
    await asyncio.gather(*tasks)

asyncio.run(main())

The same using pynng.Pair1 quickly loses messages and is orders of magnitude slower in this case, which is off-topic but maybe still worth mentioning.

import asyncio
import struct
import pynng
from itertools import count

async def server():
    conn_count = count()

    async def send_loop(pipe, conn_idx):
        conn_idx = next(conn_count)
        msg_idx = 0
        while True:
            await pipe.asend(struct.pack('QQ', conn_idx, msg_idx))
            msg_idx += 1

    with pynng.Pair1(polyamorous=True) as socket:
        socket.listen('ipc:///tmp/asock')
        msg0 = await socket.arecv_msg()
        msg1 = await socket.arecv_msg()
        await asyncio.gather(send_loop(msg0.pipe, 0), send_loop(msg1.pipe, 1))

async def client(client_idx):
    previous_msg_idx = -1
    with pynng.Pair1(polyamorous=True) as socket:
        socket.dial('ipc:///tmp/asock')
        socket.send(b'')
        while True:
            data = await socket.arecv()
            conn_idx, msg_idx = struct.unpack('QQ', data)
            assert msg_idx - previous_msg_idx == 1, (msg_idx, previous_msg_idx)
            previous_msg_idx = msg_idx
            if client_idx == 0:
                print(f'CLIENT{client_idx} RECV {msg_idx}')
                await asyncio.sleep(1)
            else:
                print(f'CLIENT{client_idx} RECV {msg_idx}')

async def main():
    tasks = []
    tasks.append(server())
    tasks.append(client(0))
    tasks.append(client(1))
    await asyncio.gather(*tasks)

asyncio.run(main())

Using select.select before sending in the server indicates pipe.socket.send_fd to be always readable but never writable.

All tests done with Python 3.7.1 and pynng 0.4.0.

chaoflow commented 5 years ago

Side note: Sending 1MB messages in between the 16 byte ones, pynng is only 10% slower than plain unix domain sockets.

chaoflow commented 5 years ago

@codypiersall As you pointed out already, polyamorous mode is not reliable and pair in general is not reliable.

I think this leaves the question of the purpose and usage of select.select on pipe.socket.send_fd.

codypiersall commented 5 years ago

> Using select.select before sending in the server indicates pipe.socket.send_fd to be always readable but never writable.

pynng inherits this behavior from nng. I clarified this in pynng's docs, too, just now. (link to docs) I'll ask Garrett why they are marked readable when data is ready to be sent; I'm sure he had a reason.
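Given that the descriptor is marked readable when data can be sent, a wait on it has to put it in the read list of select.select, not the write list. The sketch below shows that pattern with a small helper whose name, wait_readable, is made up here. So that it runs without pynng, it uses an os.pipe as a stand-in for the notification descriptor; applying it to a real send_fd is an assumption that is not tested here.

```python
import os
import select


def wait_readable(fd, timeout=None):
    """Return True once fd is readable, or False if timeout (seconds) expires."""
    readable, _, _ = select.select([fd], [], [], timeout)
    return bool(readable)


# Stand-in for a notification descriptor: a pipe is readable only
# while at least one byte is pending on it.
read_end, write_end = os.pipe()

ready_before = wait_readable(read_end, timeout=0.05)  # nothing pending yet
os.write(write_end, b"x")
ready_after = wait_readable(read_end, timeout=0.05)   # one byte pending

print(ready_before, ready_after)

os.close(read_end)
os.close(write_end)
```

With pynng the call would presumably look like wait_readable(s1.send_fd, timeout=1.0) followed by s1.send(...). The descriptor is only meant for polling and should not be read from or written to directly. Whether this wait gives polyamorous Pair1 real backpressure is not established in this thread.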

> Side note: Sending 1MB messages in between the 16 byte ones, pynng is only 10% slower than plain unix domain sockets.

That's great news! Thanks for sharing.

tavurth commented 2 years ago

I'm also seeing this behaviour with Bus0.

I thought TCP would mean that the bus is reliable, i.e. each message should be posted to the queue and wait to be fetched.

My issue is that when I post many messages in a few milliseconds (1440 or so) the bus does not register these messages.

Anyone got any ideas except for slowing down my send rate?

codypiersall commented 2 years ago

I think it should help to increase the receive buffer size:

sock.recv_buffer_size = 128  # or whatever size is appropriate

This is the size of the buffer in messages, not bytes. I think nng upstream has also improved the performance of the bus protocol, and pynng needs to update the library version soon.

You're right that TCP is reliable at the transport level, but nng adds logic on top of that to decide if it needs to drop entire messages, which happens a layer of abstraction above TCP.
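To illustrate the layering only, here is a toy model of a receive buffer that holds a fixed number of whole messages and discards anything that arrives while it is full. This is not nng's implementation. The class DroppingBuffer, the function burst, and the choice to drop the newest message are assumptions made for the sketch. The numbers 128 and 1440 come from the comments above, and 2048 is an arbitrary larger size.

```python
from collections import deque


class DroppingBuffer:
    """Toy receive buffer holding at most `capacity` whole messages."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.messages = deque()
        self.dropped = 0

    def deliver(self, msg):
        """Called after the transport has delivered one complete message."""
        if len(self.messages) >= self.capacity:
            # The transport did its job; the buffer above it discards the message.
            self.dropped += 1
            return False
        self.messages.append(msg)
        return True


def burst(capacity, n_messages):
    """Deliver n_messages back to back, with no receiver draining in between."""
    buf = DroppingBuffer(capacity)
    for i in range(n_messages):
        buf.deliver(i)
    return len(buf.messages), buf.dropped


small = burst(capacity=128, n_messages=1440)
roomy = burst(capacity=2048, n_messages=1440)
print(small)  # (128, 1312)
print(roomy)  # (1440, 0)
```

In this model a larger buffer absorbs the burst, but a receiver that is slower than the sender over the long run will still fill any finite buffer.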

tavurth commented 2 years ago

@codypiersall thank you very much for the info, I'll give that a try!

I tried to reproduce this with a smaller script but was unable to. Perhaps it has something to do with processing times in one of my microservices, or it could be related to having multiple bus subscribers.

I'll post again here when I narrow it down 👍

tavurth commented 2 years ago

@codypiersall thank you, that seems to have done the trick 🎉 Increasing to recv_buffer_size=1024 gives me good throughput for my stack. I guess I will increase it further until it starts showing me issues.