python-websockets / websockets

Library for building WebSocket servers and clients in Python
https://websockets.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Connection breaks in between during a sustained transfer #1475

Closed gridhead closed 4 months ago

gridhead commented 5 months ago

I am creating a file transfer service, the code of which can be found here. While transferring file contents from a sender client to a receiver client, the connection breaks if the transfer is sustained for a long period of time. Please note that I fit a maximum of 640 bytes in each message by chunking the file contents. Instead of making one big transmission, I am doing multiple smaller ones.

Traceback (most recent call last):
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1301, in close_connection
    await self.transfer_data_task
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 963, in transfer_data
    message = await self.read_message()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1033, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1108, in read_data_frame
    frame = await self.read_frame(max_size)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 1165, in read_frame
    frame = await Frame.read(
            ^^^^^^^^^^^^^^^^^
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/framing.py", line 68, in read
    data = await reader(2)
           ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/streams.py", line 752, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/server.py", line 236, in handler
    await self.ws_handler(self)
  File "/home/archdesk/Projects/expedite/expedite/server/room.py", line 40, in oper
    async for mesgcont in sockobjc:
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 498, in __aiter__
    yield await self.recv()
          ^^^^^^^^^^^^^^^^^
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 568, in recv
    await self.ensure_open()
  File "/home/archdesk/Projects/expedite/venv/lib/python3.12/site-packages/websockets/legacy/protocol.py", line 948, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
[2024-06-29 13:22:06 +0530] connection closed

Any help is appreciated.

Here is a screenshot of the logs that are produced per send and receive activity performed at all ends.

image

After getting rid of the logs, this is what it looks like. The receiving end seems to get stuck, but it may simply be that no message arrives from the sending end, because of a problem either at the sending end or at the broker end.

image

gridhead commented 5 months ago

Speaking to the sheer unpredictability of the error: I just happened to change the log level from INFO to DEBUG, and that somehow made the transfer work just fine. Of course, I would treat that as a workaround rather than a fix, but it still moves us ahead a bit.

image

In the above picture, the top left part is the sending client, the top right part is the receiving client, the bottom left part is the broker server, and the bottom right part is a watch command that checks how far the transmission has progressed, according to the codebase changes made.

aaugustin commented 5 months ago

If you wrote the loop that sends the file in 640 bytes chunks in the obvious way, there's a good chance that you're running into the problem described here: https://websockets.readthedocs.io/en/stable/faq/asyncio.html#why-does-my-program-never-receive-any-messages

gridhead commented 4 months ago

@aaugustin I do have a couple of functions (or coroutines? My asyncio game is admittedly weak) that perform sustained sock.send() and sock.recv(). Take a look at the following snippets, for instance.

The receiving end is put in a sustained receiving mode until all the chunks are received.

async def collect_contents(sock: WebSocketClientProtocol, pack: bytes = b"") -> bool:
    fuse_file(pack)
    with logging_redirect_tqdm():
        with tqdm(total=standard.client_filesize, unit="B", unit_scale=True, unit_divisor=1024, leave=False, initial=len(pack)) as prog:
            for indx in range(standard.client_chks - 1):
                mesgcont = await sock.recv()
                if isinstance(mesgcont, bytes):
                    fuse_file(mesgcont)
                    prog.update(len(mesgcont))
    # general(f"Collecting file contents from {standard.client_endo} - (SHA256 {sha256(pack).hexdigest()})")
    return True

The sending end is put in a sustained sending mode until all the chunks are sent.

async def deliver_contents(sock: WebSocketClientProtocol) -> bool:
    with logging_redirect_tqdm():
        with tqdm(total=standard.client_filesize, unit="B", unit_scale=True, unit_divisor=1024, leave=False, initial=0) as prog:
            for indx in range(0, len(standard.client_bind) - 1):
                bite = read_file(standard.client_bind[indx], standard.client_bind[indx + 1])
                # general(f"Delivering file contents to {standard.client_endo} (SHA256 {sha256(bite).hexdigest()}).")
                prog.update(standard.client_bind[indx + 1] - standard.client_bind[indx])
                await sock.send(bite)
        return True

So, a couple of questions.

  1. Is having the await asyncio.sleep(0) on the sending end enough, or should I also consider having the same at the receiving end?
  2. Is this inclusion likely to cause performance penalties and if the answer is yes, by how much should I expect the degradation?

aaugustin commented 4 months ago

  1. In practice, I've only encountered this problem on the send side. (Prompted by your comment, I've been thinking briefly about whether it could happen on the recv side; it turns out that I don't know; it would require uncommon circumstances. No one complained about that kind of issue in a recv loop in the 10+ years this library has existed so I'm not too worried.)

  2. The performance penalty shouldn't be measurable in your scenario. Running an iteration of the event loop is extremely fast. I remember benchmarking that extensively; the answer was always "not something you need to think about". Sending data requires a send(2) syscall which will be much slower than just going to the event loop and back.

    If you're concerned about performance, I would recommend a larger chunk size than 640 bytes; that's extremely small by modern standards.
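
A minimal sketch of the send-side yield discussed above. `FakeSocket`, `deliver_chunks`, `count_ticks`, and `run_demo` are hypothetical names for illustration and are not part of websockets. The fake `send()` never suspends, which imitates the situation from the FAQ where a send loop does not hand control back to the event loop. The `await asyncio.sleep(0)` after each send is the only change that lets other tasks run while the loop is busy.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a connection whose send() never suspends."""

    def __init__(self) -> None:
        self.sent: list[bytes] = []

    async def send(self, data: bytes) -> None:
        # No await inside, so awaiting this never yields to the event loop.
        self.sent.append(data)


async def deliver_chunks(sock, chunks, yield_control: bool = True) -> None:
    """Send every chunk, optionally yielding to the event loop after each one."""
    for chunk in chunks:
        await sock.send(chunk)
        if yield_control:
            # Give other tasks a chance to run before the next send.
            await asyncio.sleep(0)


async def count_ticks(state: dict) -> None:
    """Background task that counts how often it gets scheduled."""
    while True:
        state["ticks"] += 1
        await asyncio.sleep(0)


async def run_demo(yield_control: bool) -> tuple[int, int]:
    """Return (messages sent, background ticks observed during the send loop)."""
    state = {"ticks": 0}
    ticker = asyncio.create_task(count_ticks(state))
    sock = FakeSocket()
    await deliver_chunks(sock, [b"x" * 640] * 100, yield_control)
    ticks = state["ticks"]
    ticker.cancel()
    return len(sock.sent), ticks
```

Running `asyncio.run(run_demo(False))` and `asyncio.run(run_demo(True))` side by side shows whether the background task was scheduled at all while the chunks were being sent.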

gridhead commented 4 months ago

For now, I will keep the asyncio.sleep(0) call on both ends and test rigorously to see under what circumstances, if any, the issue occurs again. Following your advice, I have increased the chunk size to 1536 bytes but I would be interested to know your thoughts on what an optimal chunk size should be for one transfer occurrence and how big of a chunk size can I send before I start seeing errors?

aaugustin commented 4 months ago

> I would be interested to know your thoughts on what an optimal chunk size should be for one transfer occurrence and how big of a chunk size can I send before I start seeing errors?

On a decent modern network, it probably doesn't matter. I'd go for 16kB to 64kB (probably anything between 4kB and 256kB would be indistinguishable) and let the TCP/IP layer do its job. It's better at optimizing network throughput than us :-)

aaugustin commented 4 months ago

If you expect usage on poor networks, mobile, etc. AND if you have a good resume strategy, then you could go for a smaller size to increase the chances that a chunk goes through before the connection breaks; and then you can resume.
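
As a rough illustration of the chunk size and resume ideas above, here is a minimal sketch of a chunk reader. The function name `iter_chunks` and its `offset` parameter are assumptions made for this example, not something from the project in question; the 64 KiB default is simply one value from the range suggested above. Resuming is modeled as nothing more than starting the read at the byte offset the receiver last confirmed.

```python
from typing import Iterator


def iter_chunks(path: str, chunk_size: int = 64 * 1024, offset: int = 0) -> Iterator[bytes]:
    """Yield successive chunks of the file at `path`, starting at byte `offset`."""
    with open(path, "rb") as handle:
        # Skip the part of the file that was already transferred (0 for a fresh start).
        handle.seek(offset)
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                # End of file reached.
                break
            yield chunk
```

Each yielded chunk would then be passed to `send()` as one message; the last chunk may be shorter than `chunk_size`.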

gridhead commented 4 months ago

While I plan to include resumability to address weaker connections, for now I am sticking to the vanilla "If it fails, just try again from the start" kind of transfer. For a file that is approximately 993 MiB in size, with the server running on the same host that sends and receives the data, these are the numbers that I see.

| Chunk size (KiB) | Duration | Improvement (% change vs. previous row) |
|---|---|---|
| 1 | 286.17 | 0 |
| 2 | 154.3 | -46.08 |
| 4 | 96.81 | -37.26 |
| 8 | 68.12 | -29.64 |
| 16 | 54.87 | -19.45 |
| 32 | 48.52 | -11.57 |
| 64 | 44.18 | -8.94 |
| 128 | 43.76 | -0.95 |
| 256 | 42.5 | -2.88 |
| 512 | 42.36 | -0.33 |
| 768 | 42.28 | -0.19 |
| 1024 | FAIL | FAIL |
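
The Improvement figures above appear to be the percentage change in duration relative to the previous row (for example, going from 286.17 to 154.3 is roughly a 46% reduction). A small sketch that reproduces the column under that assumption; `percent_change` is a hypothetical helper name, and the failed 1024 run is left out because it has no duration.

```python
def percent_change(durations: list[float]) -> list[float]:
    """Return the percentage change of each duration relative to the one before it."""
    changes = [0.0]  # The first row has no predecessor to compare against.
    for previous, current in zip(durations, durations[1:]):
        changes.append((current - previous) / previous * 100)
    return changes


# Durations from the table, in row order (chunk sizes 1 through 768).
durations = [286.17, 154.3, 96.81, 68.12, 54.87, 48.52, 44.18, 43.76, 42.5, 42.36, 42.28]
changes = percent_change(durations)
```

Negative values mean the transfer got faster; the values shrink toward zero once the chunk size passes 64, which matches the flattening visible in the table.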

Until I can devise a resuming strategy (with a checkpoint written to a temporary directory after every certain number of received chunks), I think I would like to keep the range of chunk sizes from 64KiB to 768KiB. I am considering keeping 256KiB as the default chunk size for users who do not want to tweak things.
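
One way the range and default described above could be enforced is sketched below. The constant names and the `resolve_chunk_size` function are hypothetical and chosen only for this example; the limits are the 64KiB, 768KiB, and 256KiB values from the paragraph above.

```python
from typing import Optional

KIB = 1024
MIN_CHUNK_KIB = 64       # Lower bound of the range considered above.
MAX_CHUNK_KIB = 768      # Largest size that completed in the measurements.
DEFAULT_CHUNK_KIB = 256  # Default for users who do not tweak the setting.


def resolve_chunk_size(requested_kib: Optional[int] = None) -> int:
    """Return the chunk size in bytes, falling back to the default when unset."""
    if requested_kib is None:
        return DEFAULT_CHUNK_KIB * KIB
    if not MIN_CHUNK_KIB <= requested_kib <= MAX_CHUNK_KIB:
        raise ValueError(
            f"chunk size must be between {MIN_CHUNK_KIB} and {MAX_CHUNK_KIB} KiB, "
            f"got {requested_kib}"
        )
    return requested_kib * KIB
```

Rejecting out-of-range values up front, rather than clamping them silently, keeps a request such as 1024 from turning into a failed transfer later on.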

aaugustin commented 4 months ago

Sounds sensible.