miguelgrinberg / Flask-SocketIO

Socket.IO integration for Flask applications.
MIT License

Sending a high volume of packets in a short amount of time causes packets to drop #2060

Open Fjf opened 5 months ago

Fjf commented 5 months ago

I'm typically sending small messages, but in some instances a large amount of data has to be sent over the socket. To get around the maximum packet size, I split the input bytes into chunks on the sender side and stitch them back together on the receiver side. For a small number of chunks (~10), this works fine without any problems. For a large number of chunks (>40), the receiver starts losing packets.

Below is the client-side code. create_chunks creates JSON objects that contain the data, the total number of chunks in the original packet, and the chunk_id, so I know which packets I have received.

    def put(self, packet: Packet):
        data = packet.to_pickle()
        # Split the serialized packet into chunks and emit each one separately.
        for chunk in create_chunks(data):
            print("Emitting packet with id", chunk["chunk_id"], len(pickle.dumps(chunk)))
            self.client.emit("io", pickle.dumps(chunk))
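
For reference, create_chunks itself is not shown here; roughly, it does something like the following sketch, reconstructed from the description above (the chunk size is an assumed value):

    # Hypothetical reconstruction of create_chunks: each chunk carries its index
    # ("chunk_id"), the total number of chunks ("length"), and a slice of the data.
    CHUNK_SIZE = 500_000  # assumed chunk size in bytes

    def create_chunks(data: bytes, chunk_size: int = CHUNK_SIZE):
        n_chunks = -(-len(data) // chunk_size)  # ceiling division
        for chunk_id in range(n_chunks):
            yield {
                "chunk_id": chunk_id,
                "length": n_chunks,
                "data": data[chunk_id * chunk_size:(chunk_id + 1) * chunk_size],
            }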

The server-side code waits for the data and stitches it back together.

    @self.sio.on("io")
    def io(data):
        # Collect incoming chunks until all of them have arrived.
        self._intermediate_data_buffer.append(pickle.loads(data))
        logging.info(f'Receiving packet with id {self._intermediate_data_buffer[-1]["chunk_id"]}')
        n_chunks = self._intermediate_data_buffer[0].get("length")
        if n_chunks == len(self._intermediate_data_buffer):
            # All chunks received: reassemble in chunk_id order and unpickle.
            recreated_data = b''.join(
                x.get("data") for x in sorted(self._intermediate_data_buffer, key=lambda d: d.get("chunk_id"))
            )
            pkt = Packet.from_pickle(recreated_data)
            self._intermediate_data_buffer.clear()

            if self._on_packet_callback is not None:
                self._on_packet_callback(pkt)
            else:
                self.queue.put(pkt)

Logs from the client side:

    Emitting packet with id 0 524351
    Emitting packet with id 1 524351
    ...
    Emitting packet with id 265 524352
    Emitting packet with id 266 219229

Logs on the server side:

    2024-05-01 19:09:43,249 [INFO] Receiving packet with id 0
    2024-05-01 19:09:43,249 [INFO] Receiving packet with id 1
    ...
    2024-05-01 19:09:43,260 [INFO] Receiving packet with id 38
    2024-05-01 19:09:43,260 [INFO] Receiving packet with id 39

I can 'get around' this problem by adding a time.sleep(0.2) in between packet sends on the client side. Ideally I would not delay the sending of packets unnecessarily. However, this does indicate that the problem has to do with some kind of internal buffer filling up.

The only thing I found in the Flask-SocketIO documentation regarding message size concerns the size of a single message, not the total aggregated size of all messages in the buffer: max_http_buffer_size – The maximum size of a message when using the polling transport. The default is 1,000,000 bytes.

miguelgrinberg commented 5 months ago

First of all, you can change the maximum packet size; as you've mentioned yourself, you can use max_http_buffer_size for this.
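
For example, a minimal sketch of where that limit is set (the 10 MB value here is just an arbitrary example):

    from flask import Flask
    from flask_socketio import SocketIO

    app = Flask(__name__)
    # Raise the per-message limit from the 1,000,000-byte default.
    socketio = SocketIO(app, max_http_buffer_size=10_000_000)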

What happens in the client application after the loop emitting all the chunks ends?

Fjf commented 5 months ago

After the client is done sending all chunks it emits one more message, then disconnects, after which the process ends. As for increasing the packet size, I don't really want a maximum packet size in the GB range, though if that is the only way I will try it.

miguelgrinberg commented 5 months ago

Well, that is likely the problem. If you are sending a lot of data, it may take a while for the background tasks to flush everything out. If you disconnect the WebSocket, some data may still be waiting to go out. Three suggestions:

  1. Have the server disconnect the client after it has received all the packets. The client will just wait until it gets disconnected. There is a sio.wait() for this.
  2. Have the server acknowledge that it has received all the packets by sending a final message to the client. The client can disconnect when it receives this event.
  3. Use the callback feature to receive acknowledgements for all sent packets. The client can disconnect after it has received callbacks for all the packets it sent out (or maybe just the last one, since packets are sent in order). See the sketch below.
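
A rough sketch of option 3, assuming a python-socketio client on the sending side (the address, payload, chunk size, and the on_ack name are placeholders; the acknowledgement fires once the server's "io" handler has processed the event):

    import pickle
    import socketio

    sio = socketio.Client()
    sio.connect("http://localhost:5000")   # assumed server address

    payload = b"x" * 5_000_000             # example payload
    chunk_size = 500_000                   # assumed chunk size
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]

    acked = 0

    def on_ack(*args):
        # Runs when the server acknowledges a chunk.
        global acked
        acked += 1

    for chunk_id, piece in enumerate(pieces):
        message = {"chunk_id": chunk_id, "length": len(pieces), "data": piece}
        sio.emit("io", pickle.dumps(message), callback=on_ack)

    # Only disconnect once every chunk has been acknowledged.
    while acked < len(pieces):
        sio.sleep(0.05)
    sio.disconnect()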

Fjf commented 5 months ago

The callback method works for me. If you think the disconnect should not block until queued messages are flushed, this would not be a bug and the issue can be closed.

Thanks!

Update: I've tried two of the methods:

  1. The first method seems to disconnect the client because the 'ping' messages are no longer coming through due to the large backed-up queue. I base this on the following CRITICAL log message: [2024-05-02 09:48:47 +0200] [31955] [CRITICAL] WORKER TIMEOUT (pid:31957)
  2. The second method works, though throughput is quite low due to the required back-and-forth per message.

As sending a large amount of data is not done often, this is fine for my application, but ideally the message volume would not cause the keep-alive pings to time out.

miguelgrinberg commented 5 months ago

If you think the disconnect should not block until queued messages are flushed

There are two ways to send events: emit() and call(). The former schedules the event and returns immediately, while the latter only returns once the message has been sent and acknowledged by the other side (which is also implemented using callbacks). So this is working as designed.

I forgot to mention call() above; that would be a fourth option. Of course, if you use call() for all your events it may take a bit longer to send everything out, because there will be no buffering: each event is sent and call() blocks until the other side acknowledges it.
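
A minimal sketch of the difference, again assuming a python-socketio client (the event name, address, and payload are placeholders):

    import pickle
    import socketio

    sio = socketio.Client()
    sio.connect("http://localhost:5000")

    chunk = {"chunk_id": 0, "length": 1, "data": b"example"}

    # emit() schedules the event and returns immediately.
    sio.emit("io", pickle.dumps(chunk))

    # call() blocks until the other side acknowledges the event (or the timeout
    # expires) and returns whatever the server's handler returned.
    reply = sio.call("io", pickle.dumps(chunk), timeout=10)

    sio.disconnect()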