UCLH-Foundry / PIXL

PIXL Image eXtraction Laboratory
Apache License 2.0

Enable asynchronous consuming of messages #324

Closed stefpiatek closed 5 months ago

stefpiatek commented 5 months ago

Had a great time. It turns out the way we configured aio-pika meant it was never switching to pick up the next message from the queue while in an await block. You can follow my fun debugging process if you want a laugh. But it now does what it was intended to do 🎉

Implemented their quickstart for async processing, basically queue.consume(callback) so that aio-pika will do its async magic.
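For anyone reading along, a minimal sketch of that callback-based pattern, not the actual PIXL code. The queue name, broker URL and the sleep standing in for real work are hypothetical; the aio-pika calls (connect_robust, channel.set_qos, declare_queue, queue.consume, message.process) are the ones from its documentation.

```python
import asyncio

AMQP_URL = "amqp://guest:guest@localhost/"  # hypothetical broker URL
QUEUE_NAME = "example_queue"                # hypothetical queue name


async def on_message(message) -> None:
    """Callback invoked by aio-pika for each delivered message."""
    # message.process() acks the message when the block exits cleanly.
    async with message.process():
        # Stand-in for the real work; while this awaits, the event loop
        # is free to start the callback for the next delivered message.
        await asyncio.sleep(5)


async def main(prefetch_count: int = 100) -> None:
    # Third-party dependency, imported here so the module itself can be
    # loaded without aio-pika installed.
    import aio_pika

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        # Cap how many unacknowledged messages the broker hands us at once.
        await channel.set_qos(prefetch_count=prefetch_count)
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        # Register the callback instead of iterating over the queue.
        await queue.consume(on_message)
        # Keep the consumer alive until cancelled.
        await asyncio.Future()
```

Running it would be asyncio.run(main()) against a reachable RabbitMQ instance.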

The prefetch_count argument to set_qos sets the total number of messages which can be asynchronously processed at once. Single-threaded we average one message every 5 seconds, so a prefetch_count of 100 would allow for a maximum of 100 / 5 = 20 messages processed per second. That lets us use the token bucket to go to 12-15 per second, with some wiggle room in case some message types take longer.
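The arithmetic behind that number, as a small sketch. The function names are made up for illustration, and it assumes every message takes the same average time and that the prefetch limit is the only bottleneck.

```python
import math


def max_messages_per_second(prefetch_count: int, seconds_per_message: float) -> float:
    """Upper bound on throughput with prefetch_count messages in flight."""
    return prefetch_count / seconds_per_message


def min_prefetch_for_rate(target_per_second: float, seconds_per_message: float) -> int:
    """Smallest prefetch_count that can sustain the target rate."""
    return math.ceil(target_per_second * seconds_per_message)


print(max_messages_per_second(100, 5.0))  # 20.0
print(min_prefetch_for_rate(15, 5.0))     # 75
```

So a token bucket rate of 12-15 per second needs 60-75 messages in flight at 5 seconds each, and 100 leaves headroom for slower message types.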

peshence commented 5 months ago

Looks great, for my understanding though - the async loop waits for each message to be processed before continuing to the next, while now if one message is being awaited, the next can start being processed?

stefpiatek commented 5 months ago

> Looks great, for my understanding though - the async loop waits for each message to be processed before continuing to the next, while now if one message is being awaited, the next can start being processed?

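Yep, an async for loop doesn't actually process the items in parallel. It only makes waiting for the next item non-blocking for the rest of the event loop; the loop body still finishes one message before the next one is fetched.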
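A standard-library sketch of the difference, with no RabbitMQ involved. The Tracker class, the fake message source and the semaphore standing in for prefetch_count are all illustrative assumptions, not how aio-pika is implemented.

```python
import asyncio


class Tracker:
    """Records how many handlers are running at the same time."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0


async def handle(tracker: Tracker, delay: float = 0.01) -> None:
    tracker.in_flight += 1
    tracker.peak = max(tracker.peak, tracker.in_flight)
    await asyncio.sleep(delay)  # stand-in for awaiting slow I/O
    tracker.in_flight -= 1


async def messages(n: int):
    """Fake message source."""
    for i in range(n):
        yield i


async def sequential(n: int) -> int:
    """async for with an awaited body: one message at a time."""
    tracker = Tracker()
    async for _ in messages(n):
        await handle(tracker)
    return tracker.peak


async def concurrent(n: int, limit: int) -> int:
    """One task per message, capped by a semaphore (like a prefetch limit)."""
    tracker = Tracker()
    semaphore = asyncio.Semaphore(limit)

    async def guarded() -> None:
        async with semaphore:
            await handle(tracker)

    tasks = []
    async for _ in messages(n):
        tasks.append(asyncio.create_task(guarded()))
    await asyncio.gather(*tasks)
    return tracker.peak


print(asyncio.run(sequential(6)))     # 1
print(asyncio.run(concurrent(6, 3)))  # 3
```

The first version never has more than one message in flight, which matches the behaviour before this change; the second overlaps the awaits up to the limit.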