dmpetrov opened this issue 7 months ago
Note that with the current architecture, `pre_fetch` won't do much, since only one `File` object exists at a time (assuming no batching).
@rlamy we should change it so that pre-caching actually helps.
Depends on the file API refactoring. Moving indexing to the app level. For now moving back to backlog.
Since we are more or less done with indexing, moving it back to the ready stage, cc @rlamy. Might still depend on some work that Ronan is doing now on decoupling `DatasetQuery` and `DataChain`.
One of the use cases I have atm is:
One thing that is a bit annoying is that some tools (OpenCV) seem to require a local path. Yes, cache helps in that case and pre-fetch can help too - but both require downloading the whole file, while for some operations I just need the header. If someone has ideas how that can be improved - let me know. Is there a way to create a file-like object that is actually a stream from the cloud underneath?
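For the streaming half of that question, fsspec (which datachain already builds on) exposes file-like objects that fetch byte ranges on demand; a minimal sketch with a made-up path, noting it still doesn't satisfy tools that insist on a local path:

```python
import fsspec

# Returns a file-like object backed by the object store; only the byte
# ranges actually read are downloaded, not the whole file.
with fsspec.open("s3://bucket/images/cat.jpg", "rb") as f:
    header = f.read(16)  # e.g. just enough bytes to sniff the format
```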
Some notes:

- Prefetching should happen in `DatasetQuery.extract()`, before (or maybe in) `udf.run()`.
- For `ArrowRow` (which contains a `File` but doesn't inherit from it), we should fetch the row, not the whole file.
- This means that `udf.run()` should receive model instances, not raw DB rows, which requires some refactoring...
> This means that `udf.run()` should receive model instances, not raw DB rows, which requires some refactoring...
Where do we receive raw DB rows there? (I wonder if this is related or should be taken into account: https://github.com/iterative/studio/issues/10531#issuecomment-2379390308)
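To illustrate the row-vs-model distinction from the notes above (using a stand-in pydantic model, not the real `File` schema):

```python
from pydantic import BaseModel

# What udf.run() sees today: a raw DB row, i.e. a flat tuple whose
# meaning depends on column order.
db_row = (42, "s3://bucket", "images/cat.jpg", 123456)

# What it would receive instead: a typed model instance (a stand-in
# for datachain's File; field names here are illustrative).
class FileModel(BaseModel):
    source: str
    path: str
    size: int

obj = FileModel(source="s3://bucket", path="images/cat.jpg", size=123456)
```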
After probably too much refactoring, I can confirm that this can be implemented inside `udf.run()`, which means that `pre_fetch` can be implemented with async workers.

Ignoring a lot of details, the basic idea is to change the implementation of `udf.run()` from this:
```python
for db_row in udf_inputs:
    obj_row = self._prepare(db_row)
    obj_result = self.process(obj_row)
    yield self._convert_result(obj_result)
```
to this:
```python
obj_rows = (self._prepare(db_row) for db_row in udf_inputs)
obj_rows = AsyncMapper(_prefetch_row, obj_rows, workers=pre_fetch)
for obj_row in obj_rows:
    obj_result = self.process(obj_row)
    yield self._convert_result(obj_result)
```
where `_prefetch_row` looks like:

```python
async def _prefetch_row(row):
    for obj in row:
        if isinstance(obj, File):
            await obj._prefetch()
    return row
```
Note that the latter can easily be generalised to arbitrary models, if we define some kind of "prefetching protocol".
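A minimal sketch of such a protocol, assuming `typing.Protocol`; `Prefetchable` is a hypothetical name, not an existing datachain class:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Prefetchable(Protocol):
    # Any model that knows how to download its remote payload ahead of time.
    async def _prefetch(self) -> None: ...

async def _prefetch_row(row):
    # Same helper as above, but keyed on the protocol rather than
    # hard-coding File, so other models can opt in as well.
    for obj in row:
        if isinstance(obj, Prefetchable):
            await obj._prefetch()
    return row
```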
> this can be implemented inside `udf.run()`

It looks like the right way of solving this. Thank you!
The proposed implementation has a problem: it hangs when run in distributed mode, i.e. when using something like `.settings(prefetch=2, workers=2)`. Here's what happens (with some simplifications!) when running a mapper UDF in that case:
1. `UDFDistributor` groups rows into tasks and sends them to `distributed.UDFWorker.task_queue`, to be processed in `.run_udf_internal_other_thread()`.
2. That method puts rows on the `input_queue` and sets up `udf_results = dispatch.run_udf_parallel(None, n_workers, input_queue)`, then iterates over `udf_results` to put them in the DB.
3. `UDFDispatcher` spawns `dispatch.UDFWorker` subprocesses that take rows from the `input_queue` and put results on `done_queue`. `UDFDispatcher.run_udf_parallel` gets results from `done_queue` and yields them.
4. `UDFWorker` calls `udf.run()` using `UDFWorker.get_inputs()` as the value for `udf_inputs`.
5. In `Mapper.run()`, `AsyncMapper` starts iterating over `UDFWorker.get_inputs()`, which eventually blocks, waiting for more input. That blocks the event loop, which blocks `AsyncMapper.iterate()`, which means nothing goes to `done_queue`, which blocks iterating over `udf_results`...

The fix is to make sure `AsyncMapper.produce()` doesn't block the event loop by running `next(iter(self.iterable))` in a separate thread.
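A minimal sketch of that idea (not the actual #521 implementation; `work_queue` and the surrounding class shape are assumptions):

```python
import asyncio

_SENTINEL = object()

class AsyncMapper:
    # Only the relevant method is sketched; `self.iterable` is the sync
    # input iterator, `self.work_queue` feeds the async prefetch workers.
    async def produce(self):
        it = iter(self.iterable)
        # next() may block waiting on an upstream queue, so run it in a
        # worker thread instead of directly on the event loop.
        while (item := await asyncio.to_thread(next, it, _SENTINEL)) is not _SENTINEL:
            await self.work_queue.put(item)
```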
Using threading in `AsyncMapper.produce()` runs into the issue that iteration needs to be thread-safe, but that seems fixable, see #521. That PR only deals with `Mapper` and `Generator` though. Regarding the other 2 classes, `Aggregator` and `BatchMapper`, prefetching would probably require the batching to be done inside `udf.run()` (i.e. create the batches after prefetching, sending the file objects to the UDF when they're ready), which requires some refactoring in parallel and distributed mode.

```python
def get_inputs(self):
    while (batch := get_from_queue(self.task_queue)) != STOP_SIGNAL:
        yield batch
```
Minor observation - `batch` can be renamed here; it's not really a batch, right?
`Aggregator` and `BatchMapper` should be related to each other, no? Both probably iterate over batches of rows and send them to the UDF. I think prefetch still makes sense (it can start fetching the next batch?). This can definitely be a followup / separate ticket to discuss and prioritize.
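A sketch of the "start fetching the next batch" idea, reusing `_prefetch_row` from earlier in the thread (the helper name and the single-batch lookahead are assumptions):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def _prefetch_batch(batch):
    # Download every File in the batch concurrently.
    await asyncio.gather(*(_prefetch_row(row) for row in batch))
    return batch

def prefetch_batches(batches):
    # One batch of lookahead: while the caller processes batch N,
    # a worker thread downloads the files of batch N + 1.
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = None
        for batch in batches:
            prev, pending = pending, pool.submit(
                lambda b=batch: asyncio.run(_prefetch_batch(b))
            )
            if prev is not None:
                yield prev.result()
        if pending is not None:
            yield pending.result()
```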
OUTDATED:

We need to download items in async mode before processing them: `pre_fetch` should enable async file download (per thread) for a given limit of files (like, `pre_fetch=10`), similar to `pre_fetch` in PyTorch datasets. Default should be `pre_fetch=2`.
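For reference, a usage sketch with the settings discussed in this thread (bucket path and UDF are made up):

```python
from datachain import DataChain, File

def get_size(file: File) -> int:
    # With prefetch enabled, the download has likely already happened
    # on an async worker by the time this runs.
    return len(file.read())

chain = (
    DataChain.from_storage("s3://bucket/images/")
    .settings(prefetch=2, workers=2)
    .map(size=get_size)
)
```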