sbsdev / mdr2

Production Management Tool for DAISY Talking Books
GNU Affero General Public License v3.0
0 stars 0 forks source link

Rethink the use of immutant message queues #73

Closed egli closed 1 year ago

egli commented 3 years ago

Message queues are used to build some sort of a state machine that move a production from one state to the next.

  1. If a production has been recorded an external event triggers a function which puts the production on the encoding queue.
  2. If a production is ready to be archived again an external event triggers a function that places the production on the archiving queue
  3. There's also a queue to handle file-export of production state to the company wide ERP system

I guess the queues are implementing some sort of worker queues especially in the case of encoding and archiving which are both costly operations.

The implementation works fairly well but there are some down sides to it:

  1. If the handler i.e. the subscriber of the queue throws an exception the message is sent again. This does causes more problems than it solves, as the underlying reason for the exception is most likely still there. I tried to configure the queue in such a way that re-delivery would not happen but wasn't successful. OTOH you could argue that the subscribers should be more robust and should be able to handle errors more gracefully.
  2. immutant is no longer maintained. It wouldn't hurt to move away from it.

So the questions are:

plexus commented 3 years ago

I've commented on the other ticket regarding error handling, I do think your handlers should handle their own errors, unless they deliberately don't because there is some mechanism higher up that handles it.

It sounds like your needs are fairly basic, you need a way to have an in-process event queue, and some kind of workers to split the work across threads, all inside the same process. In this case I think the stuff that Java gives you is pretty good. The java.concurrent package has a bunch of queue implementation. I think for instance the LinkedBlockingQueue is a good default.

Then you can instantiate a ThreadPoolExecutor, and simply send functions to it to execute.

something like this:

(def executor (ThreadPoolExecutor. (+ 1 (.availableProcessors (Runtime/getRuntime)))
                                 (+ 1 (.availableProcessors (Runtime/getRuntime)))
                                 (long 10000)
                                 TimeUnit/MINUTES
                                 (LinkedBlockingQueue.)))

(.execute executor (fn [] ,,,))
egli commented 1 year ago

The 0.9 release is moving away from immutant and uses plain old core.async. So this issue is no longer relevant