Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't pass an existing SQS/SNS url to the adapter. #4

Open
awalsystems opened this issue Jul 9, 2024 · 4 comments
Open

Can't pass an existing SQS/SNS url to the adapter. #4

awalsystems opened this issue Jul 9, 2024 · 4 comments
Labels
question Further information is requested

Comments

@awalsystems
Copy link

When initializing the adapter, it will create a new SQS every time.

That behavior would imply the creation of a new Queue every time a node/task within my scaling group is mounted ! So how can't use the same queue across my nodes for persistance & central brodcasting ( the main motivations behind using an adapter in socket.io )

( For Redis & MongoDb adapters, you can pass the url or table names to use )

@darrachequesne
Copy link
Member

Hi! That's because the queue must start at the last offset of the SNS topic, so the node only receives the messages published after that (and not replay old messages).

Calling io.close() will remove the queue.

process.on("SIGTERM", () => {
  io.close();
});

Another solution would have been to reuse an existing queue, and discard old messages.

@darrachequesne darrachequesne added the question Further information is requested label Jul 10, 2024
@vanhumbeecka
Copy link

There is still an issue with closing SQS queues.
When I call io.close() inside my node program, it does work and successfully destroys the queue.
However, when I run it inside listeners like you mention, it won't delete the queue.

Example code:

const closeServer = (io: Server) => {
    io.close((e) => {
        if (e) {
            logger.error('Failed to close socket.io server', { error: serializeError(e) })
        } else {
            logger.debug('Socket.io server closed')
        }
    })
}

 process.on('SIGTERM',  () => closeServer(io)) // close server on SIGTERM
 process.on('SIGINT',  () => closeServer(io)) // close server on SIGINT

When running locally (and connected to AWS), when I Ctrl+C (and hence send a SIGINT), I'm getting the message Socket.io server closed right before the actual process exit, but the queue remains.

I'm assuming it's simply because the SIGINT will not wait for async functions (like deleting the queues) to close before exiting.
Not sure if there is an easy solution for this? Only thing I can think of is have an separate (outside) job somewhere that deletes all queues with the same prefix before actually starting those new node processes?
Ideas are welcome ;)

Below are the DEBUG logs coming from the package (no mention of even trying to delete the queues):

2024-07-18T14:00:58.222Z [info]         Server Running on port 8484
2024-07-18T14:00:58.222Z [info]         ----------    READY    ----------
  socket.io-aws-sqs-adapter topic [socketio] was successfully created +2s
  socket.io-aws-sqs-adapter creating queue [socketio-58967669ff4722ce] +0ms
  socket.io-aws-sqs-adapter queue [socketio-58967669ff4722ce] was successfully created +2s
  socket.io-aws-sqs-adapter queue [socketio-58967669ff4722ce] has successfully subscribed to topic [socketio] +713ms
  socket.io-aws-sqs-adapter polling for new messages +0ms
  socket.io-aws-sqs-adapter received 1 message(s) +242ms
  socket.io-aws-sqs-adapter ignore message from self +1ms
  socket.io-aws-sqs-adapter polling for new messages +169ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +168ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +179ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +169ms
  socket.io-aws-sqs-adapter received 1 message(s) +5s
  socket.io-aws-sqs-adapter ignore message from self +0ms
  socket.io-aws-sqs-adapter polling for new messages +172ms
^C2024-07-18T14:01:23.937Z [debug]      Socket.io server closed

Process finished with exit code 130 (interrupted by signal 2:SIGINT)

@darrachequesne
Copy link
Member

@vanhumbeecka it seems that the sqsClient.receiveMessage() method used here accepts a signal option to cancel a pending request, that could potentially solve your issue (instead of waiting for 5 seconds). What do you think?

@vanhumbeecka
Copy link

vanhumbeecka commented Jul 22, 2024

@vanhumbeecka it seems that the sqsClient.receiveMessage() method used here accepts a signal option to cancel a pending request, that could potentially solve your issue (instead of waiting for 5 seconds). What do you think?

I think it's worth a try, although I'm not entirely sure we get guarantees this function will run to the end (where it deletes the queues) before process termination happens. It might though.

EDIT: Alternatively, what's missing here is being able to get the names (or better, ARNs) of those queues somehow, in order to manage these outside of this process. (e.g. we could store these names in a datastore to check if these are still being used in a different process or something similar).

Something like this:

const snsClient = new SNS()
const sqsClient = new SQS()

const adapter = createAdapter(snsClient, sqsClient, {
    topicName: 'socketio',
    queuePrefix: 'socketio',
})

const io = new Server(httpServer, {
    adapter: adapter
})

const details = adapter.getDetails() // <-- generic adapter object which contains the queue names in case of SQS adapter

EDIT 2: Changed my mind again 😅 : we could leverage the queueTags to achieve the same results, so no need for this extra method to get queue-names.
E.g. by tagging the queues with a particular version of your deployment, we could distinguish between them.

const adapter = createAdapter(snsClient, sqsClient, {
    topicName: 'socketio',
    queuePrefix: 'socketio',
    queueTags: { Application: 'my-app', Version: '1.2.3' }
})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

3 participants