futureverse / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org
957 stars 85 forks source link

sigpipe error when using Rscript and future #380

Open randomgambit opened 4 years ago

randomgambit commented 4 years ago

Hello there,

I apologize in advance if the question is a bit broad, but I am running into various SIGPIPE warnings of the form:

Error in for (j in seq_along(value)) { : ignoring SIGPIPE signal
Error in for (i in seq_len(n)) { : ignoring SIGPIPE signal
Error in for (j in seq_along(value)) { : ignoring SIGPIPE signal
Error: ignoring SIGPIPE signal
Error in for (i in 1:(k - 1L)) { : ignoring SIGPIPE signal
Error in UseMethod("as.list") : ignoring SIGPIPE signal

when I use Rscript and future in multicore mode on a linux computer (I use the package furrr).

I would be happy to create a reproducible example but was not able to yet. Do you know what could cause the issue here? Is a sigpipe warning something that can happen with multiprocessing? I am not getting any error when i run my code sequentially.

Again, sorry for the general question but I am not sure where to start. Any hints greatly appreciated!

Thanks!

HenrikBengtsson commented 4 years ago

That smells like problems you see when parallelizing using forked processing (which multicore use via the 'parallel' package). I'd bet you get the same if you use parallel::mclapply().

Forked processing is know to be unstable in various setting, e.g. it should not be use in RStudio.

Whether forked processing is stable or not depends also what is parallelized. There many reports out there like this one.

There's no magic sause to fix this. I recommend that you use multisession instead.

randomgambit commented 4 years ago

@HenrikBengtsson thank you so much for answering. This is very helpful.

The fact is that I am using Rscript, specifically to avoid working with R Studio. Essentially I am parallelizing some text-processing. Each row of my dataframe contains some text and the function does some stuff with it. It is an "embarassingly parallel problem.

Could there be a conflict when the cores try to access a shared object (dataframe)? Is this why the multicore processing can be unstable?

Thanks!

HenrikBengtsson commented 4 years ago

The fact is that I am using Rscript, specifically to avoid working with R Studio.

Good, so then we can rule out RStudio.

Essentially I am parallelizing some text-processing. Each row of my dataframe contains some text and the function does some stuff with it. It is an "embarassingly parallel problem.

Could there be a conflict when the cores try to access a shared object (dataframe)? Is this why the multicore processing can be unstable?

Hard to guess. Multi-threaded code, which some package use, is a common suspect, so, if you use forked parallelization over such multi-threaded code, that could be one reason.

I suggest that you confirm that you get the same problem when using parallel::mclapply() without involving futures. If you can reproduce it there, then I'd reach out to the R-devel mailing list with a reproducible example. If you try to produce a minimal reproducible example, you might be able to narrow down the culprit yourself.

randomgambit commented 4 years ago

what is strange though is that I get the same error when I use plan(multiprocess). However, as mentioned, the programs continues and I still get some output.

HenrikBengtsson commented 4 years ago

what is strange though is that I get the same error when I use plan(multiprocess).

Not strange at all; multiprocess is an alias for multicore on Unix and macOS. So, use multisession.

I'm leaning more and more toward deprecated that multiprocess alias - it's the doParallel::registerDoParallel() of the future framework - it adds way more confusion that it adds value. I've already started the process of not using multiprocess in my examples.

randomgambit commented 4 years ago

thank you again @HenrikBengtsson , this is very helpful.

I am trying with multisession now. Something I have always wondered about multisession is whether the multiple R sessions will duplicate everything in memory (resulting in a possible crash) or only the object/dataframe/chunk that needs to multiprocessed. Any ideas? Can this be optimized with future (or possibly with future_map)?

randomgambit commented 4 years ago

yes... as expected, multisession maxed out all of my RAM (I have 300GB of RAM!). Perhaps I am missing something here?

HenrikBengtsson commented 4 years ago

It tries to copy only what's needed. It can't do magic, that is, you can still write code that is inefficient. Hard to say without reproducible example. Have you considered that your original problem might be that your also running out of memory? Have you profiled your code in sequential mode - do you know what are the memory hungry parts?

randomgambit commented 4 years ago

Hi @HenrikBengtsson here is a short example of what I am struggling with (of course, my real-life problem uses a much more complex function that is not vectorizable over all rows directly)

library(dplyr)
library(stringr)
library(furrr)
library(tictoc)

mydata <- tibble(mytext = rep('Henrik is a great programmer', times = 1000))

myfunc <- function(mytext){
  tibble(test = str_detect(mytext, 'Henrik'),
         value = 2)
}

tic()
mydata %>% mutate(myoutput = future_map(mytext, ~myfunc(.x))) %>% tail()
toc()

which gives:

# A tibble: 6 x 2
  mytext                       myoutput        
  <chr>                        <list>          
1 Henrik is a great programmer <tibble [1 x 2]>
2 Henrik is a great programmer <tibble [1 x 2]>
3 Henrik is a great programmer <tibble [1 x 2]>
4 Henrik is a great programmer <tibble [1 x 2]>
5 Henrik is a great programmer <tibble [1 x 2]>
6 Henrik is a great programmer <tibble [1 x 2]>
> toc()
0.953 sec elapsed

As you can see, this is embarassingly parallelizable and when the number of rows becomes large (say 100k and more) one must use multiprocessing because doing so sequentially is too slow. My go to solution was to use future_map but maybe there is something else you can recommend with future directly?

Thanks!

EDIT 2020-06-30: Explicitly attaching all packages needed for this example to work. /Henrik

tdeenes commented 4 years ago

@randomgambit

I do not think your issue is related to future at all. The problem seems to be that you have been misled by the hocus-pocus of the tidyverse approach, leading to overly complicated and inefficient code.

E.g.,

Just compare your code with this (only base-R):

mydata <- data.frame(
  mytext = rep('Henrik is a great programmer', times = 1000),
  stringsAsFactors = FALSE
)
myfunc <- function(x){
  data.frame(test = grepl('Henrik', x), value = 2L)
}
system.time(
  mydata$myoutput <- lapply(mydata$mytext, myfunc)
)

And the data.table-way:

library(data.table)
mydata <- data.table(
  mytext = rep('Henrik is a great programmer', times = 1000)
)
myfunc <- function(x){
  data.table(test = grepl('Henrik', x), value = 2L)
}
system.time(
  mydata[, myoutput := lapply(mytext, myfunc)]
)

Instead of lapply(), you can use future.apply::future_lapply() as a drop-in replacement. If you actually need one large data.frame instead of a list of tiny data.frames, you have data.table::rbindlist which can row-bind the list of lists/tibbles/data.tables/data.frames in a fast and memory-efficient way.

If you have an embarassingly parallel problem, and you run out-of-RAM (hard to believe with 300GB), you have to profile your code, and check which part of your code uses an excessive amount of memory. It can also happen that you keep one large object in your master session, that grows inefficiently, and this causes your havoc. E.g., instead of storing the results in a list and later row-bind them, you can do it efficiently in data.table with:

library(data.table)
out <- data.table()
for (x in mytext) {
  out <- rbind(out, myfunc(x))
}

You can of course split your text to larger chunks and process each chunk in a future, etc.

But really, try to avoid using tidyverse for every single piece of functionality that you do in R. And if you must work with largish data and data.frame-like structures, go for data.table.

randomgambit commented 4 years ago

thank you @tdeenes for this very interesting take! You make some really good points here.

However, I do think my issue is related to future and furrr. Indeed, the code works (albeit at a very slow space) when I use sequential processing.

I really like your idea of using data.table but the issue will remain the same: how to use multiprocessing efficiently in the context of my small toy example?

You suggest using future::lapply() . I really like this idea but could you please share some additional details about it? Is lapply processing the rows by chunks of rows? Or row-by-row? How can I tweak that?

Thank you!!

tdeenes commented 4 years ago

@randomgambit , your toy example is not really appropriate to demonstrate the efficient use of multiprocessing. First, we have fast vectorized routines implemented at the C level (e.g., grepl, stri_detect) which will be faster - if you know that there are duplicated values in mydata$mytext, you can call any of those functions on the unique values, which will be even faster. Second, myfunc does not perform any heavy calculation so you can not really test if it can cause memory overflow.

If you use any kind of multiprocessing on the same host (that is, all processes run on the same machine), in general you can face the following bottlenecks:

There is no general recipe for efficient parallel processing. As Henrik stated previously, you first have to identify where your current bottleneck is. If you find something which you think is a bug in R or in any of the packages that your calculations depend on, you have to create a minimal reproducible example (which of course must demonstrate the bug).

randomgambit commented 4 years ago

@tdeenes @HenrikBengtsson first of all, let me thank you for your very kind and informative comments. It is very hard to find package maintainers that are both at the bleeding edge and eager to help users like me. Thanks!

I will keep working on reproducing the error and I hope you will not mind if I post another usage question about future.. in the future :)

Thanks!

HenrikBengtsson commented 4 years ago

@randomgambit, I tried to do distill a smaller example from your example;

library(tibble)
library(stringr)
library(furrr)
plan(multicore)

myfunc <- function(mytext) {
  tibble(test=str_detect(mytext, "Henrik"), value=2)
}

n <- 10e3
mytext <- rep("Henrik is a great programmer", times=n)
y <- future_map(mytext, ~myfunc(.x))

Does the above also produce those critical errors when you increase n, to say n <- 100e3 (as you suggested)? If you get an error, make sure to try in a fresh R session (R --vanilla).

BTW, you never mentioned your sessionInfo(). The above works just fine for me with n <- 100e3 on Ubuntu 18.04 with 16 GiB RAM and future::availableCores() == 8 cores.

HenrikBengtsson commented 4 years ago

FWIW, regarding the ignoring SIGPIPE signal error message, I spotted a comment/question and answer of PR#17850 that confirms that this likely happens because a "connection is broken/closed/invalid", e.g. when an R worker crashes.

randomgambit commented 4 years ago

hi @HenrikBengtsson thank you so much for your feedback. When I try your small program, even with n <- 100e3, I dont get any errors. I think we need much more text to process on each pass (say instead of Henrik is a great programmer we could copy paste the text of a random full wikipedia page).

What is puzzling here is that I am not using a cluster where usually a worker (on another machine) can crash. Here I am just using all the processors of my local machine (this is what furrr does). How can the connection be broken in that case? Is this something we can tune with future at all?

Thanks!