coopernurse / node-pool

Generic resource pooling for node.js
2.37k stars 258 forks source link

Cuncurrent problem with net socket data event #276

Closed RifeWang closed 4 years ago

RifeWang commented 4 years ago

I have a simple tcp server , it sends back data to client:

const net = require('net');

const server = net.createServer();
server.on('connection', (client) => {
    client.on('data', (chunk) => {
        client.write(chunk);
    });
    client.on('end', () => {
        console.log('socket end');
    });
    client.on('error', () => {
        console.log('socket error');
    })
});
server.listen(9000);

And I have a client , create a pool with net tcp socket :

const net = require('net');
const genericPool = require("generic-pool");

function createPool(remote_server) {
    const factory = {
        create: function() {
            return new Promise((resolve, reject) => {
                const host = remote_server.split(':')[0];
                const port = remote_server.split(':')[1];
                const socket = new net.Socket();
                socket.connect({
                    host: host,
                    port: port,
                });
                socket.setKeepAlive(true);
                socket.on('connect', () => {
                    console.log(`socket connected: ${socket.remoteAddress}:${socket.remotePort} , local: ${socket.localAddress}:${socket.localPort}` );
                    resolve(socket);
                });
                socket.on('error', error => {
                    console.log(`socket error: ${socket.remoteAddress}:${socket.remotePort} , ${error}`);
                    reject(error);
                });
                socket.on('close', hadError => {
                    console.log(`socker closed: ${socket.remoteAddress}:${socket.remotePort} , tanserror: ${hadError}`);
                });
            });
        },
        destroy: function(socket) {
            return new Promise((resolve) => {
                socket.destroy();
                resolve();
            });
        },
        validate: function (socket) {
            return new Promise((resolve) => {
                if (socket.destroyed || !socket.readable || !socket.writable) {
                    return resolve(false);
                } else {
                    return resolve(true);
                }
            });
        }
    };
    return genericPool.createPool(factory, {
        max: 5,
        min: 1,
        testOnBorrow: true
    });
}

const pool = createPool('127.0.0.1:9000');

// request
function request(data) {
    return new Promise(async (resolve, reject) => {
        const s = await pool.acquire();

        console.log('data listenerCount:', s.listenerCount('data')); // too many, why ?

        const bufs = [];

        // -------------------------------------------------------
        // will add many data listener , the pool does not work ?
        // and the received buf is mixed
        // -------------------------------------------------------
        s.on('data', async buf => {
            bufs.push(buf);

            // \r\n
            const END_BUF = Buffer.from('\r\n');
            if (END_BUF.equals(buf.slice(-2))) {

                try {
                    await pool.release(s);
                } catch {}

                const result = Buffer.concat(bufs).toString();
                return resolve(result);
            }
        });

        s.write(data);
    });
}

const testdata = 'fshfjksdfhksjfdsfdfdsfd\r\n';

(async () => {
    try {
        const arr = [];
        for (let i = 0; i < 100; i++) {
            arr.push(request(testdata));
        }
        await Promise.all(arr);
    } catch (error) {
        console.log('err:', error);
    }
})()

I used promise.all to test cuncurrency , but got some problem with the data event of socket . Why so many data event listener will be added and the data in callback is mixed ? It's a bug with this lib ? how can I resolve that problem .

Thanks.