futureverse / future.apply

:rocket: R package: future.apply - Apply Function to Elements in Parallel using Futures
https://future.apply.futureverse.org

Adding customChunks to future.apply functions #83

Open bgall opened 3 years ago

bgall commented 3 years ago

Chunking in future.apply

future.apply currently relies on the internal makeChunks() function (defined in makeChunks.R) to partition the elements to be processed into "chunks" that are sent to the workers. makeChunks outputs a list of integer vectors, where each vector is a "chunk" and its elements are the indices of the elements to be processed in the input object (often a list).

Users have some control over the generation of chunks via the future.apply arguments future.chunk.size (which specifies the average number of elements per chunk) and future.scheduling (which specifies the average number of chunks per worker). Furthermore, they can control the processing order of chunks via the ordering attribute of future.chunk.size or future.scheduling.
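For concreteness, here is a small sketch of how those existing controls can be used; the plan, the toy input, and the reversed ordering are made up for illustration:

library(future.apply)  # also attaches 'future', which provides plan()
plan(multisession, workers = 3)

X <- as.list(1:6)

# Roughly two elements per chunk on average:
y1 <- future_lapply(X, FUN = sqrt, future.chunk.size = 2)

# One chunk per worker (scheduling = 1.0), processed in a user-specified
# order given by the "ordering" attribute:
y2 <- future_lapply(X, FUN = sqrt,
                    future.scheduling = structure(1.0, ordering = 6:1))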

Nevertheless, this control is limited, and even the sensible defaults of makeChunks can produce substantial load imbalance across workers and resulting inefficiency. Some of this inefficiency could be reduced if users were able to better control chunk generation. While some of it may be averted by future.apply's dynamic balancing, the costs of dynamic balancing itself can be non-trivial.

The Purpose of makeChunks

Currently, makeChunks accomplishes two tasks:

  1. Generates chunks by partitioning elements to be processed.
  2. Specifies the order in which chunks are processed.

Ideally, both tasks would be redundant: the elements of the object users pass to future.apply would already be the chunks they want processed (with nbrOfElements == nbrOfWorkers), and the chunks would already be indexed in the order in which they should be processed. This allows for efficient static load balancing, with chunks already balanced and one chunk per worker. However, users often pass objects where the ordering is ad hoc and the chunking was not planned.
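For reference, the manual workaround that this issue is trying to avoid looks roughly like the following sketch: pre-split the input into a deeper list of hand-picked chunks, iterate over the chunks in parallel, then flatten and restore the original order (the chunk indices here are chosen arbitrarily):

library(future.apply)
plan(multisession, workers = 3)

X <- as.list(1:6)
chunks <- list(1L, 2:3, 4:6)  # hand-picked chunks of element indices

res <- future_lapply(chunks, FUN = function(idxs) lapply(X[idxs], sqrt))

# Flatten and put the results back in the original element order:
y <- unlist(res, recursive = FALSE)[order(unlist(chunks))]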

Adding customChunks

I envision two approaches to improving the flexibility of chunking in future.apply:

  1. Add a customChunks argument to future.apply functions

Users could pass a list to customChunks. future.apply would use this list instead of the list that makeChunks returns. If is.null(customChunks) == TRUE, then the status quo internal makeChunks function is used. If is.null(customChunks) == FALSE, makeChunks is not executed and all other chunk-related arguments are ignored.

The primary motivation for this is that users may wish to (a) have complete control over chunking and the ordering of chunks, (b) do so without modifying their input to future.apply (i.e. avoid creating deeper objects or repeatedly rearranging the elements of their object just for processing), and (c) create more interpretable code distinguishing between the input object, the plan for processing, and the processing itself. This also helps decouple functions for working in parallel from functions for serial pre-processing.
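As a hypothetical illustration of approach 1 (customChunks does not exist in future.apply today; the argument name and the chunk format, mirroring the output of makeChunks, are assumptions):

y <- future_lapply(
       X,
       FUN = my_fcn,                      # user's own function
       customChunks = list(1L, 2:3, 4:6)  # proposed argument: one index vector per chunk
     )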

  2. Add a customChunks argument to future.apply functions and export makeChunks

Users could pass their object to makeChunks and pass the result to the customChunks argument of future.apply. If customChunks is NULL, future.apply would call makeChunks as usual. This would allow users to generate chunks with makeChunks either inside or outside of future.apply. The upside is that users can directly observe and edit the output of makeChunks.
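A hypothetical sketch of approach 2, assuming makeChunks were exported; its signature is internal and not settled, so the argument names below are guesses:

chunks <- future.apply::makeChunks(nbrOfElements = length(X),
                                   nbrOfWorkers  = 3)

# Inspect and/or hand-edit 'chunks' before handing it back:
str(chunks)
y <- future_lapply(X, FUN = my_fcn, customChunks = chunks)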

I could submit a pull request implementing this, but I'm not sure when/how makeChunks is called internally; I don't see it in the makefiles or in the definitions of the future.apply functions.

HenrikBengtsson commented 3 years ago

Thanks for this. I've thought a little bit more about the use case that you mentioned on Twitter. I agree, those needs require something that's orthogonal to controlling the processing order of elements.

Here's how I understand the problem and your ask:

Let's assume we've got a list X of six elements to be processed. They're indexed 1:6. Let's assume that the first element, X[[1]], takes 6 minutes to process, elements two and three, X[[2]] and X[[3]], each take 3 minutes to process, and the remaining elements, X[[4]], X[[5]], and X[[6]], each take 2 minutes to process.

If we process X sequentially, the total processing time will be sum(c(6, 3, 3, 2, 2, 2)) = 18 minutes. If we would run them in parallel on 6 workers using scheduling = 1.0, then each worker will process one element and it will all complete in max(c(6, 3, 3, 2, 2, 2)) = 6 minutes.

Now, let's assume we have 3 workers. If we use scheduling = 1.0, the elements will be chunked in groups of two elements; X[1:2], X[3:4], and X[5:6], and the processing will complete after max(c(sum(c(6, 3)), sum(c(3, 2)), sum(c(2, 2)))) = max(c(9, 5, 4)) = 9 minutes.

What you're asking for is a way for the 3 workers to instead process the elements in 3 differently sized chunks so that it will complete sooner. In this case, it's more efficient if the chunking would be X[1], X[2:3], and X[4:6], which would complete in max(c(6, sum(c(3, 3)), sum(c(2, 2, 2)))) = max(c(6, 6, 6)) = 6 minutes.

One can of course also consider the case with, say, 2 or 4 workers. For example, with 2 workers, one might want to chunk it up as X[c(1,6)], X[c(2:5)], which would finish after max(c(sum(c(6, 2)), sum(c(3, 3, 2, 2)))) = max(c(8, 10)) = 10 minutes.
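The arithmetic above is easy to verify directly in R, with the per-element processing times in minutes:

times <- c(6, 3, 3, 2, 2, 2)

# 3 workers, default chunking into groups of two elements:
max(sapply(list(1:2, 3:4, 5:6), function(idxs) sum(times[idxs])))   # 9

# 3 workers, load-balanced chunking:
max(sapply(list(1L, 2:3, 4:6), function(idxs) sum(times[idxs])))    # 6

# 2 workers, the chunking suggested above:
max(sapply(list(c(1L, 6L), 2:5), function(idxs) sum(times[idxs])))  # 10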

Did I understand it correctly? If so, I'll follow up with more comments.

bgall commented 3 years ago

@HenrikBengtsson Yes, that summarizes it. It's my understanding that dynamic balancing could produce the efficient distribution you mentioned, but would be strictly less efficient than the static approach:

it's more efficient if the chunking would be X[1], X[2:3], and X[4:6], which would complete in max(c(6, sum(c(3, 3)), sum(c(2, 2, 2)))) = max(c(6, 6, 6)) = 6 minutes.

Building off a point you raised earlier and continuing with the case of wishing to process a 6-element list, I see two ways in which users could incorporate information about complexity of each element:

  1. Replacing the output of makeChunks with their own list. This is what I focus on in the first comment.
  2. Passing a length-6 numeric vector characterizing the relative complexity of each item to makeChunks, which somehow uses this to generate better-balanced chunks.

The first approach requires the user to not just have information about the complexity of each element but also to use that information to generate balanced groupings of elements and then build a list reflecting those groupings.

The second approach simply requires the users to know something about the complexity of each element. A new, internal optimization function could then use that information to try to find chunks generating better balance. Of course, the status quo and/or dynamic balancing might be more efficient than finding something like an optimum - and I imagine someone more well-versed in algorithms could state the precise tradeoff!
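To make the second approach concrete, here is a minimal greedy sketch (longest-processing-time first) of how such an optimization function could turn per-element weights into chunks; it is only a heuristic, is not guaranteed to find the optimum, and is not part of future.apply:

# Greedy LPT heuristic: assign each element, heaviest first, to the chunk
# with the smallest total weight so far.
balanced_chunks <- function(weights, nchunks) {
  chunks <- vector("list", nchunks)
  loads  <- numeric(nchunks)
  for (i in order(weights, decreasing = TRUE)) {
    k <- which.min(loads)
    chunks[[k]] <- c(chunks[[k]], i)
    loads[k] <- loads[k] + weights[i]
  }
  chunks
}

balanced_chunks(c(6, 3, 3, 2, 2, 2), nchunks = 3)
# Returns list(1L, c(2L, 4L, 6L), c(3L, 5L)), i.e. chunk weights 6, 7, 5:
# close to, but not exactly, the optimal 6, 6, 6 split discussed above.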

HenrikBengtsson commented 3 years ago

Great. So, one approach would then be to make it possible to specify the relative compute load of each element. For simplicity, let's say compute load and processing time are synonyms. With the above example, we would then have compute loads, or "compute weights", that are:

weights <- c(6, 3, 3, 2, 2, 2)

Together with the information on the number of workers available, nbrOfWorkers(), it should be possible to come up with an algorithm that would produce chunks with as similar weights as possible. For example, with 3 workers such an algorithm could find the following solution:

chunks <- list(1L, 2:3, 4:6)

where the chunk weights are:

chunk_weights <- sapply(chunks, FUN = function(idxs) sum(weights[idxs]))
print(chunk_weights)
[1] 6 6 6

The advantage of having an algorithm that identifies chunks based on element "weights" is that it avoids having to know what the number of workers is. I think we want to stay away from the code having to be aware of the number of workers, or even the concept of a worker. That's part of the core design philosophy of the Future API.

Identifying the optimal chunk configuration given element weights is probably a computationally expensive task, but my gut feeling is that a good-enough algorithm can do it pretty fast.

So, what do you think about being able to do something like:

y <- future_lapply(
       X,
       FUN = my_fcn,
       future.scheduling = structure(1.0, weights = c(6, 3, 3, 2, 2, 2))
     )

? Obviously, it's not hard to imagine something like:

y <- future_lapply(
       X,
       FUN = my_fcn,
       future.scheduling = structure(1.0, weights = estimate_compute_load(X))
     )

PS. Thinking ahead. Another advantage of the abstraction of element-wise "weights" and compute times is that it fits nicely with plans on being able to specify resource requirements for futures. If we specify the compute time for a future, then the parallel backend can make decisions on how and where it wants to process such a future. That is something HPC schedulers are good at, for instance, so it's not hard to imagine that the above weights could be passed in some form to an HPC scheduler via the future.batchtools backend, if that's being used.

bgall commented 3 years ago

A few thoughts:

  1. Both the current makeChunks function and your proposal rely on nbrOfWorkers, so I am not sure what you mean when you write:

The advantage of having an algorithm that identifies chunks based on element "weights" is that it avoids having to know what the number of workers is

Note that the primary efficiency gains arise from setting nbrOfWorkers() == length(chunks) since chunking is a pre-processing step where you use information about the number of workers to improve performance. If you did not know that information, you would typically just want to set each element as its own chunk. Of course, this is assuming the # of workers is constant across the makespan.

  2. Regardless of whatever balancing and chunking algorithm is built into future.apply, it seems beneficial and relatively trivial to implement the option for users to pass a list to a customChunks argument which is then used in lieu of makeChunks or a weight-based alternative. The benefit of this is that users can chunk on whatever criteria they prefer (based on weights, numbers of elements, whatever) using whatever algorithm they prefer to find optimal chunking. In my opinion, this is the first priority.

  3. Passing weights to scheduling seems like a confusing way of implementing this; it would be more transparent what is happening if the weights and the desired number of chunks were passed to an exported makeChunks function. Defaults can ensure that this doesn't break any code (i.e. makeChunks is called internally by future.apply [as it is now] if future.apply doesn't receive an object of type chunk).

  4. The optimization problem is the minimum makespan problem, or multiway number partitioning problem. These vary in their computational complexity.

HenrikBengtsson commented 3 years ago
  1. Both the current makeChunks function and your proposal rely on nbrOfWorkers, so I am not sure what you mean when you write: ...

Yes, future.apply knows about it internally, but there is no need for developers/users to know about it. Part of the design of the future framework is that code that makes use of it should be agnostic to whether it runs sequentially or in parallel, and, if in parallel, how and where. As a developer you should not make assumptions that future_*** runs in parallel. The objective of this "conservative" approach is that it allows developers to stay away from low-level details and lowers the risk of writing code that limits which backends the code can run on. Note that the Future API and these higher-level map-reduce APIs are not like the low-level mclapply()/parLapply() APIs, which allow you to make specific assumptions about the parallel workers.

  2. Regardless of whatever balancing and chunking algorithm is built into future.apply, it seems beneficial and relatively trivial to implement the option for users to pass a list to a customChunks argument which is then used in lieu of makeChunks or a weight-based alternative. The benefit of this is that users can chunk on whatever criteria they prefer (based on weights, numbers of elements, whatever) using whatever algorithm they prefer to find optimal chunking. In my opinion, this is the first priority.

Another reason why I'm very conservative when it comes to adding/opening up for custom features is that it risks limiting what the future framework can evolve into later on. I look at every feature request and ask myself several questions: "How will this work when run sequentially, or with another parallel backend?", "Will this limit or prevent us from implementing what's on the roadmap?", "What about the things we still don't know should be on the roadmap, but that someone eventually will ask about?", ...

In this case, what happens if the developer defines chunks based on the assumption that the code is parallelizing on the local machine, but the user chooses to run on some other backend, e.g. a mix of local and remote workers, or on a job scheduler? I don't know the full answer, which is why I'm very careful not to open up low-level control that will be hard to undo. You'll see me take the same approach regarding

BTW, one reason for why future.apply:::makeChunks() is not exported is that its API is not settled and in flux. The purpose of the planned future.mapreduce package is to settle on a generic API that map-reduce APIs such as future.apply, furrr, and doFuture can build upon. (However, I don't think that it'll ever be meant to be used by end-users; it will target map-reduce API developers).