SWS-Methodology / faoswsTrade

World trade data processing for the FAO Statistical Working System
http://www.fao.org/economic/ess/ess-home/en/
5 stars 2 forks source link

Optimisation (was: Investigate whether plyr functions can be replaced) #146

Open chrMongeau opened 7 years ago

chrMongeau commented 7 years ago

[NOTE: The name of this issue was changed from "Investigate whether plyr functions can be replaced" to "Optimisation", the reason being that while working on the plyr issues some other chunks of code were found to be inefficient and, thus, worth rewriting]

It seems that there can be some efficiency gains if plyr::XYply functions are replaced by alternatives. For instance, given the following dataframe:

> toexpand
# A tibble: 138,358 x 6
   reporter  flow fromcode tocode   fcl hsrange
      <int> <int>    <int>  <int> <int>   <int>
 1        1     1    10210  10290   866      80
 2        1     1    10300  10399  1034      99
 3        1     1    10410  10419   976       9
 4        1     1    10420  10429  1016       9
 5        1     1    20410  20419   977       9
 6        1     1    20430  20439   977       9
 7        1     1    20450  20459  1017       9
 8        1     1    20630  20639  1036       9
 9        1     1    20820  20829  1166       9
10        1     1    21020  21029   872       9
# ... with 138,348 more rows

the following code (currently in extract_hs6fclmap.R, toexpand was added):

maptable_range <- toexpand %>%
  plyr::ddply(.variables = c("reporter", "flow"),
              function(df) {
                plyr::adply(df, 1L, function(df){
                  allhs <- seq.int(df$fromcode, df$tocode)
                  rows <- length(allhs)
                  fcl <- rep.int(df$fcl, times = rows)
                  data_frame(hs6 = allhs,
                             fcl = fcl)
                })
              },
              .parallel = parallel,
              .paropts = list(.packages = "dplyr")) %>%
  select_(~reporter,
          ~flow,
          ~hs6,
          ~fcl)

takes around 3 minutes, while the following alternative (that generates an identical output):

maptable_range = toexpand %>%
  rowwise() %>%
  mutate(hs6 = list(fromcode:tocode)) %>%
  tidyr::unnest() %>%
  select(reporter, flow, hs6, fcl)

takes around 4 seconds:

> micro <- microbenchmark::microbenchmark(maptable_range = toexpand %>% rowwise() %>% mutate(hs6 = list(fromcode:tocode)) %>% tidyr::unnest() %>% select(reporter, flow, hs6, fcl), times = 10)
> micro
Unit: seconds
 expr      min       lq     mean   median       uq      max neval
  aaa 3.930937 4.105681 4.235943 4.203929 4.383569 4.721291    10

This issue came to light while investigating what are the bottlenecks in running parallel processes on the SWS server (the parallel is TRUE).

Work on this is on branch refactor/issue146.

chrMongeau commented 7 years ago

Chunk in f1():

f1 <- function() {
    bind_rows(maptable_0range, maptable_range) %>%
    arrange_(~reporter, ~flow, ~hs6, ~fcl) %>%
    distinct() %>%
    # Split by chunks to be efficient in parallel execution
    plyr::ddply(.variables = c("reporter", "flow"),
                function(df) {
                  df %>%
                    group_by_(~hs6) %>%
                    mutate_(fcl_links = ~length(unique(fcl))) %>%
                    ungroup()
                },
                .parallel = parallel,
                .paropts = list(.packages = "dplyr"))
}

f2 <- function() {
  bind_rows(maptable_0range, maptable_range) %>%
  group_by(reporter, flow, hs6) %>%
  mutate(fcl_links = n_distinct(fcl)) %>%
  ungroup() %>%
  distinct()
}

> system.time(a_1 <- f1())
   user  system elapsed 
  17.05    3.74   59.05
> system.time(a_2 <- f2())
   user  system elapsed 
  22.81    0.30   23.14 
chrMongeau commented 7 years ago

For the hsInRange() a good alternative seems to be using the data.table::foverlaps() function (HT @vidigal88). The code below refers to esdata.

