socketio / socket.io-cluster-adapter

The Socket.IO official cluster adapter, allowing to broadcast events between several Socket.IO servers.
https://socket.io
MIT License
15 stars 9 forks source link

fetchSocket not returning response for redis-stream-adapter ioredis #13

Open nilkanth987 opened 1 month ago

nilkanth987 commented 1 month ago

Bug description

Hello Guys, I am facing an issue where fetchSockets is not returning a response in cluster mode. I have tried using ioredis as well as redis connecter, removed polling so do not require sticky, tried using both redis-adapter and redis-stream-adapter but same results.

Steps to reproduce

I was able to reproduce it via a mocha test. Here is my configuration.

node v19.7.0
// package.json
"ioredis": "^5.3.2",
"socket.io": "^4.7.4",
"@socket.io/redis-streams-adapter": "^0.2.2",

socket.js

import { createAdapter } from '@socket.io/redis-streams-adapter'
import { getIoRedisClientForSocket } from 'redis.connection.js'
const io = new Server(httpServer, {
      transports: ['websocket']
    })
    io.adapter(createAdapter(getIoRedisClientForSocket()))
    io.on('connection', (socket) => {
        socket.on('join_a_room', async (params, callback) => {
        try {
          const socketsInRoom = (await io.in(params.roomName).fetchSockets())?.length
          console.log(':::::::::: SOCKETS IN ROOM ::::::: ', socketsInRoom)
          const isRoomEmpty = socketsInRoom === 0
          if (isRoomEmpty) {
            // Do some action here
          }
          socket.join(params.roomName)
          callback(true, { message: 'Joined room', socketsInRoom })
        } catch (error) {
          console.log('>>> ERRR::: ', error)
          callback(false, { message: error.message })
        }
      })
    })

redis.connection.js

import { Redis } from 'ioredis'
export const getIoRedisClientForSocket = () => {
  return new Redis(config.redis.url)
}

socket.test.js

describe.only('Multiple instance testing', () => {
    before(() => {
       // Default server is listing to 4001. Creating new server to listen to 4002. 
      const httpServer = http.Server(app)
      new Socket().listen(httpServer)
      httpServer.listen(4002, () => { console.log('Test server started on 4002') })
    })
    it.only('should get proper response', async () => {
      const userA = await createUser()
      const userB = await createUser({ phone: '9898989888', email: 'test2@test.com' })
      const userAToken = await getLoginToken(userA)
      const userBToken = await getLoginToken(userB)
      const userAsocket = await connectUserSocket(userAToken)
      const userBsocket = await connectUserSocket(userBToken, 'ws://127.0.0.1:4002')
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userA joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userAsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userB joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userBsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
    })
})

OUTPUT

Have added logs at some places

>>>> Stream Adapter >>  {"type":1,"uid":"c42282ebea7991ca","nsp":"/"}
Test server started on 4002
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":1}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":1} 1716207983034-0
[0d79ac3ff828190f] new event of type 1 from c42282ebea7991ca
>>>> Stream Adapter >>  {"type":2,"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[c42282ebea7991ca] new event of type 2 from 0d79ac3ff828190f
>>>> SOC : broadcast operator rooms Set(1) { '664b416ea5b08951db0a5c5d' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  6a404d7bfcfbcffa
>>>> SOC : broadcast operator rooms Set(1) { '664b416fa5b08951db0a5c5f' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[c42282ebea7991ca] new event of type 8 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  49de3c88dbea8de6
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[0d79ac3ff828190f] ignore message from self
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  de66c29c37ca54b5
:::::::::: SOCKETS IN ROOM :::::::  0
>>>>> userA joined::  { message: 'Joined room', socketsInRoom: 0 }
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[c42282ebea7991ca] ignore message from self
>>> ERRR:::  Error: timeout reached: missing 1 responses
    at Timeout._onTimeout (/Users/nilkanthparmar/Documents/Projects/Ellu/ellu_backend/node_modules/socket.io-adapter/dist/cluster-adapter.js:611:28)
    at listOnTimeout (node:internal/timers:568:17)
    at processTimers (node:internal/timers:511:7)
>>>> Stream Adapter >>  {"type":2,"uid":"977b7eaed0b6bef1","nsp":"/"}
>>>>> userB joined::  { message: 'timeout reached: missing 1 responses' }
      1) should get proper response
      1) should get proper response
nilkanth987 commented 1 month ago

Looks like related to #11

nilkanth987 commented 1 month ago

I have been debugging the library and was able to find the bug in library. I have assigned an object with functions in socket.data and it was causing encoding library to fail and crash. Below is the line where the server crashed and didn't respond.

https://github.com/socketio/socket.io-redis-streams-adapter/blob/71ed4e4450ce0e59dd99e9d8c60e4e62bc8b15e0/lib/adapter.ts#L189

Proposed Solution