rialto-php / rialto

Manage Node resources with PHP
MIT License

Sockets fail with very large loads #36

Open mb0000 opened 3 years ago

mb0000 commented 3 years ago

This is similar to issue [https://github.com/rialto-php/rialto/issues/1]. I assume the fix for that issue was to lengthen the socket header to 4 digits, but that only pushed the problem further out: payloads of more than about 10MB still fail. Because payloads are base64 encoded, the limit on the actual data is smaller (say 6MB).

The problem I have been experiencing is in ProcessSupervisor.php / Connection.js. A payload is prepared for sending by base64 encoding it, then splitting it into chunks, each prefixed with <number of chunks remaining> followed by a : separator. The <number of chunks remaining> is zero-padded to one less than the header size (currently 5 bytes, so padded to 4 digits, the fifth byte being the : separator). This fails once the number of remaining chunks exceeds 9999: 5 or more digits are then needed, the header outgrows its fixed size, and the chunk is truncated by the number of "excess" digits over 4.

I have implemented the following workaround, which stays compatible with existing code that expects 4 digits. NOTE: this workaround only covers the JS to PHP path, not the other direction. A more complete solution should address that too.
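The failure described above can be reproduced in isolation. The following is a minimal sketch, not Rialto code: `buildPacket` and `readFixed` are hypothetical helpers, the 1024-byte packet size is an assumption (the issue only states the 5-byte header), and each read is modelled as returning exactly one packet's worth of bytes.

```javascript
// Assumed constants for illustration only.
const HEADER_SIZE = 5;    // 4 digits plus the ':' separator, as described in the issue
const PACKET_SIZE = 1024; // assumption: not stated in the issue

// Hypothetical writer: zero-pad the counter to HEADER_SIZE - 1 digits.
// padStart never shortens, so a 5-digit counter makes the header 6 bytes long.
function buildPacket(chunksLeft, chunk) {
    const counter = String(chunksLeft).padStart(HEADER_SIZE - 1, '0');
    return `${counter}:${chunk}`;
}

// Hypothetical fixed-width reader: take at most PACKET_SIZE bytes and
// treat the first HEADER_SIZE bytes as the header.
function readFixed(packet) {
    const received = packet.slice(0, PACKET_SIZE);
    return {
        chunksLeft: parseInt(received.slice(0, HEADER_SIZE), 10),
        chunk: received.slice(HEADER_SIZE),
        spill: packet.length - received.length, // bytes left over for the next read
    };
}

const body = 'A'.repeat(PACKET_SIZE - HEADER_SIZE);

// Counter fits in 4 digits: the body arrives intact.
const ok = readFixed(buildPacket(9999, body));

// Counter needs 5 digits: the ':' lands in the body and one body byte spills over.
const bad = readFixed(buildPacket(10000, body));

console.log(ok.chunk === body, ok.spill);   // true 0
console.log(bad.chunk[0], bad.spill);       // : 1
```

With one excess digit, the reader's chunk starts with the stray separator and is one byte short of the real body, which matches the truncation reported here.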

ProcessSupervisor.php / readNextProcessValue - do-while loop becomes:

            do {
                $this->client->selectRead($readTimeout);
                $packet = $this->client->read(static::SOCKET_PACKET_SIZE);

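                // Determine the header size: either the default, or the position of ':' in the packet if the header is longer.
                // If ':' is missing, strpos() returns false and false + 1 evaluates to 1, so the default size wins.
                $headerSize = max( static::SOCKET_HEADER_SIZE, strpos( $packet, ':') + 1 );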
                $chunksLeft = (int) substr($packet, 0, $headerSize); // ---- Use the determined header size
                $chunk = substr($packet, $headerSize); // ---- Use the determined header size

                $payload .= $chunk;

                if ($chunksLeft > 0) {
                    // The next chunk might be an empty string if we don't wait a short period on slow environments.
                    usleep(self::SOCKET_NEXT_CHUNK_DELAY * 1000);
                }
            } while ($chunksLeft > 0);
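For the PHP to JS direction, which this workaround does not cover, the same header detection could be applied on the Node side. The following is only a sketch under assumptions: `parsePacket` is a hypothetical helper, not part of Rialto, and it relies on the body being base64, whose alphabet never contains ':'.

```javascript
// Hypothetical helper, not part of Rialto.
const DEFAULT_HEADER_SIZE = 5; // 4 digits plus ':', as described in this issue

// Split one packet into its chunks-left counter and its body,
// accepting headers wider than the default.
function parsePacket(packet, defaultHeaderSize = DEFAULT_HEADER_SIZE) {
    // indexOf returns -1 when ':' is absent, so the default size wins in that case.
    const headerSize = Math.max(defaultHeaderSize, packet.indexOf(':') + 1);
    return {
        // parseInt stops at the ':' separator, so the header can be passed whole.
        chunksLeft: parseInt(packet.slice(0, headerSize), 10),
        chunk: packet.slice(headerSize),
    };
}

const legacy = parsePacket('0012:QUJD');  // 4-digit header
const wide = parsePacket('10000:QUJD');   // 5-digit header

console.log(legacy.chunksLeft, legacy.chunk); // 12 QUJD
console.log(wide.chunksLeft, wide.chunk);     // 10000 QUJD
```

Because a 4-digit header is parsed exactly as before, a reader like this stays compatible with writers that still use the fixed width.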

Connection.js / writeToSocket becomes:

        const payload = Buffer.from(str).toString('base64');
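        // Header size: the default, or the digits needed for the estimated chunk count plus 1 for the ':' separator
        const headerSize = Math.max( Connection.SOCKET_HEADER_SIZE,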
            Math.ceil( Math.log10( payload.length / (Connection.SOCKET_PACKET_SIZE - Connection.SOCKET_HEADER_SIZE) ) ) + 1 );
        const bodySize = Connection.SOCKET_PACKET_SIZE - headerSize,    // ----- Use determined header size
            chunkCount = Math.ceil(payload.length / bodySize);

        for (let i = 0 ; i < chunkCount ; i++) {
            const chunk = payload.substr(i * bodySize, bodySize);

            let chunksLeft = String(chunkCount - 1 - i);
            chunksLeft = chunksLeft.padStart(headerSize - 1, '0');    // ----- Use determined header size

            this.socket.write(`${chunksLeft}:${chunk}`);
        }
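One possible edge case in the writer above: the header size is estimated from the default body size, but a wider header shrinks the body, so just below a power-of-ten boundary the real chunk count can need one more digit than estimated. A hedged sketch of one way around this is to recompute until the header size is stable. `planChunks` and `toPackets` are hypothetical names, not Rialto functions, and the sizes are passed in as parameters rather than taken from Connection.

```javascript
// Hypothetical helper: find the smallest header size (counter digits plus ':')
// that fits the largest counter value. Growing the header shrinks the body,
// which can raise the chunk count, so repeat until nothing changes.
function planChunks(payloadLength, packetSize, minHeaderSize) {
    let headerSize = minHeaderSize;
    for (;;) {
        const bodySize = packetSize - headerSize;
        if (bodySize <= 0) {
            throw new RangeError('packet size too small for the required header');
        }
        const chunkCount = Math.ceil(payloadLength / bodySize);
        const maxCounter = Math.max(chunkCount - 1, 0); // counter of the first packet
        const needed = String(maxCounter).length + 1;   // digits plus ':'
        if (needed <= headerSize) {
            return { headerSize, bodySize, chunkCount };
        }
        headerSize = needed;
    }
}

// Hypothetical helper: split an already encoded payload into packets.
function toPackets(payload, packetSize, minHeaderSize) {
    const { headerSize, bodySize, chunkCount } =
        planChunks(payload.length, packetSize, minHeaderSize);
    const packets = [];
    for (let i = 0; i < chunkCount; i++) {
        const counter = String(chunkCount - 1 - i).padStart(headerSize - 1, '0');
        packets.push(`${counter}:${payload.slice(i * bodySize, (i + 1) * bodySize)}`);
    }
    return packets;
}

// Tiny sizes make the effect visible: 8-byte packets, 3-byte default header.
const plan = planChunks(600, 8, 3);
console.log(plan.headerSize, plan.bodySize, plan.chunkCount); // 4 4 150
```

The loop only ever widens the header, so small payloads keep the default size and remain readable by code that expects the fixed header.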

I hope this helps someone. I must confess that I think the socket handling in Rialto needs a little rework. For example, the PHP manual advises against using timeouts in socket handling, and I see that the return value of selectRead is not actually checked: a read takes place regardless. However, I'm no expert.