Bioconductor / DelayedArray

A unified framework for working transparently with on-disk and in-memory array-like datasets
https://bioconductor.org/packages/DelayedArray
24 stars 9 forks source link

Adding an mapply-like blockApply() function to the API #11

Open PeteHaitch opened 6 years ago

PeteHaitch commented 6 years ago

Hi Hervé,

In a recent email you wrote:

At some point the old block-processing functions (i.e. block_APPLY(), block_MAPPLY(), and block_APPLY_and_COMBINE()) will go away.

I'd like to request that a mapply-like blockApply() function is part of the exported API. In minfi, we need to iterate over multiple (conformable) array-like objects in parallel for several of the preprocessing routines (e.g., converting the red & green channels from the array to methylated/unmethylated calls).

The return value is a large matrix-like object, so it'd be great if this mapply-like blockApply() also supported the sink or BACKEND argument (as in https://github.com/Bioconductor/DelayedArray/issues/10)

PeteHaitch commented 6 years ago

FYI, this is what I'm currently using, but I'd prefer to retire this for an 'official' solution.

# NOTE: A mapply()-like function for conformable arrays.
# NOTE: Different from DelayedArray:::block_Mapply(); designed to have an API more like
#       DelayedArray::blockArray()
blockMapply <- function(FUN, ..., MoreArgs = NULL, grids = NULL,
                        BPREDO = list(), BPPARAM = bpparam()) {
    FUN <- match.fun(FUN)
    dots <- unname(list(...))
    # Check conformable grids
    if (is.null(grids)) {
        grids <- replicate(length(dots), NULL)
    }
    grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        grids,
        dots,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    grids_dims <- lapply(grids, dim)
    all_same_grids_dims <- all(
        vapply(X = grids_dims,
               FUN = function(dim) all(dim == grids_dims[[1L]]),
               FUN.VALUE = logical(1L)))
    if (!all_same_grids_dims) {
        stop("non-conformable grids")
    }
    stopifnot(length(dots) == length(grids))
    nblock <- length(grids[[1]])
    bplapply(seq_len(nblock), function(b) {
        if (DelayedArray:::get_verbose_block_processing()) {
            message("Processing block ", b, "/", nblock, " ... ",
                    appendLF = FALSE)
        }
        viewports <- lapply(grids, function(grid) grid[[b]])
        blocks <- mapply(
            FUN = function(x, grid, viewport) {
                block <- DelayedArray:::extract_block(x, viewport)
                if (!is.array(block)) {
                    block <- DelayedArray:::.as_array_or_matrix(block)
                }
                attr(block, "from_grid") <- grid
                attr(block, "block_id") <- b
                block
            },
            x = dots,
            grid = grids,
            viewport = viewports,
            SIMPLIFY = FALSE,
            USE.NAMES = FALSE)
        block_ans <- do.call(FUN, c(blocks, MoreArgs))
        if (DelayedArray:::get_verbose_block_processing()) {
            message("OK")
        }
        block_ans
    },
    BPREDO = BPREDO,
    BPPARAM = BPPARAM)
}
PeteHaitch commented 6 years ago

In minfi, I needed a function that allowed me to iterate over multiple DelayedMatrix objects ("inputs") and write to multiple RealizationSink instances ("outputs"). What's more, although all the "inputs" have the same dimensions, and all the "outputs" have the same dimensions, the "inputs" and "outputs" have different dimensions (specifically, the "outputs" have fewer rows than the "inputs" but they all have the same number of rows).

Here is what I cam up with:

# NOTE: blockMapply() with the option to write the blocks to multiple 'sinks'.
#       Useful, for example, to apply a function across column-blocks of
#       multiple DelayedMatrix objects, write these results to disk, and then
#       wrap these in a DelayedMatrix.
# NOTE: `dots_grids`, `sinks_grids`, and `sinks` should all be lists
blockMapplyWithRealization <- function(FUN, ..., MoreArgs = NULL, sinks = NULL,
                                       dots_grids = NULL, sinks_grids = NULL,
                                       BPREDO = list(), BPPARAM = bpparam()) {
    FUN <- match.fun(FUN)
    dots <- unname(list(...))
    # Check valid `sinks`
    stopifnot(is.null(sinks) || is.list(sinks))
    # Check conformable dots_grids and sinks_grids
    if (is.null(dots_grids)) {
        dots_grids <- replicate(length(dots), NULL)
    } else {
        stopifnot(is.list(dots_grids))
    }
    dots_grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        dots_grids,
        dots,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    if (is.null(sinks_grids)) {
        sinks_grids <- replicate(length(sinks), NULL)
    } else {
        stopifnot(is.list(sinks_grids))
    }
    sinks_grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        sinks_grids,
        sinks,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    grids_dims <- lapply(c(dots_grids, sinks_grids), dim)
    all_same_grids_dims <- all(
        vapply(X = grids_dims,
               FUN = function(dim) all(dim == grids_dims[[1L]]),
               FUN.VALUE = logical(1L)))
    if (!all_same_grids_dims) {
        stop("non-conformable 'dots_grids' and 'sinks_grids'")
    }
    stopifnot(length(dots) == length(dots_grids),
              length(sinks) == length(sinks_grids))

    # Loop over blocks of `dots` and write to `sinks`
    nblock <- length(dots_grids[[1]])
    bplapply(seq_len(nblock), function(b) {
        if (DelayedArray:::get_verbose_block_processing()) {
            message("Processing block ", b, "/", nblock, " ... ",
                    appendLF = FALSE)
        }
        input_viewports <- lapply(dots_grids, function(grid) grid[[b]])
        output_viewports <- lapply(sinks_grids, function(grid) grid[[b]])
        blocks <- mapply(
            FUN = function(x, grid, viewport) {
                block <- DelayedArray:::extract_block(x, viewport)
                if (!is.array(block)) {
                    block <- DelayedArray:::.as_array_or_matrix(block)
                }
                attr(block, "from_grid") <- grid
                attr(block, "block_id") <- b
                block
            },
            x = dots,
            grid = dots_grids,
            viewport = input_viewports,
            SIMPLIFY = FALSE,
            USE.NAMES = FALSE)
        block_ans <- do.call(FUN, c(blocks, MoreArgs))
        if (!is.list(block_ans)) {
            block_ans <- list(block_ans)
        }
        # NOTE: This is the only part different from blockMapply()
        if (!is.null(sinks)) {
            mapply(function(ans, sink, viewport) {
                write_block_to_sink(ans, sink, viewport)
            }, ans = block_ans, sink = sinks, viewport = output_viewports,
            SIMPLIFY = FALSE, USE.NAMES = FALSE)
            block_ans <- NULL
        }
        if (DelayedArray:::get_verbose_block_processing()) {
            message("OK")
        }
        block_ans
    },
    BPREDO = BPREDO,
    BPPARAM = BPPARAM)
}