socketio / socket.io-aws-sqs-adapter

The Socket.IO adapter for AWS Simple Queue Service (SQS), which allows broadcasting events between several Socket.IO servers.
https://www.npmjs.com/package/@socket.io/aws-sqs-adapter
MIT License

Can't pass an existing SQS/SNS url to the adapter. #4

Open awalsystems opened 1 month ago

awalsystems commented 1 month ago

When initializing the adapter, it creates a new SQS queue every time.

That behavior implies the creation of a new queue every time a node/task within my scaling group is mounted! So how can I use the same queue across my nodes for persistence & central broadcasting (the main motivations behind using an adapter in Socket.IO)?

(For the Redis & MongoDB adapters, you can pass the URL or table names to use.)

darrachequesne commented 1 month ago

Hi! That's because the queue must start at the last offset of the SNS topic, so that the node only receives the messages published after that point (and does not replay old messages).

Calling io.close() will remove the queue.

process.on("SIGTERM", () => {
  io.close();
});

Another solution would have been to reuse an existing queue, and discard old messages.

vanhumbeecka commented 1 month ago

There is still an issue with closing SQS queues. When I call io.close() inside my node program, it does work and successfully destroys the queue. However, when I run it inside signal listeners as you mention, it won't delete the queue.

Example code:

const closeServer = (io: Server) => {
    io.close((e) => {
        if (e) {
            logger.error('Failed to close socket.io server', { error: serializeError(e) })
        } else {
            logger.debug('Socket.io server closed')
        }
    })
}

process.on('SIGTERM', () => closeServer(io)) // close server on SIGTERM
process.on('SIGINT', () => closeServer(io)) // close server on SIGINT

When running locally (and connected to AWS), if I press Ctrl+C (and hence send a SIGINT), I get the message Socket.io server closed right before the actual process exit, but the queue remains.

I'm assuming it's simply because SIGINT will not wait for async functions (like deleting the queues) to finish before exiting. Not sure if there is an easy solution for this? The only thing I can think of is to have a separate (external) job somewhere that deletes all queues with the same prefix before actually starting the new node processes. Ideas are welcome ;)
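
For what it's worth, here is a rough sketch of such an external cleanup job, using the AWS SDK v3 SQS client directly. It assumes the socketio prefix matches the adapter's queuePrefix option and that no other nodes are still using the queues when it runs:

import { SQS } from '@aws-sdk/client-sqs'

// Delete every leftover adapter queue whose name starts with the given prefix.
// Intended to run before new node processes are started.
const deleteQueuesByPrefix = async (prefix: string) => {
    const sqs = new SQS()
    let nextToken: string | undefined
    do {
        const { QueueUrls, NextToken } = await sqs.listQueues({
            QueueNamePrefix: prefix,
            NextToken: nextToken,
        })
        for (const queueUrl of QueueUrls ?? []) {
            await sqs.deleteQueue({ QueueUrl: queueUrl })
        }
        nextToken = NextToken
    } while (nextToken)
}

deleteQueuesByPrefix('socketio').catch((e) => console.error('Queue cleanup failed', e))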

Below are the DEBUG logs coming from the package (no mention of even trying to delete the queues):

2024-07-18T14:00:58.222Z [info]         Server Running on port 8484
2024-07-18T14:00:58.222Z [info]         ----------    READY    ----------
  socket.io-aws-sqs-adapter topic [socketio] was successfully created +2s
  socket.io-aws-sqs-adapter creating queue [socketio-58967669ff4722ce] +0ms
  socket.io-aws-sqs-adapter queue [socketio-58967669ff4722ce] was successfully created +2s
  socket.io-aws-sqs-adapter queue [socketio-58967669ff4722ce] has successfully subscribed to topic [socketio] +713ms
  socket.io-aws-sqs-adapter polling for new messages +0ms
  socket.io-aws-sqs-adapter received 1 message(s) +242ms
  socket.io-aws-sqs-adapter ignore message from self +1ms
  socket.io-aws-sqs-adapter polling for new messages +169ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +168ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +179ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +169ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +172ms
^C2024-07-18T14:01:23.937Z [debug]      Socket.io server closed

Process finished with exit code 130 (interrupted by signal 2:SIGINT)

darrachequesne commented 1 month ago

@vanhumbeecka it seems that the sqsClient.receiveMessage() method used here accepts a signal option to cancel a pending request, which could potentially solve your issue (instead of waiting for 5 seconds). What do you think?
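
For context, the idea would look roughly like this with the AWS SDK v3 aggregated SQS client (a sketch only: the queue URL and the 5-second wait are placeholders, not the adapter's actual internals, and the option is spelled abortSignal in SDK v3):

import { SQS } from "@aws-sdk/client-sqs";

const sqs = new SQS();
const controller = new AbortController();

// Long poll for messages; the second argument allows the pending
// HTTP request to be aborted.
const pollOnce = (queueUrl: string) =>
  sqs.receiveMessage(
    { QueueUrl: queueUrl, WaitTimeSeconds: 5 },
    { abortSignal: controller.signal }
  );

// On shutdown, abort the in-flight poll instead of waiting for it to
// time out, so the queue cleanup can start immediately.
process.on("SIGTERM", () => controller.abort());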

vanhumbeecka commented 1 month ago

> @vanhumbeecka it seems that the sqsClient.receiveMessage() method used here accepts a signal option to cancel a pending request, which could potentially solve your issue (instead of waiting for 5 seconds). What do you think?

I think it's worth a try, although I'm not entirely sure we have any guarantee that this function will run to completion (where it deletes the queues) before the process terminates. It might, though.

EDIT: Alternatively, what's missing here is a way to get the names (or better, the ARNs) of those queues, in order to manage them outside of this process (e.g. we could store these names in a datastore to check whether they are still being used by a different process, or something similar).

Something like this:

import { createServer } from 'node:http'
import { SNS } from '@aws-sdk/client-sns'
import { SQS } from '@aws-sdk/client-sqs'
import { Server } from 'socket.io'
import { createAdapter } from '@socket.io/aws-sqs-adapter'

const httpServer = createServer()
const snsClient = new SNS()
const sqsClient = new SQS()

const adapter = createAdapter(snsClient, sqsClient, {
    topicName: 'socketio',
    queuePrefix: 'socketio',
})

const io = new Server(httpServer, {
    adapter: adapter
})

const details = adapter.getDetails() // <-- generic adapter details object which contains the queue names in the case of the SQS adapter

EDIT 2: Changed my mind again 😅: we could leverage the queueTags option to achieve the same result, so there's no need for this extra method to get queue names. E.g. by tagging the queues with a particular version of your deployment, we could distinguish between them (rough sketch after the snippet below).

const adapter = createAdapter(snsClient, sqsClient, {
    topicName: 'socketio',
    queuePrefix: 'socketio',
    queueTags: { Application: 'my-app', Version: '1.2.3' }
})
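
For completeness, here is a rough sketch of how an external cleanup job could use those tags to spot queues left over from older deployments (assuming the adapter applies queueTags to the queues it creates; the Application and Version values mirror the example above):

import { SQS } from '@aws-sdk/client-sqs'

// Find adapter queues whose Version tag no longer matches the current
// deployment, so an external cleanup job can delete the stale ones.
const findStaleQueues = async (prefix: string, currentVersion: string) => {
    const sqs = new SQS()
    const stale: string[] = []
    const { QueueUrls } = await sqs.listQueues({ QueueNamePrefix: prefix })
    for (const queueUrl of QueueUrls ?? []) {
        const { Tags } = await sqs.listQueueTags({ QueueUrl: queueUrl })
        if (Tags?.Application === 'my-app' && Tags?.Version !== currentVersion) {
            stale.push(queueUrl)
        }
    }
    return stale
}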