HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Incremental async reduction, e.g. value(fs, reduce = ...) #681

Open bwlewis opened 1 year ago

bwlewis commented 1 year ago

Asynchronous incremental reduction

Asynchronous incremental reduction applies a reduction function to incremental results as they are delivered (that is, asynchronously). It's implemented in foreach via the .combine argument, which is simply a standard reduction function, similar to the function argument of base R's Reduce(). Because no order of delivery is guaranteed, the reduction function must be commutative and associative (however, foreach also provides an .inorder argument that caches results locally until order can be guaranteed).

This can be (very) useful in cases where the full Map result is too large to fit in memory but the reduction function's output is small. Using incremental async reduce in those cases lowers memory pressure on the coordinating R process. Note that, as far as I know, async reduce is only implemented in doRedis (edit: on CRAN anyway; I know that there were a number of forks of doMPI that had something similar).
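For reference, the .combine interface looks like this (a sketch only; any backend accepts this syntax, though as noted only doRedis reduces truly asynchronously):

library(foreach)
library(doParallel)
registerDoParallel(cores = 2)

## `+` is commutative and associative, so out-of-order combining is safe
y <- foreach(x = 1:100, .combine = `+`, .inorder = FALSE) %dopar% sqrt(x)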

In principle, this might be achieved in future by adding a reduction-function argument to the resolve() function? That could be an elegant solution.

Alternatively, the resolved() function plus looping and polling might achieve something similar, as sketched in the documentation for resolved():

It should also be possible to use the method for
polling the future until it is resolved (without having to wait
infinitely long)

But there are no examples, and this also seems cumbersome.
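Roughly, I imagine the polling version would look something like this (a sketch only; assumes a commutative and associative reduction):

library(future)
plan(multisession)

fs <- lapply(1:100, function(x) future(sqrt(x)))
acc <- 0
while (length(fs) > 0) {
  done <- vapply(fs, resolved, logical(1L))
  for (f in fs[done]) acc <- acc + value(f)  ## reduce each result as it arrives
  fs <- fs[!done]
  if (length(fs) > 0) Sys.sleep(0.1)  ## poll instead of blocking on a single future
}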

HenrikBengtsson commented 1 year ago

In principle, this might be achieved in future by adding a reduction-function argument to the resolve() function? That could be an elegant solution.

Segue: value() is a generic function. The S3 methods for lists, environments, and list environments (from the listenv package) gather the results for the input set of futures as they become ready. The results are returned in the same order as the input. They are also implemented to preserve the relaying order of stdout and conditions, re-outputting/re-signaling them as soon as possible. (There's also a mechanism for dropping them from memory when no longer needed.)
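For example, with the current API:

## With e.g. plan(multisession, workers = 3), these futures resolve in
## reverse order, but value() still returns the results in input order:
fs <- lapply(3:1, function(x) future({ Sys.sleep(x); sqrt(x) }))
value(fs)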

I can imagine that the value() function could support an optional reduction argument for doing reduction "on the fly".

The only way I can see a reduction function working properly is if it reduces the elements in the same order as Reduce() would on the final result. That is, we need to reduce the "head" and the next element in order. (This is the same constraint under which stdout and conditions are relayed.)

Q. In the extreme case, if the result of the first element comes in last, then you cannot do any reduction until the very end (the same holds for relaying of output and conditions). Is this how you anticipate this working? Or do you think there should be support for relaxing this constraint, e.g. reducing as soon as possible?

HenrikBengtsson commented 1 year ago

Q. In the extreme case, if the result of the first element comes in last, then you cannot do any reduction until the very end (the same holds for relaying of output and conditions). Is this how you anticipate this working? Or do you think there should be support for relaxing this constraint, e.g. reducing as soon as possible?

Oh, I forgot that you already wrote "Because no order is guaranteed, the reduction function must be commutative and associative". So, yes, that would make it possible to reduce ASAP. And then you wrote "(however, foreach also provides an .inorder argument that caches results locally until order can be guaranteed)", which I think should also be supported.

So, maybe something like:

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = `+`)

and

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = structure(`-`, inorder = TRUE))

Maybe inorder = TRUE should be the default, to minimize the risk of silent bugs.
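To sketch the two modes in user land (value_reduce() is just a made-up name here, not a proposed API; it uses only resolved() and value()):

value_reduce <- function(fs, reduce, init, inorder = TRUE) {
  acc <- init
  if (inorder) {
    ## Reduce the "head" as soon as it resolves; value() blocks until then.
    ## This matches Reduce(reduce, value(fs), init).
    for (f in fs) acc <- reduce(acc, value(f))
  } else {
    ## Reduce ASAP; requires a commutative and associative reduce()
    while (length(fs) > 0) {
      done <- vapply(fs, resolved, logical(1L))
      for (f in fs[done]) acc <- reduce(acc, value(f))
      fs <- fs[!done]
      if (length(fs) > 0) Sys.sleep(0.01)
    }
  }
  acc
}

y <- value_reduce(fs, `+`, init = 0, inorder = FALSE)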

bwlewis commented 1 year ago

Yes, this looks similar to the foreach approach but retains the flexibility of future.

I guess it needs an init argument too (sketched below), like Reduce and foreach. foreach also has arguments controlling the behavior of reduction functions that take more than two arguments (.multicombine/.maxcombine).

It's unfortunate all those complicated control parameters are needed, but I don't see a more elegant way.
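Extending the hypothetical call above, init might look like:

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = `+`, init = 0)  ## hypothetical init argument, as in Reduce()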

Anyway, I think this would be a valuable addition to future!

Even in the sequential case it is useful. Unlike Reduce() over a Map() result (or tidyverse reduce() over map()), value(fs, reduce = ...) would not need to materialize the full mapped result.
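For instance (illustrative only):

## Reduce() first materializes all one million intermediate results:
y <- Reduce(`+`, Map(sqrt, 1:1e6))
## An incremental value(fs, reduce = `+`) would only ever hold the running
## sum plus the most recently delivered result.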


HenrikBengtsson commented 1 year ago

What's an example where multicombine/maxcombine is needed? Why wouldn't pairwise reduce be sufficient?

bwlewis commented 1 year ago

Yes, indeed, I think pairwise reduce is sufficient. The multicombine option was probably a case of premature optimization that's not really needed. (The original idea was to support, slightly more efficiently, functions that take ... arguments, like c, sum, rbind, etc.)
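For reference, the foreach usage was something like:

library(foreach)
## With .multicombine, a variadic .combine such as c() receives up to
## .maxcombine results per call, instead of strictly pairwise:
y <- foreach(x = 1:10, .combine = c, .multicombine = TRUE, .maxcombine = 5) %do% sqrt(x)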
