shikokuchuo / nanonext

nanonext - R binding for NNG (Nanomsg Next Gen)
http://shikokuchuo.net/nanonext/
GNU General Public License v3.0

nanonext: which kind of protocol should we use to work with the 'future' package? #2

Closed bsarrant closed 2 years ago

bsarrant commented 2 years ago

Hi,

I'm doing some experiments with your package because it provides interesting capabilities, especially the ability to transport objects. In the tests I'm doing, I try to coordinate message status between my workers and the main R session. The first functional prototype is based on liteq but is limited to text messaging, so I'm trying to extend it with your package.

The code is structured as follows:

library(future)
library(lgr)
library(promises)
library(data.table)
library(stringr)
library(nanonext)
library(magrittr)

dt_tasks <- data.table(
  ID = 1:3,
  name = c( "UC1", "UC2", "UC3"),
  memory = c(100, 150, 200),
  time = c(2, 4, 6),
  treated = rep(F,3)
)

# define a wrapper for initializing workers' environment
nb_workers <- 3
with_two_workers <- function( expr ) {
  old_plan <- future::plan(future::multisession(workers = nb_workers))
  on.exit({future::plan(old_plan)}, add = TRUE)
  force(expr)
  while(!later::loop_empty()) {Sys.sleep(0.1); later::run_now()}
  invisible()
}

nb_of_completed_ops <- 0
with_two_workers(
  {
    # define the workers' tasks
    for(i in 1:nrow(dt_tasks)){
      future_promise(
        {
          s1 <- socket("pair")
          send_aio(s1, str_c( dt_tasks[i,name], "|", str_pad( "Start Treatment",width = 15, side = "right" ), "|PID=", Sys.getpid() ) )
          Sys.sleep( dt_tasks[i,time] )
          send_aio(s1, str_c( dt_tasks[i,name], "|", str_pad( "End Treatment" ,width = 15, side = "right" ), "|PID=", Sys.getpid() ) )
          send_aio(s1, str_c( dt_tasks[i,name], "|", "DONE" ) )
          close(s1)
        },
        packages = c("data.table", "nanonext"), seed = NULL
      ) 
    }

    # define Main tasks to monitor the workers alongside
    promise_resolve(
      {
        s2 <- socket("pair")
        lgr$warn( str_c( "MAIN|",str_pad("Start Treatment",width = 15,side = "right"), "|PID=", Sys.getpid()) )
        nb_of_completed_ops <- 0
        while( nb_of_completed_ops < nrow(dt_tasks)) {
          lc_msg <- recv_aio(s2)
          if( unresolved(lc_msg) ){
            lgr$debug( str_c( "MAIN|",str_pad("Waiting...",width = 15,side = "right"),"|PID=", Sys.getpid(), ">nb of completed ops: ", nb_of_completed_ops) )
          } else {
            lgr$info( lc_msg$data )
            if (lc_msg$data == "DONE") {
              nb_of_completed_ops <- nb_of_completed_ops + 1
            }
          }
          Sys.sleep(1)
        }
        lgr$warn( str_c( "MAIN|",str_pad("End Treatment",width = 15,side = "right"), "|PID=", Sys.getpid()) )
        close(s2)
      }
    )
  }
)

The issue is that I don't know how to combine nanonext with future/promises:

I'm sure I'm missing something, but in your examples you present asynchronous interaction in a sequential manner. So how should I proceed when coding with futures / promises?

Many thanks in advance

Regards, Bruno

shikokuchuo commented 2 years ago

Just a couple of pointers:

bsarrant commented 2 years ago

Hi !

Many thanks for all this detailed information, it will help me better understand how to use your packages. It's really appreciated :)

To give better insight into what I'm working on:

I will think about your suggestions and test them. Just one last question: when you said mirai supports async with not many lines of R code, how many lines is the limit? Could this limitation be overcome by wrapping the treatment definition in a function?

Many thanks, and I'll keep you posted on how it goes (and for closing the issue too).

Regards, Bruno

shikokuchuo commented 2 years ago

Sounds good. I think for message passing all you will need is to ensure sockets are started in each process. You can have your master process listen at an address, and your individual processes then dial into that address. For simplicity you can just have each process send its PID to keep track of where messages come from, without even having to use contexts.
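As a rough illustration of the listen/dial arrangement described above, here is a minimal sketch. It makes several assumptions that are not from this thread: the push/pull protocol pair is used so that several workers can report to one listener (a "pair" socket is one-to-one), the address `inproc://status` is a made-up in-process URL so that everything runs in a single R session, and the message layout is only an example.

```r
library(nanonext)

# Assumption: a made-up in-process address so the sketch runs in one session;
# separate R processes would use an ipc:// or tcp:// address instead.
url <- "inproc://status"

# Master: listens at the address and pulls messages from any number of workers
main <- socket("pull", listen = url)

# Workers: each one dials into the same address and pushes its messages
worker1 <- socket("push", dial = url)
worker2 <- socket("push", dial = url)

# Prefix each message with the sender's PID so the master can tell senders apart
# (both "workers" share one PID here because they live in the same session)
send(worker1, paste0("PID=", Sys.getpid(), "|UC1|DONE"), block = 1000L)
send(worker2, paste0("PID=", Sys.getpid(), "|UC2|DONE"), block = 1000L)

# Wait up to one second for each message instead of polling in a loop
msgs <- c(recv(main, block = 1000L), recv(main, block = 1000L))
print(msgs)

close(worker1)
close(worker2)
close(main)
```

In a real setup each worker would create its own socket inside its own process, and only the master would call `listen`.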

You could have the master process start the individual processes (which is basically what mirai does) or you could even start them up manually.

My reference to lines of code in mirai was simply that mirai itself did not take many lines of code to write. It is kind of an elaborate wrapper around nanonext. If mirai is flexible enough for your needs (and it might well be), the expression that mirai takes can be arbitrarily long if you wrap it in { }. The returned value will be the last item in { } just like in a function.
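To make the point about `{ }` concrete, a small sketch follows. The variable names inside the block are purely illustrative, and it assumes mirai is installed and able to launch a background process on this machine.

```r
library(mirai)

# A multi-statement expression: the value of the last line is what comes back
m <- mirai({
  squares <- (1:4)^2             # 1, 4, 9, 16
  total <- sum(squares)          # 30
  total / length(squares)        # last expression, so this is the result
})

# Wait for the background evaluation to finish, then read the result
call_mirai(m)
result <- m$data
print(result)
```

The block can be as long as needed; only its final value is returned to the calling session.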

If you write your own function to use with mirai, remember to pass the function definition as an argument as well as the data. E.g. if you have defined myfunc() and data in your global environment: m <- mirai(myfunc(data), myfunc = myfunc, data = data)
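A runnable version of that call might look like the sketch below. The body of `myfunc()` and the contents of `data` are invented for illustration; only the `mirai(myfunc(data), myfunc = myfunc, data = data)` pattern comes from the comment above.

```r
library(mirai)

# Hypothetical function and data defined in the calling session
myfunc <- function(x) sum(x) * 2
data <- 1:10

# The background process starts empty, so both the function and the data
# are passed explicitly as named arguments
m <- mirai(myfunc(data), myfunc = myfunc, data = data)

# Wait for completion and collect the value
call_mirai(m)
result <- m$data
print(result)
```

If `myfunc` were left out of the arguments, the background process would have no definition for it and the result would be an error value rather than a number.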

shikokuchuo commented 2 years ago

Closing this issue. @bsarrant please feel free to re-open or open new issues with anything specific that comes up.