rstudio / plumber

Turn your R code into a web API.
https://www.rplumber.io
Other
1.39k stars 256 forks source link

Allow for async filters #907

Open king-of-poppk opened 1 year ago

king-of-poppk commented 1 year ago

With #204 implemented we now have a pretty convenient way to parallelize IO resource consumption. However, it seems plumber filters still do not have this feature. Would it be possible to make it so that filters can return promises for the plumber::forward() call?

meztez commented 10 months ago

@king-of-poppk how would it be used?

Why would it not work currently? Aren't filters are just regular steps according to runSteps source?

king-of-poppk commented 10 months ago

@meztez I may be wrong. I will double-check.

meztez commented 10 months ago

@king-of-poppk I did some further testing and I do not think plumber::forward could be used within a future::future. What would be the use case?

schloerke commented 10 months ago

Correct.

You can have a future return and then call forward() within the follow up promise.


So if the filter could return a promise and within that promise (which executes in the main R worker) it calls forward(), I think it'll work.

meztez commented 10 months ago
library(plumber)
library(future)
plan("multisession")

#* @filter future
function(req) {
  print(req$PATH_INFO)
  if (req$PATH_INFO=="/echo1") {
    future::future({
      Sys.sleep(10)
      plumber::forward()
    })
  } else {
    plumber::forward()
  }
}

#* @serializer text
#* @get /echo1
function(msg) {
  msg
}

#* @serializer text
#* @get /echo2
function(msg) {
  msg
}

I'm working from this, only getting a json serialized true. I will try some things from what I understand from your comment.

schloerke commented 10 months ago

I'm on mobile, so formatting will not be good / tested.

Updating your future code to something like this...

  future::future({
    if (req$PATH_INFO=="/echo1") {
      Sys.sleep(10)
    }
  }) %...>% {
    value <- .

    plumber::forward()
    value
  }

(I hope I'm not making stuff up here 😅)


I believe you could even call forward before the promise is returned.

  plumber::forward()
  future::future({
    if (req$PATH_INFO=="/echo1") {
      Sys.sleep(10)
    }
    # return data here
  })
schloerke commented 10 months ago

Important part is that forward() is called within the main R worker.

meztez commented 10 months ago

@schloerke Oh, I get it. I thought it needed to be called last but it just sets a flag. Ok, this will work now.

schloerke commented 10 months ago

I wish forward() was required to be returned from filter().

Migrating towards that goal is really hard without spooky deprecation message or a separate solution entirely (ex: endpoint based filters).

meztez commented 10 months ago

Yep it works @king-of-poppk , let me know if this is something you can use.

library(plumber)
library(future)
plan("multisession")

#* @filter future
function(req) {
  plumber::forward()
  future::future({
    if (req$PATH_INFO=="/echo1") {
      Sys.sleep(10)
    }
  })
}

#* @serializer text
#* @get /echo1
function(msg) { msg }

#* @serializer text
#* @get /echo2
function(msg) { msg }
meztez commented 10 months ago

Important part is that forward() is called within the main R worker.

Yeah, because it sets the flags in the .global env?

schloerke commented 10 months ago

Important part is that forward() is called within the main R worker.

Yeah, because it sets the flags in the .global env?

Yup!

king-of-poppk commented 10 months ago

Argh this might be one of those XY problem again. I might have tried only with async/await from the coro package, which implies having the forward call inside the future, which won't work.

The goal here is to make the entire filter processing potentially run in parallel, and conditionally forward based on the results of the processing. Calling plumber::forward() ahead of the processing is not an option, so I guess this is what it should look like:

function myAuthorizationFilter(req, res) {
  myAuthorizationFilterLogic(req) %>% then(
    onFulfilled = function(value) {
      plumber::forward()
    },
    onRejected = function(error) {
      res$status <- 403 # NOTE Or 401 depending on error class.
    }
  ) %>% finally(
    function() { ... }
  )
schloerke commented 10 months ago

I might have tried only with async/await from the coro package, which implies having the forward call inside the future, which won't work.

@king-of-poppk Can you show me a small copy of this code? The async/await logic is built on a state machine which execute in the main R process. I'd be curious how you're using {future} within {coro}.

king-of-poppk commented 10 months ago

Can you show me a small copy of this code? The async/await logic is built on a state machine which execute in the main R process.

Ah yes of course my mistake, it should have worked then.