nt-williams / lmtp

Non-parametric Causal Effects Based on Modified Treatment Policies
http://www.beyondtheate.com
GNU Affero General Public License v3.0

Package vignette fails when using plan(multisession) [with patch] #100

Closed HenrikBengtsson closed 3 years ago

HenrikBengtsson commented 3 years ago

Issue

The package vignette only works when using 'sequential' or 'multicore' parallel processing. It fails when using 'multisession' processing, e.g.

$ R --vanilla
R version 4.1.1 Patched (2021-08-10 r80727) -- "Kick Things"
Copyright (C) 2021 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
...
> future::plan("multisession", workers = 2)
> tools::buildVignette("vignettes/getting-started.Rmd", clean = FALSE)
Quitting from lines 160-182 (getting-started.Rmd) 
Error in unserialize(node$con) : 
  Failed to retrieve the value of MultisessionFuture (<none>) from cluster RichSOCKnode #1 (PID 1047 on 'localhost'). The reason reported was 'error reading from connection'
In addition: There were 48 warnings (use warnings() to see them)

I discovered this through reverse dependency checks when working on a major update of the future package.

Troubleshooting

This error occurs in chunk https://github.com/nt-williams/lmtp/blob/e44fb1f50ce200f9310bf19c95296b8eefe1c2af/vignettes/getting-started.Rmd#L159-L182. The reason is a bit complex, but basically there are two different things called shift: one is the argument shift and the other is the global function shift. Normally this type of code should work, but it is actually a tricky situation for the future framework to figure out. The future framework is used in https://github.com/nt-williams/lmtp/blob/e44fb1f50ce200f9310bf19c95296b8eefe1c2af/R/cross-fit.R#L24-L39.
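
To make the name collision concrete, here is a minimal base-R sketch of the pattern. It does not use future and does not reproduce the failure; it only shows the two things called shift living side by side. The names estimate() and dynamic_policy() are hypothetical stand-ins, not lmtp functions.

```r
# Global function named `shift` (stand-in for the vignette's helper).
shift <- function(x) x - 1

# User-supplied policy that relies on the *global* `shift`.
# `dynamic_policy` is a hypothetical name used only for this sketch.
dynamic_policy <- function(x) shift(x) * 2

# Hypothetical estimator whose *argument* is also called `shift`.
# Inside this function, `shift` refers to the argument (the policy),
# while the policy's own body still resolves `shift` to the global function.
estimate <- function(x, shift) {
  shift(x)
}

estimate(3, shift = dynamic_policy)
# → 4
```

In plain sequential R this works because each function looks up shift in its own environment. The workarounds in this issue avoid relying on that lookup when the code is shipped to another R session.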

To work around this, one needs to rename the global function to something different from the argument, e.g. my_shift as in:

my_shift <- function(data, trt) {
  (data[[trt]] - 1) * (data[[trt]] - 1 >= 1) + data[[trt]] * (data[[trt]] - 1 < 1)
}
# creating a dynamic mtp that applies the shift function 
# but also depends on history and the current time
dynamic_mtp <- function(data, trt) {
  if (trt == "A_1") {
    # if it's the first time point, follow the same mtp as before
    my_shift(data, trt)
  } else {
    # otherwise check if the time varying covariate equals 1
    ifelse(data[[sub("A", "L", trt)]] == 1, 
           my_shift(data, trt), # if yes continue with the policy
           data[[trt]])      # otherwise do nothing
  }
}
HenrikBengtsson commented 3 years ago

Another alternative is to make shift() a local function of dynamic_mtp():

# creating a dynamic mtp that applies the shift function 
# but also depends on history and the current time
dynamic_mtp <- function(data, trt) {
  shift <- function(data, trt) {
    (data[[trt]] - 1) * (data[[trt]] - 1 >= 1) + data[[trt]] * (data[[trt]] - 1 < 1)
  }

  if (trt == "A_1") {
    # if it's the first time point, follow the same mtp as before
    shift(data, trt)
  } else {
    # otherwise check if the time varying covariate equals 1
    ifelse(data[[sub("A", "L", trt)]] == 1, 
           shift(data, trt), # if yes continue with the policy
           data[[trt]])      # otherwise do nothing
  }
}
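
As a quick sanity check, the following self-contained sketch runs both workarounds on a small made-up data frame and confirms they return the same shifted treatments. The data frame toy and the names dynamic_mtp_renamed and dynamic_mtp_local are assumptions introduced only for this illustration; the column names follow the A_t / L_t pattern that the code above relies on.

```r
# Workaround 1: global helper renamed so it no longer collides with `shift`.
my_shift <- function(data, trt) {
  (data[[trt]] - 1) * (data[[trt]] - 1 >= 1) + data[[trt]] * (data[[trt]] - 1 < 1)
}
dynamic_mtp_renamed <- function(data, trt) {
  if (trt == "A_1") {
    my_shift(data, trt)
  } else {
    ifelse(data[[sub("A", "L", trt)]] == 1, my_shift(data, trt), data[[trt]])
  }
}

# Workaround 2: `shift` defined locally inside the policy function.
dynamic_mtp_local <- function(data, trt) {
  shift <- function(data, trt) {
    (data[[trt]] - 1) * (data[[trt]] - 1 >= 1) + data[[trt]] * (data[[trt]] - 1 < 1)
  }
  if (trt == "A_1") {
    shift(data, trt)
  } else {
    ifelse(data[[sub("A", "L", trt)]] == 1, shift(data, trt), data[[trt]])
  }
}

# Made-up toy data (not from the vignette).
toy <- data.frame(A_1 = c(0, 1, 2, 3), A_2 = c(3, 2, 1, 0), L_2 = c(1, 0, 1, 0))

dynamic_mtp_renamed(toy, "A_1")  # → 0 1 1 2
dynamic_mtp_local(toy, "A_2")    # → 2 2 1 0
```

Both versions are self-contained with respect to the name shift, so neither depends on a global function that shares its name with an argument.
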
nt-williams commented 3 years ago

Resolved in PR #102