apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[R Package] map_batches in parallel #12919

Open KnutJaegersberg opened 2 years ago

KnutJaegersberg commented 2 years ago

Regular user of disk.frame. I love that package's map function, which allows applying custom R functions to a larger-than-RAM dataset in parallel over the batches by using the future package. Am I right that map_batches currently supports only single-core processing, since it is implemented as a while loop over the batches? Do you plan to bring this very handy capability to R arrow, too? Sure, it must be slower than Arrow's C++-based functionality, but it is still super useful.

wjones127 commented 2 years ago

Am I right that map_batches currently supports only single-core processing, since it is implemented as a while loop over the batches?

Yes, right now it only uses a single core. Other operations implemented in C++ (like the dplyr ones) are multi-threaded, but I don't think we have a way to run arbitrary R code in multiple threads.
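
For context, here's a minimal sketch of current usage (the path and column names are made up): the function you pass to map_batches() is called once per RecordBatch, sequentially, as batches stream out of the scanner.

``` r
library(arrow)
library(dplyr)

# Hypothetical dataset; map_batches() calls the R function once per
# RecordBatch, sequentially, on the main R process.
ds <- open_dataset("path/to/larger-than-memory-dataset")

summaries <- ds %>%
  map_batches(function(batch) {
    df <- as.data.frame(batch)                  # arbitrary R code runs here
    record_batch(n = nrow(df), x_max = max(df$x))
  })
```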

Do you plan to bring this very handy capability to R arrow, too?

Maybe. From the future package's documentation:

This package implements sequential, multicore, multisession, and cluster futures. With these, R expressions can be evaluated on the local machine, in parallel on a set of local machines, or distributed on a mix of local and remote machines.

I'm not sure if we would target non-local execution, but we might target multisession if we had a good way of sharing the Arrow RecordBatches with subprocesses easily.
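
Very rough sketch of what that could look like (not a design; the helper name map_batches_future() is made up): copy each batch into a raw IPC buffer so it can cross the process boundary, then evaluate the function in a multisession worker.

``` r
library(arrow)
library(future)

plan(multisession, workers = 4)

# Made-up helper: fan batches out to multisession workers. RecordBatch objects
# are external pointers, so each one is serialized to a raw IPC payload before
# being shipped to the worker process, and rebuilt there.
map_batches_future <- function(reader, .f) {
  futures <- list()
  while (!is.null(batch <- reader$read_next_batch())) {
    payload <- write_to_raw(batch, format = "stream")
    futures[[length(futures) + 1]] <- future({
      tbl <- read_ipc_stream(payload, as_data_frame = FALSE)
      .f(tbl)
    })
  }
  lapply(futures, value)
}
```

Here `reader` could come from something like `Scanner$create(ds)$ToRecordBatchReader()`; the serialization copy is the price of making the sharing "easy".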

I might research this a bit. @paleolimbot you have any thoughts on this?

paleolimbot commented 2 years ago

There's a good discussion on the mailing list about this in the context of Python user-defined functions, which were just added (https://lists.apache.org/thread/hwcrnf77j8p4dvyzoc3v5cwgws83nvqp). In the next few months we'll have R user-defined functions too, which will serve a related purpose (doing things with the Arrow compute engine that aren't implemented in C++). The gist of the mailing list discussion is that a lot of the parallelism is already handled by Arrow, and when R/Python code dispatches more workers it might not result in the performance gain one might expect.
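
(For reference, a rough sketch of what that R UDF API ended up looking like, via register_scalar_function(); treat the details as approximate rather than something available at the time of this comment.)

``` r
library(arrow)
library(dplyr)

# Approximate sketch of an R scalar UDF: Arrow's compute engine handles the
# batching and threading, and calls back into R for each chunk it processes.
register_scalar_function(
  "times_32",
  function(context, x) x * 32L,
  in_type = int32(),
  out_type = int32(),
  auto_convert = TRUE
)

record_batch(val = 1:5) %>%
  mutate(val32 = times_32(val)) %>%
  collect()
```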

For map_batches() it's a little easier to think of a use case where dispatching incoming RecordBatches to workers running on other cores would help (my mind jumps to performing a spatial join, which won't be in the query engine anytime soon).

I think we have almost all the infrastructure we need to do this... we can read from and write to R connections (like socketConnection()), which gives us some options for interprocess communication if suitable ones aren't already present (or are too slow) in the future package's abstraction.

I had some fun trying to wire all of that up and ran into some problems, which expose some of the limits of our current infrastructure. Perhaps a good scope of what we should support in Arrow is the tools to make this work (while the actual implementation could/should live in an extension package?)

``` r
make_arrow_worker <- function(schema, .f, ..., port = 1234) {
  fun <- function(..., port) {
    dots <- list(...)
    input <- arrow:::MakeRConnectionInputStream(
      socketConnection(port = port, server = TRUE, blocking = TRUE)
    )
    output_con <- pipe("cat", "wb")
    output <- arrow:::MakeRConnectionOutputStream(output_con)

    reader <- arrow::RecordBatchStreamReader$create(input)
    writer <- arrow::RecordBatchStreamWriter$create(output, schema)

    while (!is.null(batch <- reader$read_next_batch())) {
      args <- c(list(batch), dots)
      result <- do.call(.f, args)
      writer$write(as_record_batch(result))
      flush(output_con)
    }
  }

  callr::r_bg(fun, list(..., port = port))
}

worker <- make_arrow_worker(
  arrow::schema(x = arrow::int32()),
  function(batch, offset = 0) {
    arrow::record_batch(x = arrow::int32())
    batch$x <- batch$x + offset
    batch
  },
  offset = 123,
  port = 4837
)

input_con <- socketConnection(Sys.info()["nodename"], port = 4837)
#> Warning in socketConnection(Sys.info()["nodename"], port = 4837): Deweys-
#> MacBook-Air-2.local:4837 cannot be opened
#> Error in socketConnection(Sys.info()["nodename"], port = 4837): cannot open the connection
input_stream <- arrow:::MakeRConnectionOutputStream(input_con)
#> Error in arrow:::MakeRConnectionOutputStream(input_con): object 'input_con' not found
writer <- arrow::RecordBatchStreamWriter$create(input_stream, arrow::schema(x = arrow::int32()))
#> Error in is.string(sink): object 'input_stream' not found
writer$write_batch(arrow::record_batch(x = 2L))
#> Error in eval(expr, envir, enclos): object 'writer' not found
worker$read_output()
#> [1] ""
worker$read_error()
#> [1] "Error in socketConnection(port = port, server = TRUE, blocking = TRUE) : \n cannot open the connection\nIn addition: Warning message:\nIn socketConnection(port = port, server = TRUE, blocking = TRUE) :\n port 4837 cannot be opened\n"
```

Created on 2022-05-20 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)