pbiecek / archivist

A set of tools for datasets and plots archiving
http://pbiecek.github.io/archivist/

Parallel + DB Locking Issue #322

Closed. happyshows closed this issue 6 years ago.

happyshows commented 6 years ago

Hi,

I have a regular batch job that saves artifacts to a local repo, and I was using mcparallel() to run it.

library(parallel)
library(archivist)   # asave()
library(magrittr)    # %>%

# fork one child process per file; each child saves its artifact to the repo
jobs <- lapply(files, function(f){
  mcparallel({
    res <- prepData(f)
    asave(res, 'localRepo', artifactName = f, archiveMiniature = FALSE,
          archiveSessionInfo = FALSE, silent = TRUE) %>% invisible()
  }, name = f)
})
mccollect(jobs)  # wait for all children and collect their results

However, one of the tasks failed with:

[1] "Error in rsqlite_send_query(conn@ptr, statement) : database is locked\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<Rcpp::exception in rsqlite_send_query(conn@ptr, statement): database is locked>

What type of parallel processing should I use to avoid the DB locking issue?

pbiecek commented 6 years ago

Very interesting use case. By default we use an SQLite database, which is a very simple database stored in a single file. The file is locked while new objects are added, so errors occur when two or more processes try to write to the locked file at the same time.
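
As a minimal sketch of that failure mode (a hypothetical table, not archivist's actual schema), forked workers inserting into one SQLite file can hit the same error:

library(parallel)
library(DBI)

con <- dbConnect(RSQLite::SQLite(), "repo.db")
dbExecute(con, "CREATE TABLE IF NOT EXISTS log (id INTEGER, note TEXT)")
dbDisconnect(con)

jobs <- lapply(1:8, function(i) {
  mcparallel({
    con <- dbConnect(RSQLite::SQLite(), "repo.db")
    on.exit(dbDisconnect(con))
    # concurrent writers from forked processes can raise "database is locked"
    dbExecute(con, sprintf("INSERT INTO log VALUES (%d, 'x')", i))
  })
})
mccollect(jobs)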

I do not see how this can be solved for SQLite. The best solution is to use a more advanced database, like PostgreSQL. Try createPostgresRepo() and setPostgresRepo(). PostgreSQL will handle concurrent access to a single database.
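
A hedged sketch of that setup; createPostgresRepo() and setPostgresRepo() are the functions named above, but the argument names and connection details shown here are assumptions (check the archivist reference for the real interface):

library(archivist)
library(DBI)

# the connection details and argument names below are assumptions
connection <- dbConnect(RPostgres::Postgres(),
                        dbname = "archivist_repo",
                        host = "localhost", user = "repo_user")
createPostgresRepo(repoDir = "localRepo", connection = connection)  # assumed signature
setPostgresRepo(repoDir = "localRepo", connection = connection)     # assumed signature
# subsequent asave() calls go through Postgres, which serializes concurrent writers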

jakubkuzilek commented 6 years ago

Hi, I was dealing with a similar issue. My solution was to create save_to_repo_extended using the flock package, which creates a file-based mutex to protect the critical code section:

save_to_repo_extended <- function(artifact,
                                  repoDir = archivist::aoptions("repoDir"),
                                  archiveData = TRUE,
                                  archiveTags = TRUE,
                                  archiveMiniature = TRUE,
                                  archiveSessionInfo = TRUE,
                                  force = TRUE, value = FALSE,
                                  ...,
                                  userTags = c(),
                                  silent = archivist::aoptions("silent"),
                                  ascii = FALSE,
                                  artifactName = deparse(substitute(artifact)),
                                  file_lock = NULL){ # path to a shared lock file
  if (!is.null(file_lock)) {          # lock given -> guard the critical section
    locker <- flock::lock(file_lock)  # blocks until the mutex is acquired
    on.exit(flock::unlock(locker))    # release even if saveToRepo() errors
  }
  # arguments that follow ... in saveToRepo()'s formals must be passed by
  # name, otherwise they would be swallowed into saveToRepo()'s own ...
  archivist::saveToRepo(artifact, repoDir, archiveData, archiveTags,
                        archiveMiniature, archiveSessionInfo, force,
                        value, ...,
                        userTags = userTags, silent = silent,
                        ascii = ascii, artifactName = artifactName)
}
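
For example, a hypothetical call from the original batch job, with every worker sharing the same lock file:

jobs <- lapply(files, function(f) {
  mcparallel({
    res <- prepData(f)
    # all workers must pass the SAME file_lock path for the mutex to work
    save_to_repo_extended(res, repoDir = 'localRepo', artifactName = f,
                          archiveMiniature = FALSE, archiveSessionInfo = FALSE,
                          silent = TRUE, file_lock = 'localRepo/repo.lock')
  }, name = f)
})
mccollect(jobs)
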
happyshows commented 6 years ago

Thanks, will try it later.

pbiecek commented 6 years ago

Thank you guys for this interesting use case. In version 2.3 I've added support for the flock package to the saveToRepo function through the use_flock parameter: http://pbiecek.github.io/archivist/reference/saveToRepo.html
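
With that parameter, the original batch job reduces to a sketch like this (assuming asave() forwards use_flock to saveToRepo()):

library(parallel)
library(archivist)
library(magrittr)

jobs <- lapply(files, function(f) {
  mcparallel({
    res <- prepData(f)
    # use_flock = TRUE makes each save acquire a file lock internally
    asave(res, 'localRepo', artifactName = f, archiveMiniature = FALSE,
          archiveSessionInfo = FALSE, silent = TRUE,
          use_flock = TRUE) %>% invisible()
  }, name = f)
})
mccollect(jobs)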