OHDSI / CohortGenerator

An R package for instantiating cohorts using data in the CDM.
https://ohdsi.github.io/CohortGenerator/

Incremental Save File stalls causing lost cached information #131

Open chrisknoll opened 6 months ago

chrisknoll commented 6 months ago

This may be a bit tricky to reproduce, as I hit this issue while generating 17k cohorts with CohortGenerator.

Symptoms

As the incremental file grows (it's just a csv with cohort hashID and generation), the write_csv call will eventually hang. The problem is that it hangs in the middle of writing the results, thus losing everything that wasn't 'flushed' to disk during the save. This causes subsequent runs to think the original cohort list was never generated, so a cohort set of 8k finished cohorts reverts to 1k finished cohorts because the write to csv hung partway through the file.

I suspect it's something to do with readr, but I haven't been able to find any reported issues with that. The other possibility is that I am running in an EC2 instance, possibly writing to NAS-attached storage, and something just flakes out and it hangs writing to disk. When I open the file (.csv), nothing reports that another process is locking it; the file just seems to stop receiving I/O and the R process hangs. It is the hang followed by restarting R that leaves the partially written .csv behind, which leads to the issue above.

The current work-around is to disable the incremental mode so that it doesn't hang on writing the file.

Ideas for Work Arounds:

If the problem is the volume of data being written to disk at once, perhaps the solution is to write the contents to the file through a series of batch-append calls to the CSV.
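A minimal sketch of the batch-append idea, assuming the incremental file is a plain data frame written as csv. The helper name `writeCsvInBatches`, the `batchSize` parameter, and the column names in the usage example are hypothetical, and base R's `write.table` stands in for `readr::write_csv(append = TRUE)` so the sketch has no package dependencies. Whether batching actually avoids the hang is untested.

```r
# Hypothetical helper: write a data frame to csv in fixed-size batches.
# The first batch creates the file and writes the header; every later
# batch is appended without a header.
writeCsvInBatches <- function(x, file, batchSize = 1000) {
  if (nrow(x) == 0) {
    # Nothing to batch: write the header only.
    write.table(x, file = file, sep = ",", row.names = FALSE,
                col.names = TRUE, qmethod = "double")
    return(invisible(x))
  }
  starts <- seq(1, nrow(x), by = batchSize)
  for (start in starts) {
    end <- min(start + batchSize - 1, nrow(x))
    firstBatch <- start == 1
    write.table(
      x[start:end, , drop = FALSE],
      file = file,
      sep = ",",
      row.names = FALSE,
      col.names = firstBatch,  # header only once
      append = !firstBatch,    # overwrite on the first batch, append afterwards
      qmethod = "double"
    )
  }
  invisible(x)
}

# Usage with made-up column names
tasks <- data.frame(
  cohortId = 1:2500,
  checksum = sprintf("hash%04d", 1:2500),
  stringsAsFactors = FALSE
)
incrementalFile <- tempfile(fileext = ".csv")
writeCsvInBatches(tasks, incrementalFile, batchSize = 1000)
```

If a write stalls, at most the batch in flight is lost, but note that the first batch still truncates the existing file, so this alone does not protect previously recorded rows.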

...That's basically all my ideas. I'm not sure enough of the exact nature of the failure to propose other ways to address it.

azimov commented 6 months ago

Looking at the internals, we're using write_csv without any append or chunking, which will definitely have a hard ceiling in terms of scalability...

So the implementation is:

.writeCsv <- function(x, file, append = FALSE) {
  # Write the file
  readr::write_csv(
    x = x,
    file = file,
    append = append
  )
  invisible(x)
}

And the call from incremental.R doesn't even append it.

Using a csv is also problematic because we have issues like this (where a task fails mid write) but also race conditions etc.

But this is a pretty straightforward key value store and we should never need to store the entire block in memory at all.

Currently exported functions are:

computeChecksum. This can stay the same (or just use a better hashing algorithm than md5).

isTaskRequired. This should be a simple key lookup on the computed checksum. Currently it reads the entire file into memory and then does a filter to see if it's in the data frame. Just using readr::read_csv_chunked would be pretty straightforward here (though not especially fast).

getRequiredTasks. This takes a set of tasks and asks "which need to be completed". Again, just using read_csv_chunked would stop any memory overflow issues.
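A rough sketch of the chunked lookup described for isTaskRequired and getRequiredTasks. It reads the file through a base R connection rather than `readr::read_csv_chunked`, purely to stay dependency-free. The function name `findRecordedChecksums`, the `chunkSize` parameter, and the assumption that the file has a header row with a `checksum` column are all hypothetical and not the package's actual layout.

```r
# Hypothetical helper: scan the incremental file chunk by chunk and return
# the requested checksums that are already recorded. Only one chunk is held
# in memory at a time. Assumes no field contains an embedded newline.
findRecordedChecksums <- function(file, checksums, chunkSize = 10000) {
  if (!file.exists(file)) {
    return(character(0))
  }
  con <- file(file, open = "r")
  on.exit(close(con), add = TRUE)

  header <- readLines(con, n = 1)
  if (length(header) == 0) {
    return(character(0))
  }
  columns <- gsub('"', "", strsplit(header, ",", fixed = TRUE)[[1]], fixed = TRUE)

  wanted <- unique(checksums)
  found <- character(0)
  repeat {
    lines <- readLines(con, n = chunkSize)
    if (length(lines) == 0) {
      break
    }
    chunk <- read.csv(
      text = lines, header = FALSE, col.names = columns,
      colClasses = "character", stringsAsFactors = FALSE
    )
    found <- union(found, intersect(chunk$checksum, wanted))
    # Stop early once every requested checksum has been seen.
    if (length(found) == length(wanted)) {
      break
    }
  }
  found
}

# A task is required when its checksum is not among the recorded ones.
# requiredTasks <- tasks[!(tasks$checksum %in% findRecordedChecksums(path, tasks$checksum)), ]
```

The early exit means a lookup for a single task can stop at the first matching row, which is why uniqueness of rows in the file is not strictly needed.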

recordTasksDone should just append to the file. Strictly you don't even really need uniqueness here because a filter can just stop as soon as a task is done - so we don't actually need to read the file at all (as far as I can tell from your comment, this should solve the immediate issue).

saveIncremental - This is used for updating an existing file and reads all lines, computes an index for each row (based on uniqueness of the rows) and then saves it to disk. This could be done better with chunked writing but I kind of find using this function pretty odd because the logic of "incremental" work is "1. check if task is done 2. if task is done do nothing 3. if task isn't done do something AND save results" so the results should always be unique.

Simply appending to the csv file would be sufficient, provided the checksum process works for each task. I may be missing something but reading the existing data at all seems completely unnecessary, all we care about is not overwriting it.
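As an illustration of the append-only approach, here is a small sketch in base R. The name `appendTasksDone` and the column names are hypothetical, and `write.table` is used in place of the package's `.writeCsv` wrapper. The existing file is never read or rewritten; new rows are only added at the end.

```r
# Hypothetical append-only recorder: existing rows are never read or
# rewritten, so a stalled write can only affect the rows being added.
appendTasksDone <- function(tasks, file) {
  # Write a header only when the file does not exist yet or is empty.
  isNewFile <- !file.exists(file) || file.size(file) == 0
  write.table(
    tasks,
    file = file,
    sep = ",",
    row.names = FALSE,
    col.names = isNewFile,
    append = !isNewFile,
    qmethod = "double"
  )
  invisible(tasks)
}

# Usage with made-up column names: two separate runs record their work.
incrementalFile <- tempfile(fileext = ".csv")
appendTasksDone(data.frame(cohortId = 1:2, checksum = c("aaa", "bbb")), incrementalFile)
appendTasksDone(data.frame(cohortId = 3L, checksum = "ccc"), incrementalFile)
```

This does nothing about concurrent writers; two processes appending at the same time could still interleave rows, which is the race condition mentioned above.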

Refactor

As we would like to use the same task recording logic across packages, I think a rewrite would be merited: using a csv has potential flaws with race conditions and likely won't scale very well even with the proposed changes above. The cachem package appears to have a disk cache mechanism that can be used by multiple processes, but it isn't immediately clear if it uses locks on the saved objects. Alternatively, we could just use Andromeda, which uses SQLite as a backend and would give us locking on tables. Storing the result in a results table should also be possible, which would simplify a lot of things.

Incremental mode is also not something that needs to be audited or transferred with study results, so using a csv should not be a hard requirement here.

anthonysena commented 1 month ago

Noting this in case others run into this problem, which we observed when running a large number of cohorts through CohortGenerator via Strategus. This appears to be related to writing to csv files with multiple threads and as a work-around we've applied this code in the CohortGeneratorModule:

  # Setting the readr.num_threads=1 to prevent multi-threading for reading
  # and writing csv files which sometimes causes the module to hang on
  # machines with multiple processors. This option is only overridden
  # in the scope of this function.
  withr::local_options(list(readr.num_threads=1))

Setting the option readr.num_threads=1 forces readr/vroom to use a single thread. While this is neither ideal nor performant, it does prevent the issue described in this thread.
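For anyone applying the same work-around outside the module, the scoping behaviour can be sketched in base R with `options()` and `on.exit()`, which is roughly what `withr::local_options` does. The wrapper name `withSingleReadrThread` is hypothetical and not part of CohortGenerator or Strategus.

```r
# Hypothetical wrapper: run a function with readr.num_threads set to 1 and
# restore the previous value afterwards, even if the function errors.
withSingleReadrThread <- function(fn) {
  oldOptions <- options(readr.num_threads = 1)
  on.exit(options(oldOptions), add = TRUE)
  fn()
}

# Usage: the option is 1 only while the wrapped function runs.
withSingleReadrThread(function() {
  getOption("readr.num_threads")
})
```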

I'd like to repurpose this issue to provide a mechanism to store this incremental information in the database so that it is stored with the cohort table(s) vs the file system. We'll leave the file approach in the v0.x for the package and aim to find a new home for file-based incremental operation tracking at some point.