rstudio / promises

A promise library for R
https://rstudio.github.io/promises
Other
201 stars 19 forks source link

Add `wrapOnFinally` to promise domains #43

Closed jcheng5 closed 5 years ago

jcheng5 commented 5 years ago

This feature adds the ability for promise domains to handle finally differently than resolved/rejected.

When using private event loops to implement synchronous functions on top of promises, we need to take special care to make sure finally handlers get called even in the face of R interrupt. We can implement this with a promise domain, but only if promise domains can distinguish between finally and regular resolve/reject semantics. This causes the notion of finally to be pushed a little deeper into the promise abstractions, as previously it was literally just syntactic sugar over the regular then. Now, then has an explicit onFinally; though once promise domains encounter the finally handler, we then immediately split the finally into resolve/reject so it doesn't complicate the actual implementation of doResolve/doReject and friends.

jcheng5 commented 5 years ago

@wch Here's what I have for the synchronization stuff, I'm just dumping it here because I'm not sure if it belongs in promises or chromote (I'm not as confident in the design of this part as I am in wrapOnFinally).

create_interrupt_domain <- function() {
  domain <- new_promise_domain(
    wrapOnFulfilled = function(onFulfilled) {
      function(...) {
        if (domain$interrupted) {
          stop("Operation was interrupted")
        }
        tryCatch({
          onFulfilled(...)
        }, interrupt = function(e) {
          domain$interrupted <- TRUE
          stop(e)
        })
      }
    },
    wrapOnRejected = function(onRejected) {
      function(...) {
        if (domain$interrupted) {
          stop("Operation was interrupted")
        }
        tryCatch({
          onRejected(...)
        }, interrupt = function(e) {
          domain$interrupted <- TRUE
          signalCondition(e)
        })
      }
    },
    wrapOnFinally = function(onFinally) {
      function(...) {
        tryCatch({
          onFinally(...)
          if (domain$interrupted) {
            signalCondition(structure(list(), class = c("interrupt", "condition")))
          }
        }, interrupt = function(e) {
          domain$interrupted <<- TRUE
          signalCondition(e)
        })
      }
    },
    wrapSync = function(expr) {
      if (is.null(globals$synchronized)) {
        globals$synchronized <- 0L
      }
      globals$synchronized <- globals$synchronized + 1L
      on.exit(globals$synchronized <- globals$synchronized - 1L)

      force(expr)
    },
    interrupted = FALSE
  )

  domain
}

synchronize <- function(expr) {
  domain <- create_interrupt_domain()
  with_promise_domain(domain, {
    tryCatch({
      result <- force(expr)
      if (is.promising(result)) {
        value <- NULL
        type <- NULL
        result %...>% {
          value <<- .
          type <<- "success"
        } %...!% (function(reason) {
          value <<- reason
          type <<- "error"
        })
        while (is.null(type)) {
          later::run_now()
        }
        if (type == "success") {
          value
        } else {
          stop(value)
        }
      }
    }, interrupt = function(e) {
      domain$interrupted <<- TRUE
      message("Attempting to interrupt gracefully; press Esc/Ctrl+C to force interrupt")
      while (!later::loop_empty()) {
        later::run_now()
      }
      signalCondition(e)
    })
  })
}

# Example
synchronize({
  promise(~{
    message("Interrupt now, if you want...")
    later::later(~resolve(TRUE), 3)
  }) %...>% {
    message("Or interrupt now...")
    Sys.sleep(3)
  } %...>% {
    message("Got success")
  } %...!% {
    message("Got error")
  } %>% finally(~{
    message("Got finally")
  })
})
wch commented 5 years ago

Some comments on a slightly modified synchronize example:

# Example
synchronize({
  promise(~{
    message("Interrupt now, if you want...")
    later::later(~resolve(TRUE), 2)
  }) %...>% {
    message("Or interrupt now... ", appendLF = FALSE)
    Sys.sleep(2)
    message("Done.")
  } %...!% {
    message("Got error... ", appendLF = FALSE)
    Sys.sleep(2)
    message("Done.")
  } %>% finally(~{
    message("Got finally")
  })
})
jcheng5 commented 5 years ago

There's a TODO in this branch that I still need to look into:

# TODO: All wrapped functions should also be rewritten to reenter the domain

This is what the old code did, and the new code doesn't currently do this. However, I couldn't actually come up with any examples that would cause handlers to NOT reenter the domain, even though it seemed like they should, so maybe this is happening some other way already.

jcheng5 commented 5 years ago

If the interrupt happens in the "Or interrupt now... " stage, the finally doesn't run.

This is fixed if stop(e) and signalCondition(e) (which throw/signal the interrupt) are replaced with a more traditional stop("Operation was interrupted").

jcheng5 commented 5 years ago

This is where we ended our discussion

globals <- promises:::globals

generateInterrupt <- function() {
  # TODO: Do something that actually works
  tools::pskill(Sys.getpid(), tools::SIGINT)
  Sys.sleep(1)
}

create_interrupt_domain <- function() {
  domain <- new_promise_domain(
    wrapOnFulfilled = function(onFulfilled) {
      function(...) {
        if (domain$interrupted) {
          message("Got here 1")
          stop("Operation was interrupted")
        }
        tryCatch({
          onFulfilled(...)
        }, interrupt = function(e) {
          domain$interrupted <- TRUE
          stop("Operation was interrupted")
        })
      }
    },
    wrapOnRejected = function(onRejected) {
      function(...) {
        if (domain$interrupted) {
          message("Got here 2")
          stop("Operation was interrupted")
        }
        tryCatch({
          onRejected(...)
        }, interrupt = function(e) {
          domain$interrupted <- TRUE
          stop("Operation was interrupted")
        })
      }
    },
    wrapOnFinally = function(onFinally) {
      function(...) {
        tryCatch({
          onFinally(...)
        }, interrupt = function(e) {
          domain$interrupted <<- TRUE
          stop("Operation was interrupted")
        })
      }
    },
    wrapSync = function(expr) {
      if (is.null(globals$synchronized)) {
        globals$synchronized <- 0L
      }
      globals$synchronized <- globals$synchronized + 1L
      on.exit(globals$synchronized <- globals$synchronized - 1L)

      force(expr)
    },
    interrupted = FALSE
  )

  domain
}

synchronize <- function(expr) {
  domain <- create_interrupt_domain()
  with_promise_domain(domain, {
    tryCatch({
      result <- force(expr)
      if (is.promising(result)) {
        value <- NULL
        type <- NULL
        result %...>% {
          value <<- .
          type <<- "success"
        } %...!% (function(reason) {
          value <<- reason
          type <<- "error"
        })
        while (is.null(type) && !domain$interrupted) {
          later::run_now()
        }
        if (is.null(type)) {
          # domain$interrupted
          generateInterrupt()
        } else if (type == "success") {
          value
        } else if (type == "error") {
          stop(value)
        }
      }
    }, interrupt = function(e) {
      domain$interrupted <<- TRUE
      message("Attempting to interrupt gracefully; press Esc/Ctrl+C to force interrupt")
      while (!later::loop_empty()) {
        later::run_now()
      }
      # TODO: This needs to change to something that actually works (SIGINT?)
      generateInterrupt()
    })
  })
}