Closed benjeffery closed 5 months ago
I'm sure we have an order preserving iterator lying around the place somewhere - ping me if you start implementing this and I'll dig.
There is threaded_map
at https://github.com/tskit-dev/tsinfer/blob/main/tsinfer/threads.py#L52 that does a similar thing. This can't be used directly as we need to use dask futures - although I think I could refactor it to factor out the specific future class used.
When using dask for ancestor matching we currently batch the ancestors into batches of 5000, and send these to dask serially. This lets us store batch results in the resume database. However as some ancestors take longer than others, we waste up to 30% of worker CPU waiting for the final ancestors in a batch to complete, depending on the ratio of workers to batch size.
A better approach would be, for a batch size N to stream N futures to dask at any given point in time. The batch would still be saved, but N-M of the next batch would be queued after M of the current batch are complete. This is straightforward to implement using an iterator that buffers results and returns them in-order.
This is more important where the number of workers approaches N, such as in ukb.