uNetworking / uWebSockets.js

μWebSockets for Node.js back-ends :metal:

Very high memory usage that will not decrease under high load (websockets) #1081

Closed · theogbob closed this issue 1 month ago

theogbob commented 1 month ago

I have a program that uses websockets, and it seems like memory isn't being cleared. I'm unsure if it's an issue with my code or the library itself, but I've opened this issue since I've been unable to see why my code would be leaking memory. Under very heavy load (any request increases it, but it's more noticeable under load) it reaches up to 16 GB of RAM, and this usage does not fall until the application is forcefully stopped.

uNetworkingAB commented 1 month ago

Your code doesn't look like it handles backpressure at all.

theogbob commented 1 month ago

How could I implement this?

theogbob commented 1 month ago

If you're referring to "maxBackpressure: 1024", it doesn't seem to affect anything and RAM usage is still very high.

uNetworkingAB commented 1 month ago

Very hard for me to say anything about your app. I don't know how you get that much memory usage.

What I would do is to run whatever reproducer you have, for a very long time, tracking the memory usage over time. If you get logarithmically increasing memory usage, then it looks like typical glibc behavior.

I have seen similar behavior when testing millions of connections: glibc uses the sbrk syscall and doesn't reliably return memory to the system, but it doesn't continue to grow; it just leaves the heap big.

The same issue is not seen if linking against tcmalloc, since it uses mmap and reliably returns memory to the system.

But yeah, run the test for a long time and see if you still end up with about the same memory usage. In that case it's fine.
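
A simple way to track this from inside the process is Node's built-in process.memoryUsage() (the same counters quoted later in this thread); a minimal sketch:

setInterval(() => {
    // rss includes allocator-held heap; external/arrayBuffers track Buffer memory
    const {rss, heapTotal, heapUsed, external, arrayBuffers} = process.memoryUsage();
    console.log({rss, heapTotal, heapUsed, external, arrayBuffers});
}, 10000); // log every 10 seconds while the load test runs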

Note that we fuzz the library under LeakSanitizer (part of AddressSanitizer) with millions of runs every 24 hours as part of Google OSS-Fuzz, so a memory leak in the library is highly unlikely.

theogbob commented 1 month ago

The app is meant to proxy TCP/UDP connections over a websocket server; the protocol is described here. It works fine with the normal 'ws' package. The issue is that the way the server will be used won't be slow and gradual over time, since it will be used on servers with possibly hundreds of users each sending requests, or for VPN connections.

If you want to reproduce my test:

1) Set up the echo server:

git clone https://github.com/tokio-rs/tokio && cd tokio
RUSTFLAGS='-C target-cpu=native' cargo r -r --example echo

2) Set up the uWisp server (the one we're talking about here):

npm i git+https://github.com/Astatine-Development/uWisp-Server.git

import { uwsServer } from 'uwisp-server';

let wsPort = parseInt(process.env.PORT || "");
if (isNaN(wsPort)) wsPort = 9090;

uwsServer.listen(wsPort, (listenSocket) => {
  if (listenSocket) {
    console.log(`[Wisp]: Server is listening on port ${wsPort}`);
  }
});

3) Set up a basic wisp client for the performance test:

git clone https://github.com/MercuryWorkshop/epoxy-tls
cd epoxy-tls
cd simple-wisp-client
RUSTFLAGS='-C target-cpu=native' cargo r -r -- -w ws://localhost:9090 -t 127.0.0.1:8080 -p 30 -d 10s --wisp-v1

Other than that, I'm not really sure how I'm supposed to fix this issue, since it makes the server virtually unusable under load, and if I limit the RAM usage of Node.js itself the performance becomes so bad that it's not even worth using.

uNetworkingAB commented 1 month ago

If you have a better experience with "ws" I recommend using "ws". ✅

I don't do app debugging for third parties having issues with the library in my spare time. ❌

Of course, there are numerous successful production uses far more stressful than what you mention, running for half a decade and then some, so by simple reasoning there definitely are ways to make it work.

uNetworkingAB commented 1 month ago

Btw, other than seeing high memory usage (which is its own PhD thesis alone), did you actually observe this supposed slowdown, or are you just inferring and assuming? As mentioned, glibc normally has logarithmically increasing memory usage (meaning the memory usage flattens out over time and stays flatlined at high usage).

uNetworkingAB commented 1 month ago

Ooh, this actually gives me an idea. I could just ship tcmalloc as part of the binary. It doesn't matter that Node.js uses glibc; we can use tcmalloc. Hold on, I will make a release with tcmalloc.

uNetworkingAB commented 1 month ago

You can also just try with

LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtcmalloc_minimal.so.4 node lallala.js

where /usr/lib/x86_64-linux-gnu/libtcmalloc_minimal.so.4 is the path installed by sudo apt-get -y install libtcmalloc-minimal4.

uNetworkingAB commented 1 month ago
                client.on("data", (data) => {
                    ws.send(dataPacketMaker(wispFrame, data), true);
                });

                You have no backpressure handling here.
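
One common pattern (a sketch only, not code from the original app; the BACKPRESSURE_LIMIT constant and the drain wiring are illustrative) is to check ws.getBufferedAmount() after sending, pause the upstream net.Socket when the buffer grows, and resume it from the behavior's drain handler:

const BACKPRESSURE_LIMIT = 64 * 1024; // illustrative threshold in bytes

client.on("data", (data) => {
    ws.send(dataPacketMaker(wispFrame, data), true);
    if (ws.getBufferedAmount() > BACKPRESSURE_LIMIT) {
        client.pause(); // stop reading from the TCP socket until uWS drains
    }
});

// ...and in the WebSocketBehavior:
drain: (ws) => {
    // uWS buffered data has drained; resume any paused upstream sockets
    for (const {client} of ws.connections.values()) {
        client.resume();
    }
},
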
uNetworkingAB commented 1 month ago

I can definitely trigger your issue.

theogbob commented 1 month ago

> Btw, other than seeing high memory usage (which is its own PhD thesis alone), did you actually observe this supposed slowdown, or are you just inferring and assuming? As mentioned, glibc normally has logarithmically increasing memory usage (meaning the memory usage flattens out over time and stays flatlined at high usage).

Yes, the slowdown is quite noticeable, and the requests per second in the benchmark drop significantly.

theogbob commented 1 month ago

> You can also just try with
>
> LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtcmalloc_minimal.so.4 node lallala.js
>
> where /usr/lib/x86_64-linux-gnu/libtcmalloc_minimal.so.4 is the path installed by sudo apt-get -y install libtcmalloc-minimal4.

I'll try this.

theogbob commented 1 month ago

(screenshot: memory usage after running with tcmalloc)

uNetworkingAB commented 1 month ago

Yeah, I tried it also; it made no difference. I am looking at this. You only have a single websocket connection and it closes at the end, so the question is what kind of stuff is retained.

theogbob commented 1 month ago
                client.on("data", (data) => {
                    ws.send(dataPacketMaker(wispFrame, data), true);
                });

                You have no backpressure handling here.

Help me understand how to properly add backpressure, since I tried following the docs and examples but it didn't seem to work.

uNetworkingAB commented 1 month ago

The leak is the

const client = new net.Socket();

even though they are all properly destroy()ed at the end. It's not a backpressure issue in uWS; removing ws.send does nothing.

The question is why these net.Socket objects are never freed.

uNetworkingAB commented 1 month ago

Even if uWS leaked the ws object, the fact that you destroy() the sockets and clear the map at the end should make that irrelevant. Leaking an empty JS object, with only 1 single ws connection and therefore 1 single such object, cannot account for 16 GB of data. The net.Socket objects are somehow not freed.

theogbob commented 1 month ago

> Even if uWS leaked the ws object, the fact that you destroy() the sockets and clear the map at the end should make that irrelevant. Leaking an empty JS object, with only 1 single ws connection and therefore 1 single such object, cannot account for 16 GB of data. The net.Socket objects are somehow not freed.

Hm, is there a way to force-free them?

uNetworkingAB commented 1 month ago

3 GB of data flows in via the ws "message" handler; at the end of the run:

{ rss: 3455897600, heapTotal: 64700416, heapUsed: 24171528, external: 3099758720, arrayBuffers: 3091087715 }

you have 3 GB allocated in Buffers (note the external and arrayBuffers counters).

theogbob commented 1 month ago

Why is it doing that lol

uNetworkingAB commented 1 month ago

If you look at it from the perspective of uWS: it looks like something is keeping your data alive in some buffer. Because my separate test pretty much replicates your use, yet has 0 issues with memory, it looks like something in your app.

I'm echoing 30 kB of data over 1 websocket, sending it back to the client, so I stress both receiving and sending data. I see no memory increase in my separate test. I'm pretty sure one of your Buffer.concat calls does something odd; I don't fully understand what your app does.

theogbob commented 1 month ago

The app is based off this protocol. 1) Is the 3 GB of data received from the stress-testing script/setup correct? 2) I'm confused about the last point you made about the separate stress test.

Explain more about your separate test and what you changed or are doing to test it.

uNetworkingAB commented 1 month ago

This is the test: it's just echoing 30 kB. I also made it use Buffer.from to see if that was an issue.

message: (ws, message, isBinary) => {
    /* Ok is false if backpressure was built up, wait for drain */
    let ok = ws.send(Buffer.from(message), isBinary);
},

What I would do is change all ws.connections uses to a ws.id (integer) instead, and then use a separate Map that doesn't touch the ws object but only uses the ws.id as its key. This way, you can make sure it's not the ws object that leaks, because 1 integer should not be a problem. See the sketch below.

Just as a debugging step.
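
A minimal sketch of that debugging step (the names nextWsId and connectionsByWsId are illustrative):

let nextWsId = 0;
const connectionsByWsId = new Map();

// in open: store only an integer on the ws object
ws.id = nextWsId++;
connectionsByWsId.set(ws.id, new Map());

// everywhere the app used ws.connections:
const connections = connectionsByWsId.get(ws.id);

// in close: drop the entry so nothing is retained
connectionsByWsId.delete(ws.id);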

uNetworkingAB commented 1 month ago

I did a find-and-replace from "ws.connections" to "ws_connections" and declared a let ws_connections at the top. This way, all the data is stored on an object that does not in any way touch the uWS websocket, so any leak from it cannot be attributed to uWS. I still see the same accumulation of memory.

This is valid since you only ever make 1 websocket connection.

uNetworkingAB commented 1 month ago

So now we know that:

  1. There are no backpressure issues in uWS (ws.getBufferedAmount() is always 0)
  2. You never attach any user data to the WebSocket object (ws_connections)
  3. An echo test that sends and receives more data than you do does not produce any leaks
  4. Changing the allocator from glibc to tcmalloc does nothing

This means that, even if I cannot find anything in your app, the app probably is broken in some way, and not uWS.

theogbob commented 1 month ago

Okay, thank you so much for your help. I'll keep this open to keep you updated if I find the reason or something comes up.

uNetworkingAB commented 1 month ago

Are you aware that the buffer you get from uWS needs to be copied if you plan to use it outside of the handler?

theogbob commented 1 month ago

?

uNetworkingAB commented 1 month ago

.message gives you an ArrayBuffer. You can't keep it beyond the handler. So you can try copying it to make sure.
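
For example (a sketch; note that Buffer.from(arrayBuffer) alone only creates a view over the same memory, so the bytes must be copied explicitly):

message: (ws, message, isBinary) => {
    // copy the bytes out of the uWS-owned ArrayBuffer before the handler returns
    const copy = Buffer.from(new Uint8Array(message)); // Buffer.from(TypedArray) copies
    // or: const copy = Buffer.from(message.slice(0)); // ArrayBuffer.slice() also copies
},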

theogbob commented 1 month ago

What would I be copying it for?

uNetworkingAB commented 1 month ago

I have fixed your app. Now it uses 1% RAM.

Your issue is that you implicitly capture the whole wispFrame in lambdas only to read its streamID. I made a variable called wispFrame_streamID and captured that one instead. Now it stays at 1% RAM.

I found this by using allocation instrumentation; all of this is completely new to me.

Consider this a random free service, just because I wanted to make sure, and this actually taught me something 👍
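
In other words, the crux of the fix (a minimal before/after sketch; the full file follows in the next comment):

// Before: the listener closes over the whole wispFrame, so the frame
// (and everything reachable from it) stays alive as long as the socket does
client.on("data", (data) => {
    ws.send(dataPacketMaker(wispFrame, data), true);
});

// After: capture only the integer, retaining nothing else
let wispFrame_streamID = wispFrame.streamID;
client.on("data", (data) => {
    ws.send(dataPacketMaker(wispFrame_streamID, data), true);
});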

uNetworkingAB commented 1 month ago
import uWS from 'uWebSockets.js';
import net from 'node:net';
import dgram from 'node:dgram';
import dns from 'node:dns/promises';

const CONNECT_TYPE = {
    CONNECT: 0x01,
    DATA: 0x02,
    CONTINUE: 0x03,
    CLOSE: 0x04
};

const STREAM_TYPE = {
    TCP: 0x01,
    UDP: 0x02
};

function wispFrameParser(data) {
    const uint8arrayView = new Uint8Array(data);
    const dataView = new DataView(uint8arrayView.buffer);
    const type = dataView.getUint8(0);
    let streamID = dataView.getUint32(1, true);
    let payload = uint8arrayView.slice(5, uint8arrayView.byteLength);
    return {type, streamID, payload};
}

function connectPacketParser(payload) {
    const dataview = new DataView(payload.buffer);
    const streamType = dataview.getUint8(0);
    const port = dataview.getUint16(1, true);
    const hostname = new TextDecoder("utf8").decode(dataview.buffer.slice(3, dataview.buffer.byteLength));
    return {streamType, port, hostname};
}

function continuePacketMaker(streamID, queue) {
    const initialPacket = new DataView(new Uint8Array(9).buffer);
    initialPacket.setUint8(0, CONNECT_TYPE.CONTINUE);
    initialPacket.setUint32(1, streamID, true);
    initialPacket.setUint32(5, queue, true);
    return initialPacket.buffer;
}

function closePacketMaker(streamID, reason) {
    const closePacket = new DataView(new Uint8Array(9).buffer);
    closePacket.setUint8(0, CONNECT_TYPE.CLOSE);
    closePacket.setUint32(1, streamID, true);
    closePacket.setUint8(5, reason);
    return closePacket.buffer;
}

function dataPacketMaker(streamID, data) {
    const dataPacketHeader = new DataView(new Uint8Array(5).buffer);
    dataPacketHeader.setUint8(0, CONNECT_TYPE.DATA);
    dataPacketHeader.setUint32(1, streamID, true);
    return Buffer.concat([
        Buffer.from(dataPacketHeader.buffer),
        data
    ]);
}

const uwsServer = uWS.App();

uwsServer.ws('/*', {
    compression: 0,
    maxPayloadLength: 16 * 1024 * 1024,
    idleTimeout: 32,
    open: (ws) => {
        ws.connections = new Map(); // streamID -> {client, buffer}
        // continuePacketMaker takes the integer streamID, then the buffer count
        ws.send(continuePacketMaker(0, 127), true, true);
    },
    message: (ws, message, isBinary) => {
        if (!isBinary) {
            return;
        }
        if (ws.closed) {
            return;
        }

        const wispFrame = wispFrameParser(Buffer.from(message));

        try {
            if (wispFrame.type === CONNECT_TYPE.CONNECT) {
                const connectFrame = connectPacketParser(wispFrame.payload);

                if (connectFrame.streamType === STREAM_TYPE.TCP) {
                    const client = new net.Socket();
                    client.connect(connectFrame.port, connectFrame.hostname);

                    ws.connections.set(wispFrame.streamID, {client, buffer: 127});

                    // Capture only the integer streamID so the listeners below
                    // don't retain the whole wispFrame (this was the leak)
                    let wispFrame_streamID = wispFrame.streamID;

                    client.on("connect", () => {
                        ws.send(continuePacketMaker({
                            streamID: wispFrame_streamID
                        }, 127), true);
                    });

                    client.on("data", (data) => {
                        ws.send(dataPacketMaker(wispFrame_streamID, data), true);
                    });

                    client.on("error", (error) => {
                        if (ws.readyState === ws.OPEN) {
                            try {
                                ws.send(closePacketMaker(wispFrame_streamID, 0x03), true);
                                ws.connections.delete(wispFrame_streamID);
                            } catch (error) {
                                // Handle error
                            }
                        }
                    });

                    client.on("close", () => {
                        if (ws.readyState === ws.OPEN) {
                            try {
                                ws.send(closePacketMaker(wispFrame_streamID, 0x02), true);
                                ws.connections.delete(wispFrame_streamID);
                            } catch (error) {
                                // Handle error
                            }
                        }
                    });

                } else if (connectFrame.streamType === STREAM_TYPE.UDP) {
                    (async () => {
                        let iplevel = net.isIP(connectFrame.hostname);
                        let host = connectFrame.hostname;

                        if (iplevel === 0) {
                            try {
                                host = (await dns.resolve(connectFrame.hostname))[0];
                                iplevel = net.isIP(host);
                            } catch (e) {
                                return;
                            }
                        }

                        if (iplevel != 4 && iplevel != 6) {
                            return;
                        }

                        const client = dgram.createSocket(iplevel === 6 ? "udp6" : "udp4");
                        client.connect(connectFrame.port, host);

                        ws.connections.set(wispFrame.streamID, {client, buffer: 127});

                        // Same fix as the TCP branch: capture only the integer streamID
                        let wispFrame_streamID = wispFrame.streamID;

                        client.on("connect", () => {
                            ws.send(continuePacketMaker(wispFrame_streamID, 127), true);
                        });

                        client.on("message", (data, rinfo) => {
                            ws.send(dataPacketMaker(wispFrame, data), true);
                        });

                        client.on("error", (error) => {
                            if (ws.readyState === ws.OPEN) {
                                try {
                                    ws.send(closePacketMaker(wispFrame, 0x3), true);
                                    ws.connections.delete(wispFrame.streamID);
                                } catch (error) {
                                    // Handle error
                                }
                            }
                            client.close();
                        });

                        client.on("close", () => {
                            if (ws.readyState === ws.OPEN) {
                                try {
                                    ws.send(closePacketMaker(wispFrame, 0x02), true);
                                    ws.connections.delete(wispFrame.streamID);
                                } catch (error) {
                                    // Handle error
                                }
                            }
                        });
                    })();
                }
            }

            if (wispFrame.type === CONNECT_TYPE.DATA) {
                const stream = ws.connections.get(wispFrame.streamID);
                if (stream && stream.client) {
                    if (stream.client instanceof net.Socket) {
                        stream.client.write(wispFrame.payload);
                    } else if (stream.client instanceof dgram.Socket) {
                        stream.client.send(wispFrame.payload);
                    }
                    stream.buffer--;
                    if (stream.buffer === 0) {
                        stream.buffer = 127;
                        // grant the client another send window; pass the integer streamID
                        ws.send(continuePacketMaker(wispFrame.streamID, stream.buffer), true);
                    }
                } else {
                    // Handle missing stream
                }
            }

            if (wispFrame.type === CONNECT_TYPE.CLOSE) {
                const stream = ws.connections.get(wispFrame.streamID);
                if (stream && stream.client) {
                    if (stream.client instanceof net.Socket) {
                        stream.client.destroy();
                    } else if (stream.client instanceof dgram.Socket) {
                        stream.client.close();
                    }
                }
                ws.connections.delete(wispFrame.streamID);
            }
        } catch (e) {
            ws.end();
            for (const {client} of ws.connections.values()) {
                if (client instanceof net.Socket) {
                    client.destroy();
                } else if (client instanceof dgram.Socket) {
                    client.close();
                }
            }
            ws.connections.clear();
        }
    },
    close: (ws, code, message) => {
        if (ws.closed) {
            return;
        }
        for (const {client} of ws.connections.values()) {
            if (client instanceof net.Socket) {
                client.destroy();
            } else if (client instanceof dgram.Socket) {
                client.close();
            }
        }
        ws.connections.clear();
    }
});

export { uwsServer };
theogbob commented 1 month ago

This does seem to work! The only downside is the huge performance drop compared to the previous version: currently it did 1,270 packets in 10 seconds, while the previous version did 527k. I'll take a look and see what might be causing that, but thank you!

theogbob commented 1 month ago

This might be an issue with the test itself (at least from a few minutes of checking things).

e3dio commented 1 month ago

This WISP protocol seems badly designed to me. Since this is a TCP/UDP proxy, you need to be extremely efficient at transferring data. Currently you are making an unnecessary copy in JS of all data that comes through the server: index.mjs#L55 and index.mjs#L23. If the protocol allowed sending the header first and the payload as the next websocket message, you would not need to create a new buffer and copy data into one combined header/payload message. At a minimum you should change the .slice() at index.mjs#L23, which makes a copy, to subarray(), which creates a view. Plus there are other, more minor issues with the code. See the sketch below.
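
Concretely, in wispFrameParser (a sketch; a view must still be copied if it is kept beyond the message handler, as noted earlier in this thread):

function wispFrameParser(data) {
    const uint8arrayView = new Uint8Array(data);
    const dataView = new DataView(uint8arrayView.buffer);
    const type = dataView.getUint8(0);
    const streamID = dataView.getUint32(1, true);
    // subarray() returns a view over the same memory; slice() made a copy
    const payload = uint8arrayView.subarray(5);
    return {type, streamID, payload};
}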