Bioconductor / DelayedArray

A unified framework for working transparently with on-disk and in-memory array-like datasets
https://bioconductor.org/packages/DelayedArray

Help implementing 'parallel' writing to a RealizationSink with BiocParallel #20

Open PeteHaitch opened 6 years ago

PeteHaitch commented 6 years ago

I'm trying to implement writing to an arbitrary RealizationSink backend via BiocParallel::bplapply() with an arbitrary BiocParallelParam backend. That is, I want to be able to construct blocks of output, possibly in parallel, and then safely write each block to an HDF5Array, RleArray, etc. (controllable by the user) as it is generated.

I've managed to get this set up for an HDF5RealizationSink (my main use case) by using the inter-process locks available in BiocParallel to ensure the writes are safe (@mtmorgan am I using these correctly?). But I've not had any luck using the same code for an arrayRealizationSink. Nor can I get it to work for the RleArray backend, although here the problem seems to be slightly different.

Below is a toy example that tries to construct a DelayedMatrix column-by-column using this strategy, in conjunction with different DelayedArray backends:

suppressPackageStartupMessages(library(DelayedArray))
suppressPackageStartupMessages(library(BiocParallel))

FUN_with_IPC_lock <- function(b, nrow, sink, grid, ipcid) {
    # Ensure DelayedArray package is loaded on worker (needed for backends
    # other than MulticoreParam).
    suppressPackageStartupMessages(library("DelayedArray"))
    if (is(sink, "HDF5RealizationSink")) {
        # Ensure HDF5Array package is loaded on worker, if required.
        # NOTE: Feels a little clunky that this is necessary; would be good if
        #       this was automatically loaded, properly.
        suppressPackageStartupMessages(library("HDF5Array"))
    }
    message("Processing block = ", b, " on PID = ", Sys.getpid())
    # Simulate generating the b-th column of the output.
    tmp <- matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1)
    ipclock(ipcid)
    write_block_to_sink(tmp, sink, grid[[b]])
    ipcunlock(ipcid)
}

g <- function(FUN, BPPARAM, nrow = 100L, ncol = 10L) {
    # Construct RealizationSink
    backend <- getRealizationBackend()
    message("The backend is ", if (is.null(backend)) "NULL" else backend)
    sink <- DelayedArray:::RealizationSink(dim = c(nrow, ncol), type = "integer")
    # Construct ArrayGrid over columns of RealizationSink
    grid <- RegularArrayGrid(dim(sink), c(nrow(sink), 1L))
    # Fill the RealizationSink
    n_block <- length(grid)
    # Construct an IPC mutex ID
    ipcid <- ipcid()
    # Apply function with BiocParallel
    bplapply(seq_len(n_block), FUN, BPPARAM = BPPARAM, nrow = nrow, sink = sink,
             grid = grid, ipcid = ipcid)
    # Return the RealizationSink as a DelayedArray
    as(sink, "DelayedArray")
}

# Everything works with the HDF5Array backend
setRealizationBackend("HDF5Array")
#> Loading required package: rhdf5
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is HDF5Array
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Works: the IPC lock does its job!
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is HDF5Array
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Works: the IPC lock does its job!
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is HDF5Array
#> Loading required package: HDF5Array
#> Loading required package: rhdf5
#> Processing block = 1 on PID = 5755
#> Processing block = 2 on PID = 5755
#> Processing block = 3 on PID = 5755
#> Processing block = 4 on PID = 5755
#> Processing block = 5 on PID = 5755
#> Loading required package: HDF5Array
#> Loading required package: rhdf5
#> Processing block = 6 on PID = 5761
#> Processing block = 7 on PID = 5761
#> Processing block = 8 on PID = 5761
#> Processing block = 9 on PID = 5761
#> Processing block = 10 on PID = 5761
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000

# Only SerialParam() works for the in-memory backend. The in-memory backend,
# an arrayRealizationSink, is implemented as an array inside an environment.
setRealizationBackend(NULL)
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is NULL
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Doesn't work: sink isn't filled. The IPC mutex isn't doing its job?
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is NULL
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [2,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [3,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [4,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [5,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [97,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [98,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [99,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#> [100,]    NA    NA    NA    NA   .    NA    NA    NA    NA
# Doesn't work: sink isn't filled. The IPC mutex isn't doing its job?
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is NULL
#> Processing block = 6 on PID = 5777
#> Processing block = 7 on PID = 5777
#> Processing block = 8 on PID = 5777
#> Processing block = 9 on PID = 5777
#> Processing block = 10 on PID = 5777
#> Processing block = 1 on PID = 5771
#> Processing block = 2 on PID = 5771
#> Processing block = 3 on PID = 5771
#> Processing block = 4 on PID = 5771
#> Processing block = 5 on PID = 5771
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [2,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [3,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [4,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [5,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [97,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [98,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [99,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#> [100,]    NA    NA    NA    NA   .    NA    NA    NA    NA

# Only SerialParam() works for the RleArray backend.
setRealizationBackend("RleArray")
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is RleArray
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> RleMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Doesn't work: Don't understand why.
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is RleArray
#> Error in validObject(.Object): invalid class "ChunkedRleArraySeed" object: 
#>     length of object data [0] does not match object dimensions
#>     [product 1000]
# Doesn't work: don't understand why I get this error.
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is RleArray
#> Processing block = 6 on PID = 5793
#> Processing block = 7 on PID = 5793
#> Processing block = 8 on PID = 5793
#> Processing block = 9 on PID = 5793
#> Processing block = 10 on PID = 5793
#> Processing block = 1 on PID = 5787
#> Processing block = 2 on PID = 5787
#> Processing block = 3 on PID = 5787
#> Processing block = 4 on PID = 5787
#> Processing block = 5 on PID = 5787
#> Error in validObject(.Object): invalid class "ChunkedRleArraySeed" object: 
#>     length of object data [0] does not match object dimensions
#>     [product 1000]

Created on 2018-05-21 by the reprex package (v0.2.0).

Session info

``` r
devtools::session_info()
#> Session info -------------------------------------------------------------
#>  setting  value
#>  version  R version 3.5.0 (2018-04-23)
#>  system   x86_64, darwin15.6.0
#>  ui       X11
#>  language (EN)
#>  collate  en_AU.UTF-8
#>  tz       America/New_York
#>  date     2018-05-21
#> Packages -----------------------------------------------------------------
#>  package       * version date       source
#>  backports       1.1.2   2017-12-13 CRAN (R 3.5.0)
#>  base          * 3.5.0   2018-04-24 local
#>  BiocGenerics  * 0.27.0  2018-05-01 Bioconductor
#>  BiocParallel  * 1.15.3  2018-05-11 Bioconductor
#>  compiler        3.5.0   2018-04-24 local
#>  datasets      * 3.5.0   2018-04-24 local
#>  DelayedArray  * 0.7.0   2018-05-01 Bioconductor
#>  devtools        1.13.5  2018-02-18 CRAN (R 3.5.0)
#>  digest          0.6.15  2018-01-28 CRAN (R 3.5.0)
#>  evaluate        0.10.1  2017-06-24 CRAN (R 3.5.0)
#>  graphics      * 3.5.0   2018-04-24 local
#>  grDevices     * 3.5.0   2018-04-24 local
#>  HDF5Array     * 1.9.0   2018-05-01 Bioconductor
#>  htmltools       0.3.6   2017-04-28 CRAN (R 3.5.0)
#>  IRanges       * 2.15.13 2018-05-20 Bioconductor
#>  knitr           1.20    2018-02-20 CRAN (R 3.5.0)
#>  magrittr        1.5     2014-11-22 CRAN (R 3.5.0)
#>  matrixStats   * 0.53.1  2018-02-11 CRAN (R 3.5.0)
#>  memoise         1.1.0   2017-04-21 CRAN (R 3.5.0)
#>  methods       * 3.5.0   2018-04-24 local
#>  parallel      * 3.5.0   2018-04-24 local
#>  Rcpp            0.12.17 2018-05-18 CRAN (R 3.5.0)
#>  rhdf5         * 2.25.0  2018-05-01 Bioconductor
#>  Rhdf5lib        1.3.1   2018-05-17 Bioconductor
#>  rmarkdown       1.9     2018-03-01 CRAN (R 3.5.0)
#>  rprojroot       1.3-2   2018-01-03 CRAN (R 3.5.0)
#>  S4Vectors     * 0.19.5  2018-05-20 Bioconductor
#>  snow            0.4-2   2016-10-14 CRAN (R 3.5.0)
#>  stats         * 3.5.0   2018-04-24 local
#>  stats4        * 3.5.0   2018-04-24 local
#>  stringi         1.2.2   2018-05-02 CRAN (R 3.5.0)
#>  stringr         1.3.1   2018-05-10 CRAN (R 3.5.0)
#>  tools           3.5.0   2018-04-24 local
#>  utils         * 3.5.0   2018-04-24 local
#>  withr           2.1.2   2018-03-15 CRAN (R 3.5.0)
#>  yaml            2.1.19  2018-05-01 CRAN (R 3.5.0)
```
mtmorgan commented 6 years ago

The use of ipcmutex() is correct.

The challenge with the in-memory representations is that the memory is modified in the memory space of each thread / process; that memory is NOT shared with the manager process. I don't think there's an easy way around that. A 'feature' for BiocParallel might be shared memory management, but that would be a big feature, and it would probably require a custom 'realization back end' to work with it.
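A minimal sketch of that failure mode (not from the thread), using base R's parallel::mclapply(): each forked worker mutates its own copy-on-write copy of the environment, so the manager never sees the writes. (On Windows, where mclapply() runs serially, the writes would stick.)

suppressPackageStartupMessages(library(parallel))

env <- new.env()
env$x <- rep(NA_integer_, 4L)
# Each forked worker assigns into *its own* copy of 'env' ...
invisible(mclapply(1:4, function(i) env$x[i] <- i, mc.cores = 2L))
# ... so back in the manager, 'env$x' is untouched.
env$x
#> [1] NA NA NA NA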

Also, the ipc stuff is inter-process (i.e., on the same computer); some back-ends (notably BatchtoolsParam()) can work on clusters or clouds where the ipc lock doesn't help.

PeteHaitch commented 6 years ago

Thanks, Martin, that's very helpful for my current use case in bsseq.

I was naïvely hoping that bplapply() with MulticoreParam() combined with a mutex might be safe for in-memory representations. But I now realise that the fork's copy-on-write mechanism means each worker modifies its own copy of the sink, even when a mutex ensures that writes never occur in parallel. Bummer.

I think what I'll do in the short term is limit the 'realization back end' to 'on-disk' backends and the 'parallelisation back end' to 'same computer' backends. If the user requests an in-memory representation of the data, then I'll assume they've got enough RAM to pass around lists of vectors that I'll eventually combine into a matrix in the main process with cbind()/rbind() operations (sketched below).
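A rough sketch of that fallback (the block contents are illustrative): workers return plain vectors, and only the manager builds the matrix.

suppressPackageStartupMessages(library(BiocParallel))

nrow <- 100L
# Workers return ordinary vectors; no sink is shared between processes.
cols <- bplapply(seq_len(10L), function(b) seq_len(nrow) + (b - 1L) * nrow,
                 BPPARAM = MulticoreParam(2L))
# The manager combines them into an in-memory matrix.
mat <- do.call(cbind, cols)
dim(mat)
#> [1] 100  10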

hpages commented 3 years ago

Hi Pete,

Where do we stand on this? Is there something that we should implement in DelayedArray to facilitate parallel writing to a RealizationSink?

My feeling about this is that supporting parallel writing to an arbitrary RealizationSink is a hard problem. For on-disk backends that don't natively support concurrent writing, a simple workaround is to split the array to be written into column- or row-oriented blocks, write the blocks in parallel using one RealizationSink per block, and finally combine the on-disk blocks with a delayed cbind() or rbind() (sketched below). This is a very generic strategy that works regardless of the on-disk backend. The drawback is that the final object is not monolithic, i.e., it's made of several on-disk datasets combined together. If a monolithic object is desired, an extra call to realize() is needed. Since this last call to realize() cannot do parallel writing, it can be expensive.
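A hedged sketch of that strategy with HDF5Array, where one writeHDF5Array() call per block stands in for "one RealizationSink per block" (the block contents and file paths are illustrative):

suppressPackageStartupMessages(library(HDF5Array))
suppressPackageStartupMessages(library(BiocParallel))

nrow <- 100L
# Realize each column-block to its own HDF5 file, in parallel.
blocks <- bplapply(seq_len(10L), function(b) {
    block <- matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1L)
    writeHDF5Array(block, filepath = tempfile(fileext = ".h5"), name = "block")
}, BPPARAM = MulticoreParam(2L))

# Combine the on-disk blocks with a delayed cbind(). The result is not
# monolithic: realizing it to a single dataset needs one more (serial)
# call to realize() or writeHDF5Array().
M <- do.call(cbind, blocks)
M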

We could introduce a new class (e.g. SplitRealizationSink or ParallelRealizationSink) to abstract the above strategy.

H.

PeteHaitch commented 3 years ago

Hi Hervé,

It's been a while since I thought about this. The model in my head involves workers that read input files (e.g., BAM files, CSV files, etc.) and process them (e.g., filtering, selection, aggregation, etc.) and then either:

  1. Write directly to the RealizationSink. This would need to be done serially (like my above attempt using mutexes) unless the backend supported concurrent writing. I envisioned this being a single, shared RealizationSink, but from the discussion here it sounds difficult/impossible for an in-memory RealizationSink, and from https://github.com/Bioconductor/BiocParallel/issues/126 it sounds like there are challenges even for an on-disk RealizationSink.
  2. Pass the processed data back to a 'manager' that writes to the single RealizationSink (sketched after this list). This incurs the cost of data transfer between worker and manager (and potential memory blowups if multiple workers return data to the manager concurrently) but ensures that writing to the RealizationSink is serial.

I think it's fair to say (1) would be preferable.
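For completeness, a minimal sketch of option (2), reusing the write_block_to_sink() pattern from the reprex above (the block contents are illustrative):

suppressPackageStartupMessages(library(DelayedArray))
suppressPackageStartupMessages(library(HDF5Array))
suppressPackageStartupMessages(library(BiocParallel))

nrow <- 100L; ncol <- 10L
setRealizationBackend("HDF5Array")
sink <- DelayedArray:::RealizationSink(dim = c(nrow, ncol), type = "integer")
grid <- RegularArrayGrid(dim(sink), c(nrow, 1L))

# Workers only *compute* blocks and ship them back to the manager ...
blocks <- bplapply(seq_along(grid), function(b) {
    matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1L)
}, BPPARAM = MulticoreParam(2L))

# ... and the manager does all the writing, serially, so no lock is needed.
for (b in seq_along(grid))
    write_block_to_sink(blocks[[b]], sink, grid[[b]])
as(sink, "DelayedArray")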

The 'one RealizationSink per block' approach is obviously viable, but I'd hoped/envisioned a single, shared RealizationSink, mostly because I'd like to avoid the extra (and, in my case, expensive) realize() on the data before it's 'saveable' (e.g., with saveHDF5SummarizedExperiment()).

With some data types (like the whole-genome bisulfite-sequencing data handled by bsseq), this initial ingestion/processing can be quite expensive (as is writing these large datasets to disk), so you really want to do it only once and then be able to load the saved object for downstream analyses.