Project-OMOTES / omotes-system

GNU General Public License v3.0
0 stars 0 forks source link

Clean up left-over job status, progress & result queues #80

Open lfse-slafleur opened 2 months ago

lfse-slafleur commented 2 months ago

In certain faulty situations it may happen that a queue for job status, progress & results may not be removed and it will remain indefinitely. In order to properly clean up the resources, we need to:

If this happens, it happens probably because the relevant SDK is offline and did not reconnect to the relevant queues and it will not reconnect. Therefore, we cannot rely on the SDK normal operation for the clean up. However, we need to make sure we NEVER delete a queue that contains messages and may be read at some point in the future. 3 components appear candidates:

1) RabbitMQ broker using queue ttl

We also need to ensure that the SDK reconnect can handle if a queue is not available (throw a proper exception instead of just crashing) e.g. use the callback_on_no_message over here: https://github.com/Project-OMOTES/omotes-sdk-python/blob/main/src/omotes_sdk/omotes_interface.py#L176C13-L176C35

lfse-slafleur commented 1 month ago

Another resource: look for auto-delete https://www.rabbitmq.com/docs/queues

cwang39403 commented 1 month ago

Hey @lfse-slafleur ,

I made two preliminary commits in SDK and Orchestrator repo. as attempts to address this issue.

The automatic job queues cleaning-up approach is based on utilizing queue and message TTL arguments when declaring the queues. If we specify message_ttl < queue_ttl, additionally with dead_letter_exchange and dead_letter_routing_key, the job result will be first dead-lettered and republished to the dead letter queue before the queue TTL is reached and deleted.

Considering the requirements listed above:

Additionally, with the current setup, the job results are dead-lettered and retained permanently in the dead letter queue until any further intervention. Do we want to do anything with these dead-lettered messages? (e.g. also define TTL on these messages? expose and consume these messages by the clients in some way?)

lfse-slafleur commented 4 weeks ago

Hea @cwang39403 !

I like the approach. From an architectural point-of-view, I do believe that the timeout on how long a queue will survive needs to be set by the SDK which is what you do over here: https://github.com/Project-OMOTES/omotes-sdk-python/commit/678dfd09ec78628d264d32fedd1169660759e8b3#diff-88ccb7d1df8ecf0ace917d21e9a7ad6561ac792085c16a56f377d927181714a3R108 . This is great! Perhaps we can make this configurable just so an SDK can turn off this feature if necessary (by setting the timeouts to None/infinite or something).

With your approach, the (result) messages are already deadlettered which is great! This allows us to log clean ups based on messages instead of queues so we should be able to fulfill the requirement if we change it a little: Log that the deadlettered messages which were found and removed This is something the orchestrator could do so perhaps we make the orchestrator a subscriber to the deadletter queue? Regarding names, perhaps we can keep it to something like job-result-message-dlq (dlq is the acronym for dead-letter queue) as sometime in the future we may have multiple dead letter queues.

Could you also confirm if a queue may be persistent AND have a queue-TTL and contains messages with message-TTL? What happens when RabbitMQ is rebooted and comes online before the TTL should take into effect? What happens when RabbitMQ is rebooted and comes online after the TTL should take into effect?

Curous to your thoughts!

lfse-slafleur commented 4 weeks ago

Curious also to your findings on what a dead-lettered message looks like. Perhaps it will show us the queue name it was originally from as well. (see comment on the PR)

cwang39403 commented 4 weeks ago

Hey @lfse-slafleur , thanks for a couple of nice suggestions and questions! My first reaction to these comments:

cwang39403 commented 3 weeks ago

I just updated the comment above with the finding of What happens when RabbitMQ is rebooted and comes online before the TTL should take into effect? What happens when RabbitMQ is rebooted and comes online after the TTL should take into effect?

cwang39403 commented 3 weeks ago

Hey @lfse-slafleur,

A follow-up on the found issue mentioned above: Job result message is deleted but not dead-lettered to the DLQ after RabbitMQ is rebooted after the TTL

I repeated the similar test steps above just to be sure. This time with JOB_QUEUES_TTL and JOB_RESULT_MESSAGE_TTL set as 2 and 1 mins respectively. The observation result stays the same, the job result message is deleted but not found in the DLQ, and the queue is deleted after another 2 minutes.

After some Googling, this might be explained with the following:

lfse-slafleur commented 1 day ago

Orchestrator will be updated with https://github.com/Project-OMOTES/omotes-system/pull/101

Should be used together with SDK >= 3.1.0

Missing: system test(s) & documentation

lfse-slafleur commented 1 day ago

This ticket will be followed up by a new change. In the current design there is a message TTL & a queue TTL. However, in some cases it is possible that the message TTL is reached (and the message expired) but the queue TTL is not reached. For example, if a result in a job_result queue has a message TTL of 2 hours but the queue job_result has a queue TTL of 4 hours, than if the SDK reconnects after 3 hours then the message will have been expired but the queue will stay indefinitely. The SDK will wait indefinitely for a message.

@cwang39403 has been working on this feature and through his experience we have found that the combination of message TTL and queue TTL is too complex. Expiration needs to be an atomic step or else the SDK might have a different conclusion than the broker and/or orchestrator.

Therefor, we propose to drop the message TTL requirement and not use deadletter queues. Instead, the queue will be dropped upon reaching the queue TTL including any messages it contains.

This still fits with most requirements:

However, we will drop the requirement that we will log a message/queue/job being considered stale/expired when it reaches the expiration criterium. Instead, we will leverage the SDK to figure out something went wrong.

This is allowed because:

TODO:

Work will be continued in this ticket: https://github.com/Project-OMOTES/omotes-sdk-python/issues/67

This change solves the following issues:

See https://github.com/Project-OMOTES/omotes-sdk-python/pull/63 for more indepth discussions on these issues.

cwang39403 commented 1 day ago

@lfse-slafleur About the TODO

_We need to check if a passive queue declare when the queue does not exist results in a channel closed. If so, we need to change the underlying code of brokerinterface.py to use a new channel on each queue subscription instead of putting all traffic instead a single AMQP channel per connection.

At the moment, there is a reconnect argument in connect_to_submitted_job in https://github.com/Project-OMOTES/omotes-sdk-python/pull/63/files, and use self.broker_if.queue_exists (namly https://aio-pika.readthedocs.io/en/latest/apidoc.html#aio_pika.Channel.get_queue) to check if the queue exists. Does that suffice?