HenrikBengtsson / Wishlist-for-R

Features and tweaks to R that I and others would love to see - feel free to add yours!
https://github.com/HenrikBengtsson/Wishlist-for-R/issues
GNU Lesser General Public License v3.0

parallel: Two types of 'cluster':s - one a subclass of the other #161

Open HenrikBengtsson opened 5 months ago

HenrikBengtsson commented 5 months ago

Background

The parallel package provides 'cluster' functions for processing tasks in concurrent R processes. The example below creates a cluster of two parallel workers running in the background, assigns x to 1 on the first worker and to 2 on the second, and then calculates x^2 on all workers, returning the results as a list:

library(parallel)
cl <- makeCluster(2)
void <- clusterEvalQ(cl[1], { x <- 1L })
void <- clusterEvalQ(cl[2], { x <- 2L })

y <- clusterEvalQ(cl, { x^2 })
str(y)
#> List of 2
#>  $ : num 1
#>  $ : num 4

y <- clusterEvalQ(rev(cl), { x^2 })
str(y)
#> List of 2
#>  $ : num 4
#>  $ : num 1

Issue

Some parallel frameworks might not be able to address a specific worker, and instead expect to operate on the whole set of workers at once. One example of this was mirai 0.13.2. mirai provides its own type of cluster:

library(parallel)
cl <- mirai::make_cluster(2)

With mirai (<= 0.13.2), if we repeat the above example, we would get the same results for:

void <- clusterEvalQ(cl[1], { x <- 1L })
void <- clusterEvalQ(cl[2], { x <- 2L })

y <- clusterEvalQ(cl, { x^2 })
str(y)
#> List of 2
#>  $ : num 1
#>  $ : num 4

UPDATE 2024-05-04: mirai (>= 1.0.0) tries to protect against this; we now get an error when calling clusterEvalQ(cl[1], ...), because cl[1] no longer inherits from cluster - it's only a list.

However, if we attempt to retrieve the results from the parallel workers in reverse order, we get:

y <- clusterEvalQ(rev(cl), { x^2 })
str(y)
#> List of 2
#>  $ : num 1
#>  $ : num 4

Note how we got (1,4) and not (4,1). This is because the mirai cluster treats cl the same as rev(cl). This is also why repeated calls on cl[1] may return different results, e.g.

y <- clusterEvalQ(cl[1], { x^2 })
str(y)
#> List of 1
#> $ : num 1

y <- clusterEvalQ(cl[1], { x^2 })
str(y)
#> List of 1
#> $ : num 4

y <- clusterEvalQ(cl[1], { x^2 })
str(y)
#> List of 1
#> $ : num 1

I'm not sure if this meets the original design intentions of the parallel package. Regardless, there might be a valid argument for why subsetting cluster objects, or changing the order of the cluster nodes, should not be supported. For instance, the underlying framework might not support addressing parallel workers this way. This is probably the reason why mirai clusters work the way they do.

Suggestion 1

If subsetting should not or cannot be supported, then we need a way to declare that, so that we don't use the cluster in a way that assumes it works. For example, in the mirai case, we should not assume we can address individual workers via subsetting or reordering. My suggestion would be to introduce a special class for this, e.g. 'nonAddressableCluster'. This way, we could have:

> cl <- mirai::make_cluster(2)
> class(cl)
[1] "miraiCluster" "nonAddressableCluster" "cluster"

instead of today's:

> class(cl)
[1] "miraiCluster" "cluster"

This way, we could programmatically check if the cluster object supports subsetting. For example, we can do:

stopifnot(!inherits(cl, "nonAddressableCluster"))

This puts the burden on users and developers who call the parallel functions to be aware of this and to make the check themselves.
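
A minimal sketch of how a consuming function could apply such a guard. The 'nonAddressableCluster' class is only the proposal above, not an existing class in parallel; the helper name assert_addressable() and the mock objects are hypothetical and stand in for real cluster objects so that the example runs without any backend:

```r
# Hypothetical helper: refuse clusters that declare themselves non-addressable.
assert_addressable <- function(cl) {
  if (inherits(cl, "nonAddressableCluster")) {
    stop("This cluster does not support addressing individual workers")
  }
  invisible(cl)
}

# Mock objects that carry only the class attributes discussed above
cl_sock  <- structure(list(), class = c("SOCKcluster", "cluster"))
cl_mirai <- structure(list(),
                      class = c("miraiCluster", "nonAddressableCluster", "cluster"))

assert_addressable(cl_sock)                      # passes silently
res <- try(assert_addressable(cl_mirai), silent = TRUE)
inherits(res, "try-error")                       # TRUE
```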

Suggestion 2

An alternative solution, proposed by @shikokuchuo (cf. https://github.com/shikokuchuo/mirai/discussions/83#discussioncomment-9154792), would be to remove [.cluster from the parallel package and have parallel backends that support subsetting explicitly implement [ for their specific cluster class. For example, for:

cl <- parallel::makeCluster(2)
class(cl)
#> [1] "SOCKcluster" "cluster"    

we would have a [.SOCKcluster rather than [.cluster as today.

This would be a much bigger change, because it risks breaking existing code and packages.
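
As a rough illustration of what a backend-specific method could look like, here is a sketch for a made-up class. The class name "demoCluster" and the node values are hypothetical; this is not how parallel or any existing backend implements it:

```r
# Hypothetical backend class; "demoCluster" is not part of any package.
`[.demoCluster` <- function(x, i) {
  nodes <- unclass(x)[i]      # subset the underlying list of nodes
  class(nodes) <- class(x)    # keep the cluster classes on the subset
  nodes
}

# Mock cluster with two placeholder "nodes"
cl <- structure(list("node1", "node2"), class = c("demoCluster", "cluster"))

sub <- cl[2]
class(sub)    # "demoCluster" "cluster"
length(sub)   # 1
```

A backend that cannot address individual workers would simply not define such a method, so subsetting would fall back to whatever the default behavior is.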

shikokuchuo commented 4 months ago

First of all, thanks for helpfully updating above that mirai 1.0.0 release no longer suffers from the original issue you identified.

As the documentation efforts are already on Bugzilla, I actually think that this is your best solution rather than a solution in code.

First of all:

  1. My original suggestion (2) I think is now redundant. Replacing all uses of [.cluster in parallel would have been too invasive, and is now unnecessary in my opinion.

  2. Your suggestion (1) demands a new interface from parallel, and as you rightly point out, requires both (i) all backend developers to implement this and (ii) users to know to use it. From a behavioural standpoint, I would think it somewhat naive to assume that this would just happen. Also as you pointed out for suggestion (2), this is also not without risk – it is possible that users may have depended on the class for clusters being a character vector of length 2.

And as to what I am currently thinking:

Suggestion 3 (replaces 2 above)

I had originally missed that [.cluster was actually defined explicitly in the parallel package as an S3 method – I had simply assumed that the [ operator was used internally for subset operations. After all, mirai was the first alternative communications backend implementation and I was working with no existing documentation.

As soon as it emerged through the documentation effort that the [.cluster method existed and that it was defined very inefficiently by preserving class for each operation, it was self-evident that a cluster implementation-specific method should be defined. I believe that any other similar cluster back-end would do the same – it is simply implausible that they would choose to ignore this. The developer is hence incentivised to make the ‘right’ choice here.

Then all that would be required is something like:

isAddressableCluster <- function(x) inherits(x[1], "cluster")

or

isAddressableCluster <- function(x) identical(class(x[1]), class(x))

As this is so trivial, I think this can simply be defined by a consuming package like your stopifnot(!inherits(cl, "nonAddressableCluster")) example above, rather than needing to be in parallel.
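
To illustrate how such a check would behave, here is a small sketch using two mock backends. The class names "mockKeepCluster" and "mockDropCluster" are hypothetical; the second one imitates a backend whose `[` returns a plain list, as described for mirai (>= 1.0.0) above:

```r
isAddressableCluster <- function(x) inherits(x[1], "cluster")

# Mock backend whose `[` keeps the cluster classes (addressable)
`[.mockKeepCluster` <- function(x, i) structure(unclass(x)[i], class = class(x))

# Mock backend whose `[` returns a plain list (not addressable)
`[.mockDropCluster` <- function(x, i) unclass(x)[i]

cl_keep <- structure(list("a", "b"), class = c("mockKeepCluster", "cluster"))
cl_drop <- structure(list("a", "b"), class = c("mockDropCluster", "cluster"))

isAddressableCluster(cl_keep)  # TRUE
isAddressableCluster(cl_drop)  # FALSE
```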

Suggestion 4 (for comparison only)

I think the incentives are powerful enough in (3) above, and the advantage is clear in requiring no change in parallel. However, just to provide an alternative for comparison:

Implement isAddressableCluster() as an S3 generic in parallel and make methods for all existing cluster types, which just return TRUE, but FALSE for the default method.

This would require the developer of a new back-end to explicitly add its own method to return TRUE, if it wants to assert that it supports subsetting.

This is much safer from a behavioural standpoint, and is exactly what the promises package (maintained by Posit) does from version 1.3.0 for the generic is.promising().

This was actually also in response to mirai now being one of the possibilities, whereas previously all the values were hardcoded.

mirai provides its own method which asserts that it may be used anywhere that accepts a 'promise'.
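
For reference, the S3 generic described under Suggestion 4 could be sketched roughly as follows. This is an assumption about how it might look, not existing parallel code; only the SOCKcluster method is shown, and "newBackendCluster" is a hypothetical class standing in for a new back-end that has not opted in:

```r
# Hypothetical generic; not currently part of the parallel package.
isAddressableCluster <- function(x, ...) UseMethod("isAddressableCluster")

# Default: assume a cluster is NOT addressable unless its class says so
isAddressableCluster.default <- function(x, ...) FALSE

# Existing cluster types would get a method that returns TRUE
isAddressableCluster.SOCKcluster <- function(x, ...) TRUE

# Mock objects carrying only class attributes
cl_sock <- structure(list(), class = c("SOCKcluster", "cluster"))
cl_new  <- structure(list(), class = c("newBackendCluster", "cluster"))

isAddressableCluster(cl_sock)  # TRUE
isAddressableCluster(cl_new)   # FALSE
```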