Open fvdbeek opened 4 years ago
Examples are intended to be worth a thousand words, connecting familiar ideas with their specific usages in aio. I thought I had an example with aio-make-callback and aio-chain somewhere, but you're right, it's difficult to figure out how it works. I'll have to fix that.
However, aio-chain is not designed for your needs here. Your case has a simple solution if using semaphores: Initialize the semaphore to the per-second rate limit, and before posting on the semaphore after completing a request, await on an aio-sleep for one second. This will delay returning "tickets" and cap the request rate. If the rate is expressed in N requests per minute, then initialize the semaphore to N and sleep for one minute instead.
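The semaphore approach described above might look roughly like the following. This is a minimal sketch, not code from the aio repository: the function name my/rate-limited-retrieve is hypothetical, and the one-second hold is the per-second case from the text. It assumes a file with lexical binding enabled, which aio requires.

```elisp
;;; -*- lexical-binding: t; -*-
(require 'aio)

(aio-defun my/rate-limited-retrieve (semaphore url)
  "Fetch URL, holding a ticket from SEMAPHORE for one extra second.
Hypothetical helper.  Resolves to the (STATUS . BUFFER) pair from
`aio-url-retrieve'."
  ;; Take a ticket; this suspends while all tickets are in use.
  (aio-await (aio-sem-wait semaphore))
  (let ((result (aio-await (aio-url-retrieve url))))
    ;; Delay returning the ticket so that no more than the initial
    ;; semaphore count of requests can start within any one second.
    (aio-await (aio-sleep 1))
    (aio-sem-post semaphore)
    result))
```

A caller would create the semaphore once, for example (aio-sem 3) for three requests per second, and pass it to every call. Note that in this simple form the caller also sees the extra second before the result is delivered.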
You said you need to maintain the request order. I'm a little hesitant to say that aio does this automatically. It uses fair queues and I don't expect this to ever change, but I may want to leave my options open. Currently, waiters on a semaphore are awoken in the same order they queued. If you want to be explicit about order, and especially if you want it to be bounded (back pressure is important), you could build your own bounded queue out of a couple of semaphores: one holds the number of items in the queue, the other holds the number of empty slots.
https://lucumr.pocoo.org/2020/1/1/async-pressure/
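As an illustration of the two-semaphore bounded queue mentioned above, here is a minimal sketch. All of the my/bqueue names are hypothetical and not part of aio; only aio-sem, aio-sem-wait, aio-sem-post, aio-defun, and aio-await come from the library.

```elisp
;;; -*- lexical-binding: t; -*-
(require 'aio)
(require 'cl-lib)

(cl-defstruct (my/bqueue (:constructor my/bqueue--create))
  items    ; queued values, oldest first
  filled   ; semaphore counting items currently in the queue
  empty)   ; semaphore counting free slots

(defun my/bqueue-create (capacity)
  "Return an empty bounded queue with room for CAPACITY items."
  (my/bqueue--create :items nil
                     :filled (aio-sem 0)
                     :empty (aio-sem capacity)))

(aio-defun my/bqueue-put (queue value)
  "Append VALUE to QUEUE, suspending while the queue is full."
  (aio-await (aio-sem-wait (my/bqueue-empty queue)))
  (setf (my/bqueue-items queue)
        (nconc (my/bqueue-items queue) (list value)))
  (aio-sem-post (my/bqueue-filled queue)))

(aio-defun my/bqueue-get (queue)
  "Remove and return the oldest item, suspending while QUEUE is empty."
  (aio-await (aio-sem-wait (my/bqueue-filled queue)))
  (prog1 (pop (my/bqueue-items queue))
    (aio-sem-post (my/bqueue-empty queue))))
```

Producers that await my/bqueue-put are suspended once the queue is full, which is where the back pressure comes from. Items come out in the order they were stored because the list itself is first-in, first-out, independent of how waiters are woken.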
I highly recommend checking out The Little Book of Semaphores. It's ostensibly about threads, but it applies equally well to any concurrent system design with ordering constraints, including aio, Python asyncio, etc.
Thank you very much. I will use semaphores then. Thanks for the pointer to the book as well, I always like a good read.
About maintaining the request order: the following test suggests the order is not maintained? I would expect the output to be an increasing number from 0 to 99, but it is not.
Am I doing something wrong here?
(require 'aio)
(require 'cl-lib)
(require 'json)

(defun test-semaphore (max-parallel requests)
  (let ((semaphore (aio-sem max-parallel)))
    (dotimes (i requests)
      (aio-with-async
        ;; Take a ticket; at most MAX-PARALLEL requests are in flight.
        (aio-await (aio-sem-wait semaphore))
        (cl-destructuring-bind (_status . buffer)
            (aio-await
             (let ((url-request-method "POST")
                   (url-request-extra-headers
                    `(("Content-Type" . "application/x-www-form-urlencoded")))
                   (url-request-data (concat "counter=" (number-to-string i))))
               (aio-url-retrieve "https://httpbin.org/anything")))
          ;; Return the ticket as soon as the response has arrived.
          (aio-sem-post semaphore)
          ;; Print the counter value echoed back in the JSON response.
          (funcall (lambda (string)
                     (let* ((json-object-type 'plist)
                            (json-object (json-read-from-string string)))
                       (print (plist-get (plist-get json-object :form) :counter))))
                   (with-current-buffer buffer
                     (prog1
                         (buffer-substring (1+ url-http-end-of-headers) (point-max))
                       (kill-buffer)))))))))

(test-semaphore 10 100)
The re-ordering you're seeing is from url-retrieve. Requests are handed to url-retrieve in order per the aio semaphore, but not only are they resolved out of order, the requests aren't even made in the order they were handed to url-retrieve. I verified this by pointing the requests at my own web server. You can see this for yourself by skipping aio and just using url-retrieve directly, meaning you'd be facing this problem even without aio.
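To try the experiment without aio, something like the following minimal sketch could be used. The function name my/url-order-demo is hypothetical, and the httpbin URL is the one from the test above. It logs the order in which responses complete, so it shows only the completion side of the reordering, not the order in which requests reach the server.

```elisp
;;; -*- lexical-binding: t; -*-
(require 'url)

(defun my/url-order-demo (requests)
  "Issue REQUESTS POST requests with plain `url-retrieve'.
Hypothetical helper.  Each callback logs the counter that was sent
with its request, so *Messages* shows the completion order."
  (dotimes (i requests)
    (let ((url-request-method "POST")
          (url-request-extra-headers
           '(("Content-Type" . "application/x-www-form-urlencoded")))
          (url-request-data (format "counter=%d" i)))
      (url-retrieve "https://httpbin.org/anything"
                    (lambda (_status n)
                      (message "completed request %d" n)
                      ;; The callback runs in the response buffer.
                      (kill-buffer))
                    (list i)   ; passed to the callback as N
                    t))))      ; SILENT: suppress progress messages
```

Calling (my/url-order-demo 20) and reading *Messages* is enough to compare the logged order with the order of submission.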
Unfortunately I don't think there's a way around this. This is par for the course with url-retrieve, where this is just one of many issues. It's why in Elfeed I switched to a custom solution based on curl.
Aha, I now remember your blog post about the url-retrieve misbehavior. Well, I guess I have to find another solution. Maybe I will work around my ordering issue by retrieving the urls in a random order and sorting them afterwards. Thank you very much for your work and your response to this issue.
I think this mechanism would work well for an aio adaptation for the websocket package: https://github.com/ahyatt/emacs-websocket but I, too, don't understand how to use it. Any more docs or an example (pseudo-code) that would get me started?
You're right that this is under-documented. The only example at the moment is in aio-test.el.
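As a possible starting point, here is a rough, untested sketch of how aio-make-callback and aio-chain seem intended to work together, based on their docstrings rather than on a worked example from the library. The assumption is that aio-make-callback returns a (CALLBACK . PROMISE) pair, that each promise resolves to (NEXT-PROMISE . CALLBACK-ARGS), and that aio-chain awaits the place, stores the next promise back into it, and returns the callback arguments. The function name my/first-chunks is hypothetical.

```elisp
;;; -*- lexical-binding: t; -*-
(require 'aio)
(require 'cl-lib)

(aio-defun my/first-chunks (process n)
  "Resolve to a list of the first N output chunks from PROCESS.
Hypothetical helper illustrating `aio-make-callback' and `aio-chain'."
  (cl-destructuring-bind (callback . promise) (aio-make-callback)
    ;; Each filter call resolves the current promise in the chain.
    (set-process-filter process callback)
    (let ((chunks nil))
      (dotimes (_ n)
        ;; A process filter is called with (PROCESS STRING); assuming
        ;; `aio-chain' returns that argument list and advances PROMISE.
        (cl-destructuring-bind (_proc string) (aio-chain promise)
          (push string chunks)))
      (nreverse chunks))))
```

For a websocket adaptation the same pattern would presumably apply, with the callback installed as the message handler instead of a process filter.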
I would like to use emacs-aio for my package emacs-pubmed. The aim is to retrieve multiple urls in queue, i.e. while maintaining the order (see issue 3). Meanwhile, I have to respect a limit of requests/second.
I find this difficult to achieve with the semaphore API, and thought maybe the aio-make-callback function and aio-chain macro could be more appropriate. However, I don't quite understand how they work. Could you provide an example?