lmmx opened this issue 3 years ago
After thinking about this more, it's not clear if rewriting in this way might risk retrieving too much to disk at once...
There’s no need to ‘retrieve too much to disk at once’: in particular, if garbage collection is done routinely, the footprint should remain minimal. This is currently being done in the synchronous procedure.
However, the retries for the async procedure are currently wrapped around the entire loop, which seems like a mistake (though the comment says it worked originally). I suspect it may work because entries already in the database get skipped (and if they aren’t, that would be a good way to skip them on the retry round). But a limit of 3 retries then applies to the entire listing, so I don’t think it’s a sensible design.
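As a rough sketch of moving the retry inside the loop (names here are illustrative, not the actual impscan functions), each URL would get its own retry budget:

```python
import httpx

MAX_RETRIES = 3  # a per-URL budget, rather than 3 retries for the whole listing

async def fetch_with_retries(client: httpx.AsyncClient, url: str) -> bytes | None:
    """Retry each URL independently, giving up on that URL alone after MAX_RETRIES."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response.content
        except httpx.HTTPError:
            if attempt == MAX_RETRIES:
                return None  # exhausted retries for this URL only, not the entire loop
    return None
```

Entries already in the database could then be skipped before the fetch, which would also make a retry round cheap.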
On re-reading the code, I’m also uncertain whether the starmap iteration over the AsyncClient context manager block will actually be amenable to the range stream approach (they may not be compatible). I may be misreading it, but it looks like that block is doing the requests itself (not just the callbacks to `process_archive`, which calls `inflate_archive`; that would be fine):
This line in particular shows the use of `fetch`, which calls:
so this is sending plain GET requests. It makes it look like range requests are being used, but they’re not: the problem is that it’s the stream’s URL, not the stream object, that runs through this loop, so to reach the stream you’d need to cross-reference the stream list by URL, or else rewrite it to pass the stream through (which might be OK).
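One way around this might be to keep the stream objects reachable from the loop. This is just a hypothetical sketch (`listing_urls` and `conda_streams` are assumed names, and the signature of `process_archive` is a guess, not the actual impscan code):

```python
# Cross-reference the stream list by URL so the loop can recover the stream
# object, rather than issuing a plain GET against the URL it was handed.
streams_by_url = {url: stream for url, stream in zip(listing_urls, conda_streams)}

def process_listing(url: str) -> None:
    stream = streams_by_url[url]  # look the stream up by its URL...
    process_archive(stream)       # ...or rewrite the loop to pass the stream through directly
```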
This isn't working nicely, because the async isn't really compatible with the streaming approach, and the multiprocessing doesn't seem to have really been implemented?
Essentially, the approach of an async job is to go and grab all the data, storing it on the `frozen_archive` attribute of the `CondaArchiveStream`. (Multi)processing it afterwards from that attribute [on all cores] means amassing a very large amount of data on disk first, and I don't think that's wise. The multiprocessing should probably be removed in favour of processing on the fly.
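A rough sketch of what "processing on the fly" could look like (generic httpx/asyncio usage, with a hypothetical `inflate_and_insert` standing in for the `process_archive`/`inflate_archive` step; not the actual impscan code):

```python
import asyncio
import httpx

async def populate_db(urls: list[str]) -> None:
    """Fetch and process each archive as it arrives, instead of amassing
    everything on frozen_archive and multiprocessing the backlog afterwards."""
    async with httpx.AsyncClient() as client:

        async def fetch_and_process(url: str) -> None:
            response = await client.get(url)
            response.raise_for_status()
            inflate_and_insert(response.content)  # handle immediately, then let the bytes be GC'd

        await asyncio.gather(*(fetch_and_process(url) for url in urls))
```

This keeps the footprint at roughly one archive per in-flight request rather than the whole listing.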
The async could work fine with the range streams, but it would need to be placed more carefully within the program logic than is necessary with non-streaming responses (where you can just call `response.aread()`). Essentially I don't think you can mix these paradigms, at least not while delegating the details to the `range_streams` package.
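To illustrate the difference (this is generic httpx usage, not the `range_streams` internals): with a non-streaming response the body can be awaited wherever is convenient, whereas with a streamed response the processing has to live inside the response's context block:

```python
import httpx

async def non_streaming(client: httpx.AsyncClient, url: str) -> bytes:
    # Non-streaming: just await the body; nothing else needs to move.
    response = await client.get(url)
    return await response.aread()

async def streaming(client: httpx.AsyncClient, url: str) -> None:
    # Streaming: the bytes are only available inside this block, so the
    # processing logic has to be placed here rather than deferred.
    async with client.stream("GET", url) as response:
        async for chunk in response.aiter_bytes():
            ...  # consume each chunk as it arrives
```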
I suggest that instead of trying to make `generate_db_async_streaming.py` work, expectations be downgraded to `generate_db_async.py`. Also note that this will be incompatible with restarting the DB population: it must be renewed in its entirety each time, or else risk either missing packages (not catchable) and/or repeat insertion (catchable).
To clarify: range-streams does not currently support async clients (https://github.com/lmmx/range-streams/issues/26), so this can't be implemented until it does.
The routine in `generate_db.py` is synchronous (blocking), and also completely sequential (not parallelised). Both of these should be addressed:
Note on terminology/analogy:

- `impscan.CondaArchive` is analogous to `beeb.Schedule`
- `CondaArchive` is in a “deferred” state, i.e. it does not send a GET request to the package source URL upon initialisation, but only when `pull_and_parse` is called
- the `CondaArchive` pulled archive is “raw” (not yet inflated)

This decouples retrieval and inflation. The inflation looks something like:
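The original snippet isn't reproduced above, but as a rough, hypothetical sketch of what the inflation step amounts to (assuming the raw archive bytes are a `.tar.bz2`; the real `CondaArchive` method may differ, e.g. for `.conda`/zstd archives):

```python
import io
import tarfile

def inflate_archive(raw: bytes) -> tarfile.TarFile:
    """Decompress the raw archive bytes pulled earlier, without re-fetching them."""
    return tarfile.open(fileobj=io.BytesIO(raw), mode="r:bz2")
```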