ebonnal / streamable

[Python] Stream-like manipulation of iterables.
Apache License 2.0
131 stars, 0 forks

add unordered option to mapping operations via new `ordered: bool` param (closes #18) #19

Closed: ebonnal closed this 2 months ago

ebonnal commented 2 months ago

With @erezsh (issue #18).

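Adds an optional unordered mode to the concurrent `.map`/`.amap`/`.foreach`/`.aforeach` operations. With `ordered=False`, outputs are yielded as soon as their call completes rather than in input order, which is faster when the function calls are heterogeneously expensive.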
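To illustrate what the new flag changes, here is a minimal sketch using only the standard library's `concurrent.futures`, not streamable's own API. The task function `slow_identity` and the delays are hypothetical: `executor.map` preserves input order, so a fast call can sit behind a slow one, while `as_completed` hands back each future as soon as it finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_identity(seconds: float) -> float:
    # Hypothetical task whose cost varies per element
    time.sleep(seconds)
    return seconds


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Ordered: results come back in input order, so the fast 0.1 s call
    # is only yielded after the slow 0.3 s one
    ordered = list(executor.map(slow_identity, delays))

    # Unordered: each result is taken as soon as its call finishes
    futures = [executor.submit(slow_identity, d) for d in delays]
    unordered = [future.result() for future in as_completed(futures)]

print(ordered)  # [0.3, 0.1, 0.2]
print(unordered)
```

The unordered list is usually sorted by delay here, but that depends on thread scheduling, so only its contents (not its order) are guaranteed.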

codecov-commenter commented 2 months ago

Welcome to Codecov 🎉

Once you merge this PR into your default branch, you're all set! Codecov will compare coverage reports and display results in all future pull requests.

Thanks for integrating Codecov - We've got you covered ☂️

erezsh commented 2 months ago

Yes, that looks like the correct behavior.

My one concern is that calling `as_completed` on every iteration might hurt performance: each call creates a waiter, appends it to every future passed in, and finally removes it from each of them again: https://github.com/python/cpython/blob/4b89c5ebfc7d5d4f008eee0ae6da765dfc28e3a9/Lib/concurrent/futures/_base.py#L251

It may not be a big concern right now, but I can imagine situations in which it would be noticeable.

The solution would probably be to reimplement `as_completed`, but as a class (e.g. `class AsCompleted(FuturesManager)`) that maintains state and allows adding new futures.

Anyway, this probably belongs in a separate PR, but I thought it's worth mentioning.
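A minimal sketch of such a stateful `AsCompleted`, assuming a thread-based executor. Rather than reimplementing CPython's waiter mechanism, it swaps in a different technique: each future is registered once with `add_done_callback`, and finished futures are pushed into a `queue.Queue`, so fetching the next completed future no longer touches every pending one. The class, its `add`/`next_completed` methods, and the `slow_identity` task are hypothetical, and it does not subclass streamable's `FuturesManager`.

```python
import queue
import time
from concurrent.futures import Future, ThreadPoolExecutor


class AsCompleted:
    """Hypothetical stateful alternative to calling `as_completed` repeatedly.

    Each future is registered once via `add_done_callback`; finished futures
    are pushed into a thread-safe queue, so getting the next completed one
    does not iterate over every pending future.
    """

    def __init__(self) -> None:
        self._done: "queue.Queue[Future]" = queue.Queue()
        self._pending = 0

    def add(self, future: Future) -> None:
        self._pending += 1
        # Runs in the worker thread on completion, or immediately if already done
        future.add_done_callback(self._done.put)

    def __len__(self) -> int:
        # Number of added futures not yet returned by next_completed()
        return self._pending

    def next_completed(self) -> Future:
        if self._pending == 0:
            raise ValueError("no pending futures")
        future = self._done.get()  # blocks until some future has completed
        self._pending -= 1
        return future


def slow_identity(seconds: float) -> float:
    # Hypothetical task whose cost depends on its input
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as executor:
    tracker = AsCompleted()
    for delay in (0.3, 0.1, 0.2):
        tracker.add(executor.submit(slow_identity, delay))
    results = [tracker.next_completed().result() for _ in range(len(tracker))]

print(results)
```

New futures can be added between calls to `next_completed`, which is the "allows adding new futures" property a streaming map needs; the trade-off is one callback per future instead of one waiter installed per call.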

ebonnal commented 2 months ago

Oh nice catch! Completely agree we need to get rid of this overhead. I will merge this PR and open an issue. Would be great to tackle this before publishing a release.

(I can't check right now, but the async side may have the same issue.)

erezsh commented 2 months ago

P.S. In the tests, there's a typo in `indentity_sleep` (should be `identity_sleep`).

ebonnal commented 2 months ago

Thanks for the typo catch @erezsh 🙏🏻

Also, my bad: the `next(iter(as_completed(...)))` partial-iteration approach would be leaky, never actually removing the waiters. I switched to a proper `done_futures, _ = wait(futures, return_when=FIRST_COMPLETED)`.
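A sketch of how the `wait(..., return_when=FIRST_COMPLETED)` approach can drive a bounded, unordered concurrent map. `unordered_concurrent_map`, its `concurrency` parameter, and `slow_identity` are hypothetical names for illustration, not streamable's actual internals.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, Set, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def unordered_concurrent_map(
    func: Callable[[T], U], iterable: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical helper: apply `func` with at most `concurrency` calls in
    flight, yielding each result as soon as its call completes."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        pending: Set["Future[U]"] = set()
        for elem in iterable:
            pending.add(executor.submit(func, elem))
            if len(pending) >= concurrency:
                # Block until at least one in-flight call has finished
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()
        # Drain the remaining in-flight calls as they complete
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()


def slow_identity(seconds: float) -> float:
    # Hypothetical task whose cost depends on its input
    time.sleep(seconds)
    return seconds


results = list(
    unordered_concurrent_map(slow_identity, [0.3, 0.1, 0.2, 0.05], concurrency=2)
)
print(results)
```

Unlike a suspended `as_completed` generator, `wait` returns only after it has removed its waiter from the futures. However, each `wait` call that has to block still installs a waiter on every pending future and removes it afterwards, which is the per-call overhead the follow-up issue #20 targets.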

Here is the follow-up issue about reusing waiters: #20 (note that the async side can benefit from this logic too).

If it sounds good, I will merge and make a release. Thanks again for collaborating, I appreciate it.