shikokuchuo / mirai

mirai - Minimalist Async Evaluation Framework for R
https://shikokuchuo.net/mirai/
GNU General Public License v3.0

How to know when everywhere() is done with everything #125

Closed gangstR closed 3 months ago

gangstR commented 3 months ago

I'm still thoroughly appreciating this excellent package. Thank you! This is the best thing that has happened to R in several years.

I'm working on an application that needs to process ~850 requests/minute. I've historically used stable persistent clusters (leveraging parallel and parallelly), but I wanted to migrate it to your lovely package since the polling in my previous setup was creating a rather heavy parallel 'tax'. I was noticing some very odd intermittent failures in my set of provisioning functions, which ensure all workers are loaded with packages and variables as well as my own parallel logger routines. It seemed at times that everywhere() might be failing. I threw some breakpoints into my package, rebuilt, and, sure enough, caught what seem to be intermittent points where previous provisioning steps had not completed even though everywhere() had been called. Rerunning those lines while in the debugger allowed me to continue on.

On the one hand, this makes sense; a mirai is an async future. But how does one check on a call to everywhere() other than by inspecting the search() path, .GlobalEnv, etc. (depending on what I've done) of each worker, since the function has no return value? It took a minute before I resorted to some code violence.

Being the gangstR that I am, I decided some good old-fashioned torture might get it to talk to me. The reprex below provisions 10 daemons(). Once it makes sure they're connected, I set a node.pid value in each daemon and then try to retrieve them all by calling an excessive number of mirai() (knowing that there is no reason to expect a uniform set of returns, I can imagine some will be missing from time to time). Because we know those will take time, I add a delay (with exponential backoff). To test whether everywhere() has an effect as well, I place an earlier delay before the set of mirai is called. Once all node.pid values are recovered from the daemons, I reset them and start the provisioning over again and again.

library(mirai)

node.cnt <- 10

while (TRUE) {
  daemons(node.cnt)

  count <- 0L
  while (TRUE) {
    count <- count + 1L
    out <- daemons()$daemons
    if (is.matrix(out)) {
      break
    }
  }
  message("Daemons ready after ", count, " status check(s)")

  count <- 0L
  while (TRUE) {
    everywhere(node.pid <<- Sys.getpid())
    Sys.sleep(2^count)  # Something strange is afoot at the Circle K if you remove this
    m <- lapply(1:1000, FUN = function(x) {mirai(print(node.pid))})
    Sys.sleep(2^count)  # We expect to either wait here or else call in the mirai resolution...
    pid.vec <- unique(na.omit(unlist(lapply(m, FUN = function(x) {x$data}))))
    pid.df <- data.frame(pid = pid.vec, id = order(pid.vec))
    count <- count + 1L
    if (nrow(pid.df) == node.cnt) {
      break
    } else {
      message("...only ", nrow(pid.df), " row(s) present but expected ", node.cnt)
    }
  }
  message("All nodes replying after ", count, " status check(s)\n\n")

  daemons(0)
}

Here's a short sample run:

Daemons ready after 1 status check(s)
All nodes replying after 1 status check(s)

Daemons ready after 1 status check(s)
...only 9 row(s) present but expected 10
All nodes replying after 2 status check(s)

Daemons ready after 1 status check(s)
...only 8 row(s) present but expected 10
All nodes replying after 2 status check(s)

Daemons ready after 1 status check(s)
...only 9 row(s) present but expected 10
All nodes replying after 2 status check(s)

What's interesting is that commenting out the first delay (the one right after everywhere()) has quite a noticeable effect on the progress (i.e., it gets much worse). This shows up as:

Daemons ready after 1 status check(s)
...only 0 row(s) present but expected 10
All nodes replying after 2 status check(s)

Daemons ready after 1 status check(s)
...only 0 row(s) present but expected 10
All nodes replying after 2 status check(s)

Daemons ready after 1 status check(s)
...only 0 row(s) present but expected 10
All nodes replying after 2 status check(s)

Daemons ready after 1 status check(s)
...only 0 row(s) present but expected 10
...only 9 row(s) present but expected 10
All nodes replying after 3 status check(s)

Daemons ready after 1 status check(s)
...only 7 row(s) present but expected 10
All nodes replying after 2 status check(s)

I repeated this behavior many times (commenting and uncommenting the first delay). This seems to suggest that I should devise checks after my calls to everywhere() to make sure that the provisioning work has completed. Or is there simply a way to block and wait for them to complete, as there is with mirai? Keep in mind, my applications run on Linux, but these tests were run on a very modest Windows laptop; thus, not particularly fast.
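
(As an aside, the second delay in the reprex can be avoided: individual mirai can be waited on directly with call_mirai(). A minimal sketch of that half, assuming node.pid has already been set by everywhere() as above; it does not help with the everywhere() race itself, which is the point of this issue.)

# Replace the second Sys.sleep()/backoff with a blocking wait: call_mirai()
# waits for each mirai to resolve before returning it.
m <- lapply(1:1000, function(x) mirai(node.pid))
invisible(lapply(m, call_mirai))    # blocks until all 1000 have resolved
pid.vec <- unique(unlist(lapply(m, function(x) x$data)))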

One other minor/unrelated question: is there a way to get the i value of a daemon (the one that shows in status()) so that I can know its identity as part of a mirai? It would be handy to know which worker processed my future in a less convoluted way than the one I've devised locally.

Thank you.

shikokuchuo commented 3 months ago

Thanks for reporting your experience of everywhere(). It is designed to perform initial setup steps after establishing daemons, such as loading packages etc., rather than to change their configuration after certain evaluation steps. I wonder if it makes sense in your workflows to tear down the daemons and create new ones at the points where you need to change the persistent configurations.
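
Roughly, as a sketch (the setup steps here are purely illustrative):

library(mirai)

daemons(0)     # tear down the current set of daemons, if any
daemons(10)    # launch a fresh set for the next phase

everywhere({
  library(parallel)            # whatever packages/objects this phase needs
  node.pid <<- Sys.getpid()
})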

Nevertheless, I will consider what the correct behaviour is here - it should be fairly straightforward either to (i) allow waiting for everywhere(), or (ii) make these calls synchronisation points (waiting for all mirai calls to complete).

Getting the 'i' value, on the other hand, is not supported by design. Dispatcher performs FIFO scheduling, hence you cannot know beforehand which daemon dispatcher will send a task to. If this is important, then use compute profiles to route certain tasks to certain daemons.
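
For example (profile names purely illustrative):

library(mirai)

# Two independent compute profiles, each with its own daemons, so a task
# can be routed deterministically to a known group of workers.
daemons(4, .compute = "groupA")
daemons(2, .compute = "groupB")

a <- mirai(Sys.getpid(), .compute = "groupA")   # handled by a 'groupA' daemon
b <- mirai(Sys.getpid(), .compute = "groupB")   # handled by a 'groupB' daemon
call_mirai(a)$data
call_mirai(b)$data

daemons(0, .compute = "groupA")
daemons(0, .compute = "groupB")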

gangstR commented 3 months ago

I appreciate your response and agree on the potential mitigations. Thank you.

As a point of clarification, what I'm doing with the daemons is an iterative map-reduce pattern; that is, after provisioning them for distributed independent work, and after they have completed that work, the daemons require progressive provisioning with more information that can only be ascertained from the reduce phase. It seemed natural to try to use everywhere() in an incremental way on that iterative journey, as I had been doing with the persistent clusters in the previous incarnation of the code (e.g., using combinations of clusterCall(), clusterExport(), clusterEvalQ(), etc. to evolve the persistent workers). Besides, even if I'm only provisioning them once, I need to know when they're ready for work. Although it wasn't stated in the documentation, I'd initially (and incorrectly) assumed from the function's description that everywhere() was a blocking call for each daemon.

Also, I'm not interested in getting the i value to understand dispatcher behavior a priori; rather, my interest is strictly a posteriori, so that I can use this identifier in my custom hierarchical parallel logger to track which daemon turned in some homework. Put another way, my use case is [ostensibly?] a disaggregated view of the summarized details tracked in your status() matrix, which identifies how much work each worker has completed. I have a workaround, and it's not too dissimilar to what I included in the reprex (i.e., extracting pids from all workers, building a pid lookup table, and pushing that table back onto each daemon for local lookup so it can have a simple identifier for logging purposes). Overly complicated, but it works. I suppose one might ask why not just use the pid as the logging identifier from each daemon's local state when it logs something, but my logging data must make its way into a Kafka process that is expecting a simple 1..n id in addition to the pid. Changing out the rest of the plumbing is more than I have time for at the moment πŸ˜†
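
For completeness, here is a rough sketch of that workaround (it assumes the saturation step has in fact reached every daemon, which is precisely the part this issue is about):

library(mirai)

# 1. Saturate the daemons with trivial tasks and collect the pids.
m <- lapply(1:1000, function(x) mirai(Sys.getpid()))
invisible(lapply(m, call_mirai))
pid.vec <- sort(unique(unlist(lapply(m, function(x) x$data))))

# 2. Build a simple 1..n lookup table keyed by pid.
pid.lookup <- data.frame(pid = pid.vec, id = seq_along(pid.vec))

# 3. Push the table to every daemon and let each one resolve its own id
#    for use by the local logger.
everywhere({
  pid.lookup <<- tbl
  node.id <<- tbl$id[tbl$pid == Sys.getpid()]
}, tbl = pid.lookup)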

shikokuchuo commented 3 months ago

If you are interested in the utilisation rate of the daemons, then the summary status() matrix should be sufficient.

Otherwise, from the point of view of the framework, all daemons (under the same compute profile) are treated the same. It matters not whether one is labelled 1 or 10, hence this information is not made available. If in reality they are not the same, and you need to make use of this fact, then you should make use of compute profiles - this was my earlier point.

If it is just for logging, then that seems to be the responsibility of the logging implementation - for example, in the case you describe it seems possible to send everything to one stream rather than 'n' streams; you have the PIDs anyway, so you can query the database to get whatever view you want.

If there is some other reason I am missing, please feel free to point out.

gangstR commented 3 months ago

Thank you for the reply. I very much appreciate your contributions to the R community. Your perspective on the daemons within the framework you've designed resonates with me. And, as the author, you always have the conch πŸ˜„

While I'm not interested in the utilization rate, I still do need to know who turned in homework. Workers are initialized identically, but then may evolve into different states. They receive different data, may run different processes (from their loaded collection), and turn in results that are collated and redistributed. I'm not at liberty to reveal my context and so it's rather difficult to explain. Science requires reproducibility, and that was my motivation. I can most certainly do some great science with your package, but, alas for me, not always easily with my own current logger package πŸ˜† Haha. But I have a workaround and so no lab mice need suffer... until perhaps I hire a new cat to rewrite my old logger(???).

I do very much appreciate and use the compute profile via the .compute parameter since I frequently need to manage things across nested parallelism due to some performance-related NFRs. Inner workers are provisioned differently than outer workers, and your parameter makes it easy. Thank you for that.

As I've 'chewed' on my original problem a bit more, I must say that I'm genuinely surprised that no one else has noticed that everywhere(), while designed for daemon setup, is not blocking (which was my own incorrect assumption), and yet (a) does not let you know when it's done and (b) offers no way to inspect that it's done without a bit of work. As far as I can tell right now (and I might very well be missing something obvious because I love work and loathe sleep), the only way to know that every daemon is provisioned after running everywhere() with a complex expression and a few variables is to saturate the daemons with interrogations and aggregate those results until every daemon has replied at least once. Only by saturating them until I have a complete census can I know they're ready. And running a thousand mirai() requests will frequently miss a few of them if I have 10 daemons; the problem gets worse as I start more daemons. Sometimes I see great uniformity and sometimes heavy skew in utilization, both of which are natural and expected; thus, I have coded to achieve a census. The processing time for the inspection is negligible, of course, but the code for that processing really clutters up the provisioning code.
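
One alternative to collecting pids might be to snapshot the per-daemon counters in the status() matrix before and after a batch of trivial tasks (assuming its cumulative 'complete' column; adjust if your version reports something different), along these lines:

library(mirai)

# Snapshot per-daemon completion counts, fire a batch of trivial tasks,
# wait for them all, then require that every daemon completed at least one.
before <- status()$daemons[, "complete"]
m <- lapply(1:1000, function(x) mirai(TRUE))
invisible(lapply(m, call_mirai))
after <- status()$daemons[, "complete"]
stopifnot(all(after > before))   # every daemon has replied at least once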

Ironically, and perhaps in the context of the original reason I opened the issue, you might begin to appreciate why I was logging what each daemon is doing. The incomplete state was blowing up my science, and it took some time to discover why code was working on one run and then not working on another under what were, from my vantage point, identical conditions. While I threw the daemon index question into my posts as an honest sidebar, in retrospect the single best argument for accessing the index [so I can track individual daemon behavior] is this very issue about everywhere(). All daemons are not the same sometimes, and it's not because I'm not running the same code on them; it's because they may take different times to provision and they don't let me know when they're done. So I must bludgeon them with inspection since I cannot talk to each one individually, and simply waiting a few seconds or sampling a few mirai() is not sufficient either. Each one must report in about its state before I can begin the heavy lifting. The main issue I've opened makes somewhat of an argument for access to each daemon.

I would have fully expected that someone else would have hit this provisioning situation before me.

I appreciate you. Forgive the verbosity and humor. I work by myself too often...

shikokuchuo commented 3 months ago

Given what you're saying, I'm no longer sure it's a good idea to change the behaviour of everywhere().

It is designed to perform setup tasks, i.e. right after a call to daemons(), and it does this perfectly. This takes on added importance in the dev version, as I have folded serialization support into the new '.serial' argument of everywhere().

Other usage is purely incidental, and I'd have to think through the implications thoroughly before I 'bless' it by providing a formal interface (such as the ability to wait).