uNetworking / uWebSockets.js

μWebSockets for Node.js back-ends :metal:
Apache License 2.0
7.95k stars 570 forks source link

Pushing data from backpressure -> server semi-crashed #979

Closed morpig closed 11 months ago

morpig commented 11 months ago

Hi,

I have a setup where if ws.send returns 0, all outgoing data will be queued in an array until the drain event have been called.

During the drain event, it will flush and push all queued data to the ws connection.

In my case, during the flush event, the server would crash. The node process itself is still running, but existing conns disconnected, and new conns stopped connecting (econnrefused.)

Node v20.5.1, uws.js v20.33.0 prof.txt isolate-0x5d78ff0-796595-v8.log

23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=371, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=370, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=369, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=368, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=367, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=366, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=365, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=1, size=364, bufferedAmount=0
23/10/2023 08:37:54.406: LXLFUD backpressure3 draining status=0, size=364, bufferedAmount=7961
server crashed ~ but node is running

send:

if (!ws.isBackpressured) {
    const result = ws.send(data, true, false);

    if (result === 0) {
        console.log(`${getCurrentDateTime()}: ${ws.id} ws backpressured! queueing data bufferedAmount=${ws.getBufferedAmount()}`);
        ws.isBackpressured = true;
    }
} else {
    if (!backPressure[ws.id]) {
        backPressure[ws.id] = []
    }
    backPressure[ws.id].push(data);
}

drain:

drain: (ws) => {
    console.log(`${getCurrentDateTime()}: ${ws.id} drain triggered, sending pending data`);
    if (ws.isBackpressured && backPressure[ws.id]) {

        while (backPressure[ws.id].length > 0) {
            if (ws.getBufferedAmount() < 1024) {
                const b = backPressure[ws.id][0];
                const result = ws.send(b, true, false);
                if (result == 1) {
                    backPressure[ws.id].shift();
                }
                console.log(`${getCurrentDateTime()}: ${ws.id} backpressure3 draining status=${result}, size=${backPressure[ws.id].length}, bufferedAmount=${ws.getBufferedAmount()}`);
            }
        }

        if (backPressure[ws.id].length === 0) {
            ws.isBackpressured = false;
            delete backPressure[ws.id];
            console.log(`${getCurrentDateTime()}: ${ws.id} backpressure2 drain done`);
        }
    } 
},
morpig commented 11 months ago

Reworked the logic to increase maxBackpressure instead, and if ws.send returns 2, wait for drain event before sending again.