f1 <- function(parallel_par) {
plyr::ddply(
    df,
    .variables = c("reporter", "flow"),
    .fun = function(subdf) {

      # Subsetting mapping file
      maptable <- maptable %>%
        filter_(~reporter == subdf$reporter[1],
                ~flow == subdf$flow[1])

      # If no corresponding records in map return empty df
      if(nrow(maptable) == 0)
        return(data_frame(
          datumid = subdf$id,
          hs = subdf$hs,
          hsext = subdf$hsext,
          fcl = as.integer(NA),
          recordnumb = as.numeric(NA)))

      # Split original data.frame by row,
      # and looking for matching fcl codes
      # for each input hs code.
      # If there are multiple matchings we return
      # all matches.
      fcl <- plyr::ldply(
        subdf$datumid,
        function(currentid) {

          # Put single hs code into a separate variable
          hsext <- subdf %>%
            filter_(~datumid == currentid) %>%
            select_(~hsext) %>%
            unlist() %>% unname()

          # Original HS to include into output dataset
          hs <- subdf %>%
            filter_(~datumid == currentid) %>%
            select_(~hs) %>%
            unlist() %>% unname()

          maptable <- maptable %>%
            filter_(~fromcodeext <= hsext &
                      tocodeext >= hsext)

          # If no corresponding HS range is
          # available return empty integer
          if(nrow(maptable) == 0L) {
            fcl <- as.integer(NA)
            recordnumb <- as.numeric(NA)
          }

          if(nrow(maptable) >= 1L) {
             fcl <- maptable$fcl
             recordnumb <- maptable$recordnumb
          }

          data_frame(datumid = currentid,
                     hs = hs,
                     hsext = hsext,
                     fcl = fcl,
                     recordnumb = recordnumb)
        }
      )

    },
    .parallel = parallel_par,
    # Windows requires functions and packages to be explicitly exported
    .paropts = list(.packages = "dplyr"),
    .progress = ifelse(CheckDebug() &
                         !parallel_par &
                         # Don't show progress for quick calculations
                         nrow(uniqhs) > 10^4,
                       "text", "none")
  )
}

f2 <- function() {
foverlaps(df1, maptable1) %>%
  tbl_df() %>%
  select(reporter, flow, datumid, hs, hsext, fcl, recordnumb)
}

df1 <- df %>% mutate(fromcodeext = hsext, tocodeext = hsext)
maptable1 <- maptable

df1 <- as.data.table(df1)
maptable1 <- as.data.table(maptable1)
setkey(maptable1, reporter, flow, fromcodeext, tocodeext)

micro <- microbenchmark::microbenchmark(test1 <- f1(FALSE), test2 <- f2(), times = 3)
> micro
Unit: milliseconds
               expr         min          lq        mean      median          uq         max neval
 test1 <- f1(FALSE) 199201.4088 199337.5273 200054.4935 199473.6458 200481.0358 201488.4258     3
      test2 <- f2()    112.0667    112.5727    112.7575    113.0788    113.1029    113.1271     3

(The comparison was made without parallel computation; it would take around 1/n the time above (acttually more for the overhead) when parallel_par = TRUE, where n is the number of cores.)

chrMongeau commented 7 years ago

While not strictly related to the plyr issue, I modified the sel1FCL() function. It used to work by row with a do(), now (added in 998a81d) it uses group_by().

Example of improvement (_pre is previous version, _post is new version, see links above):

> system.time(a_pre <- sel1FCL_pre(uniqhs %>% slice(1:100), maptable, cur_yr = year))
   user  system elapsed 
  54.46    0.23   54.76

> system.time(a_post <- sel1FCL_post(uniqhs %>% slice(1:100), maptable, cur_yr = year))
   user  system elapsed 
   0.98    0.00    0.99
chrMongeau commented 7 years ago

In b952828 the very inefficient use of sumFlags() was changed in order to use a normal sum():

> system.time(
tradedata_flags_pre <- tradedata %>%
  group_by_(~year, ~reporter, ~partner, ~flow, ~fcl) %>%
  summarise_each_(funs(sumFlags(flags = .)), vars = ~starts_with('flag_')) %>%
  ungroup()
)
   user  system elapsed 
 496.38    0.39  500.53 
> 
> 
> system.time(
tradedata_flags_post <- tradedata %>%
  group_by_(~year, ~reporter, ~partner, ~flow, ~fcl) %>%
  summarise_each_(funs(sum(.)), vars = ~starts_with('flag_')) %>%
  ungroup() %>%
  mutate_each_(funs(as.integer(. > 0)), vars = ~starts_with('flag_'))
)
   user  system elapsed 
   1.97    0.11    2.07 
sebastian-c commented 7 years ago

While looking at the code you sent to the SWS team, I made a data.table implementation of the plyr code you had before to compare it with parallel performance. I think you've already fixed it, however.

library(data.table)
maptable_dt <- as.data.table(maptable)
system.time(maptable_range_dt <- maptable_dt[hsrange > 0, 
                                             .(hs6=seq.int(fromcode, tocode), fcl = fcl), 
                                             by = .(reporter, flow, fromcode, tocode, fcl)])
chrMongeau commented 7 years ago

Your version runs faster:

f1 <- function() {
    maptable_range = (maptable %>% filter(hsrange > 0)) %>%
    rowwise() %>%
    mutate(hs6 = list(fromcode:tocode)) %>%
    tidyr::unnest() %>%
    select(reporter, flow, hs6, fcl)
}

f2 <- function() {
maptable_range_dt <- maptable_dt[hsrange > 0, 
  .(hs6=seq.int(fromcode, tocode), fcl = fcl), 
  by = .(reporter, flow, fromcode, tocode, fcl)]
}

> microbenchmark::microbenchmark(f1(), f2(), times = 10)
Unit: milliseconds
 expr       min        lq      mean    median        uq       max neval
 f1() 3747.4974 3852.9860 3915.0701 3908.8779 3999.9282 4065.3669    10
 f2()  265.8615  302.9104  345.9196  335.8995  391.4239  443.8465    10