socketio / socket.io-cluster-adapter

The official Socket.IO cluster adapter, which allows broadcasting events between several Socket.IO servers.
https://socket.io
MIT License

Error trying to call function .fetchSockets(): timeout reached: only 4 responses received out of 5 #11

Open Henrriky opened 7 months ago

Henrriky commented 7 months ago

I am facing an error when fetching sockets with the .fetchSockets function. I'm using cluster-adapter + sticky + pm2 to manage workers.

Code snippet that calls the fetchSockets() function:

const sockets = await io
  .in(`${plataform}-${client}`)
  .fetchSockets();

// Elsewhere in the code
const sockets = (
  await io
    .in(`${plataform}-${client}-${userId}`)
    .fetchSockets()
)[0];

You have triggered an unhandledRejection, you may have forgotten to catch a Promise rejection:
Error: timeout reached: only 4 responses received out of 5
    at Timeout._onTimeout (/opt/server/node_modules/@socket.io/cluster-adapter/dist/index.js:358:28)
    at listOnTimeout (node:internal/timers:573:17)
    at process.processTimers (node:internal/timers:514:7)
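For readers wondering where "only 4 responses received out of 5" comes from: with a clustered adapter, fetchSockets() has to ask the other workers for their sockets and wait for every reply before a deadline. The sketch below is a simplified simulation of that request/timeout pattern, not the adapter's actual code; `collectResponses`, `liveWorker` and `deadWorker` are hypothetical names made up for the illustration.

```javascript
// Illustrative only: NOT the adapter's real implementation.
// Waits for one reply per worker and rejects if some are still missing at the deadline.
function collectResponses(workers, timeoutMs) {
  return new Promise((resolve, reject) => {
    const expected = workers.length;
    const replies = [];
    if (expected === 0) {
      resolve(replies);
      return;
    }

    const timer = setTimeout(() => {
      reject(
        new Error(
          `timeout reached: only ${replies.length} responses received out of ${expected}`
        )
      );
    }, timeoutMs);

    for (const worker of workers) {
      worker.request((reply) => {
        replies.push(reply);
        if (replies.length === expected) {
          clearTimeout(timer);
          resolve(replies);
        }
      });
    }
  });
}

// Hypothetical workers: a live one answers after a short delay, a dead one never answers.
const liveWorker = (id) => ({
  request: (cb) => setTimeout(() => cb(`sockets from worker ${id}`), 10),
});
const deadWorker = () => ({ request: () => {} });
```

Calling `collectResponses` with four live workers and one dead one rejects with the same message as in the trace above, which is consistent with one worker being killed, restarted, or too busy to answer in time.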

I've already tried everything I could think of, including wrapping this promise in a try/catch.
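One thing worth checking here: a try/catch only intercepts a rejected promise if the promise is awaited inside the try block. A call that is not awaited, or that lives at another call site without its own handler, still surfaces as an unhandledRejection. A minimal sketch, where `fakeFetchSockets` is a hypothetical stand-in for `io.in(room).fetchSockets()` and `safeFetch` is an illustrative helper, not part of Socket.IO:

```javascript
// Hypothetical stand-in for io.in(room).fetchSockets() that always times out.
function fakeFetchSockets() {
  return Promise.reject(
    new Error("timeout reached: only 4 responses received out of 5")
  );
}

// Returns the fetched sockets, or an empty list if the request fails.
async function safeFetch(fetchFn) {
  try {
    // The await must sit inside the try block for the catch to see the rejection.
    return await fetchFn();
  } catch (err) {
    console.error("fetchSockets failed:", err.message);
    return [];
  }
}
```

Every call site needs the same treatment: the snippet above has two separate fetchSockets() calls, so both would have to go through something like `safeFetch(() => io.in(room).fetchSockets())`.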

darrachequesne commented 7 months ago

This might happen if one worker gets killed. In that case, you can simply retry:

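const MAX_CALLS = 3;

async function fetchSockets() {
  for (let i = 0; i < MAX_CALLS; i++) {
    try {
      return await io.fetchSockets();
    } catch (e) {
      // a worker may have died mid-request: let's retry
    }
  }
  // throw an Error rather than a string so callers get a stack trace
  throw new Error("too many errors");
}

Henrriky commented 7 months ago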

Just for context, I'm using the pm2 fork that manages the cluster. I'm facing problems in production because of this error; it is a telephony application that serves several clients simultaneously. Would implementing this help with the error? And what if the worker dies and the error is still triggered after the maximum number of attempts?

I don't want to have to abandon Socket.IO just because of this error. Before this error, I was facing connection timeouts and 100% CPU usage; I implemented the cluster and that fixed it, but now this error haunts me.

Edit: It seems that Socket.IO works when I use the native Node.js cluster module without pm2; however, in scenarios with many simultaneous connections, the application starts to trigger "timeout" errors on the client.

darrachequesne commented 7 months ago

> Could implementing this help with the error?

Yes, it should handle the case when a worker suddenly dies.
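As for what happens once the maximum number of attempts is exhausted, one option is to pause between attempts (so a restarted worker has time to come back) and to let the caller decide on a fallback. A rough sketch under those assumptions; `fetchWithRetry`, `maxAttempts` and `baseDelayMs` are hypothetical names, not part of Socket.IO:

```javascript
// Small promise-based sleep helper.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Retries an async function, waiting a little longer after each failed attempt.
// fetchFn is any function returning a promise, e.g. () => io.in(room).fetchSockets().
async function fetchWithRetry(fetchFn, { maxAttempts = 3, baseDelayMs = 100 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fetchFn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        // Linear backoff: 1x, 2x, 3x... the base delay.
        await sleep(baseDelayMs * attempt);
      }
    }
  }
  const reason = lastError instanceof Error ? lastError.message : String(lastError);
  throw new Error(`fetchSockets failed after ${maxAttempts} attempts: ${reason}`);
}
```

It could be called as `fetchWithRetry(() => io.in(room).fetchSockets())`, with the caller catching the final error and, for example, treating the room as empty for that request instead of letting the rejection go unhandled.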

> Also, before this error I was facing connection timeout problem and 100% CPU [...] however, in scenarios with many simultaneous connections the application starts to trigger "timeout" errors to the client

How many simultaneous connections?

See also: https://socket.io/docs/v4/performance-tuning/#at-the-os-level

Henrriky commented 7 months ago

I applied the settings from that link, but not much changed. According to my measurements, I had more than 2000 connections on Socket.IO, with several rooms and events, because the application is multitenant.

Henrriky commented 7 months ago

Can you help me?

I went back and wrote a simple script to load-test Socket.IO with Artillery, and I get the same error when calling the fetchSockets function.

const cluster = require("cluster");
const http = require("http");
const { Server } = require("socket.io");
const numCPUs = require("os").cpus().length;
const { setupMaster, setupWorker } = require("@socket.io/sticky");
const { createAdapter } = require("@socket.io/mongo-adapter");
const { MongoClient } = require("mongodb");

const DB = "mydb";
const COLLECTION = "socket.io-adapter-events";

async function main() {
  if (cluster.isMaster) {
    console.log(`Master ${process.pid} is running on port 3000`);

    const httpServer = http.createServer();

    setupMaster(httpServer, {
      loadBalancingMethod: "least-connection",
    });

    httpServer.listen(3000);

    for (let i = 0; i < numCPUs; i++) {
      cluster.fork();
    }

    cluster.on("exit", (worker) => {
      console.log(`Worker ${worker.process.pid} died`);
      cluster.fork();
    });
  } else {

    console.log(`Worker ${process.pid} started`);
    // const mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
    const mongoClient = new MongoClient("mongodb://localhost:27017/?directConnection=true");
    await mongoClient.connect();
    try {
      await mongoClient.db(DB).createCollection(COLLECTION, {
        capped: true,
        size: 1e6
      });
    } catch (e) {
      console.log("COLLECTION ALREADY EXISTS")
    }
    const mongoCollection = mongoClient.db(DB).collection(COLLECTION);

    const httpServer = http.createServer();
    const io = new Server(httpServer);

    io.adapter(createAdapter(mongoCollection));
    setupWorker(io)

    io.engine.on("connection", (rawSocket) => {
      rawSocket.request = null;
    });

    io.on('connection', async (socket) => {

      console.log('New client connected:', socket.id);

      socket.join(`-tenant-${socket.id}`)
      socket.emit(`-tenant-${socket.id}`, "hello")
      socket.join(`-electron-${socket.id}`)
      socket.join(`-teams-${socket.id}`)
      try {
        // cross-worker request: this is the call that times out under load
        const electronSockets = await io.in(`-tenant-${socket.id}`).fetchSockets();
        socket.emit(electronSockets.toString());
      } catch (error) {
        console.log("==============================> TOO LATE, IT FAILED")
      }

      io.in(`${socket.id}-teste`).emit("hello")

      socket.on('chat message', (msg) => {
        console.log('Message received:', msg);
        io.emit('chat message', msg);
        io.emit('teste', msg)
      });

      socket.on('disconnect', () => {
        console.log('Client disconnected:', socket.id);
      });
    });

    io.engine.on("connection_error", (error) => {
      console.log("=========================> ERROR ", error.message)
    });
  }
}

main();

This is my Artillery config file:

config:
  target: "http://ipaddress:3000"
  phases:
    - duration: 5
      arrivalRate: 10000
  socketio:
    transports: ["websocket"]

scenarios:
  - name: "Simulate connections and events"
    engine: socketio
    flow:
        - think: 2
        - emit:
            channel: "chat message"
            data: "Henrriky"
        - think: 10
        - emit:
            channel: "join"
            data: "test"

vr7bd commented 3 months ago

I'm facing the same issue. I'm able to reproduce it with this script. I set the number of clients to 5000. Is there any fix for this?

Henrriky commented 3 months ago

> I'm facing the same issue. I'm able to reproduce it with this script. I set the number of clients to 5000. Is there any fix for this?

I solved this by migrating the source code to Socketioxide, in Rust.

vr7bd commented 3 months ago

> I solved this by migrating the source code to Socketioxide, in Rust.

Are there APIs like fetchSockets to handle clustering or are you running it as a single instance?

Henrriky commented 3 months ago

> > I solved this by migrating the source code to Socketioxide, in Rust.
>
> Are there APIs like fetchSockets to handle clustering or are you running it as a single instance?

Single instance