richfitz / storr

:package: Object cacher for R
http://richfitz.github.io/storr

lz4 and zstd compression via fst #111

Open wlandau opened 5 years ago

wlandau commented 5 years ago

Suggested by @MarcusKlik. This PR implements optional fst compression for RDS storrs. When we save serialized data, we

  1. Use fst::compress_fst() to compress the data, and
  2. Save to a file using saveRDS(compress = FALSE).

When we read the data back in, we call fst::decompress_fst() before returning the value.
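
A minimal standalone sketch of that write path (illustrative only; the driver's actual internals differ slightly):

library(fst)

path <- tempfile()
value <- runif(1e6)
raw_serialized <- serialize(value, NULL)         # R object -> serialized raw vector
raw_compressed <- compress_fst(raw_serialized)   # fst compression (ZSTD or LZ4)
saveRDS(raw_compressed, path, compress = FALSE)  # plain RDS file, no gzip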

We can select fst compression for new storrs with storr_rds(compress = "fst"). The compress argument now understands the values "fst", "gzfile", and "none", as well as the original logical values. The default for new storrs is "gzfile". Should it be? Personally, I would rather it be "fst", but then we would have to move the fst package from Suggests to Imports.

I also updated the tests to check backward compatibility and compress = "fst".

codecov-io commented 5 years ago

Codecov Report

Merging #111 into master will increase coverage by <.01%. The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #111      +/-   ##
==========================================
+ Coverage   99.91%   99.91%   +<.01%     
==========================================
  Files          16       16              
  Lines        1203     1224      +21     
==========================================
+ Hits         1202     1223      +21     
  Misses          1        1
| Impacted Files | Coverage Δ |
|----------------|------------|
| R/utils.R | 100% <ø> (ø) :arrow_up: |
| R/hash.R | 100% <100%> (ø) :arrow_up: |
| R/driver_rds.R | 100% <100%> (ø) :arrow_up: |

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 0c64f1e...deea50d.

wlandau commented 5 years ago

Speeds are comparable for small data. Large data benchmarks are forthcoming.

library(storr)
library(microbenchmark)
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
microbenchmark(
    none = st_none$set(key = "x", value = runif(1)),
    gzfile = st_gzfile$set(key = "x", value = runif(1)),
    fst = st_fst$set(key = "x", value = runif(1))
)
#> Unit: milliseconds
#>    expr      min       lq     mean   median       uq       max neval
#>    none 5.863562 6.246155 6.503483 6.442023 6.751117  7.786466   100
#>  gzfile 4.528847 4.978007 5.209034 5.138960 5.376522  6.907493   100
#>     fst 5.827846 6.205787 6.563974 6.392180 6.712995 13.425614   100

Created on 2019-06-18 by the reprex package (v0.3.0)

wlandau commented 5 years ago

Benchmarks on 800 MB data, where storr uses writeBin():

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
data <- runif(1e8)
object_size(data)
#> 800 MB
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
microbenchmark(
    none = st_none$set(key = "x", value = data),
    gzfile = st_gzfile$set(key = "x", value = data),
    fst = st_fst$set(key = "x", value = data),
    times = 1
)
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  6.375112  6.375112  6.375112  6.375112  6.375112  6.375112     1
#>  gzfile 89.511704 89.511704 89.511704 89.511704 89.511704 89.511704     1
#>     fst  6.636744  6.636744  6.636744  6.636744  6.636744  6.636744     1

and for data large enough to trigger repacking:

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
data <- runif(3e8)
object_size(data)
#> 2.4 GB
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
microbenchmark(
    none = st_none$set(key = "x", value = data, use_cache = FALSE),
    gzfile = st_gzfile$set(key = "x", value = data, use_cache = FALSE),
    fst = st_fst$set(key = "x", value = data, use_cache = FALSE),
    times = 1
)
#> Repacking large object
#> Repacking large object
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  27.21743  27.21743  27.21743  27.21743  27.21743  27.21743     1
#>  gzfile 276.25813 276.25813 276.25813 276.25813 276.25813 276.25813     1
#>     fst  36.22005  36.22005  36.22005  36.22005  36.22005  36.22005     1

Created on 2019-06-18 by the reprex package (v0.3.0)

wlandau commented 5 years ago

A minor snag: we need an extra deserialization step if we compress with fst. Not terrible, but worth noting.
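
Before the benchmark, a self-contained sketch of that read path (illustrative only, not the driver's exact code):

library(fst)

path <- tempfile()
saveRDS(compress_fst(serialize(runif(10), NULL)), path, compress = FALSE)

# reading back requires decompression *and* an extra unserialize()
value <- unserialize(decompress_fst(readRDS(path)))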

library(storr)
library(microbenchmark)

st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = 1, use_cache = FALSE)
st_gzfile$set(key = "x", value = 1, use_cache = FALSE)
st_fst$set(key = "x", value = 1, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE)
)
#> Unit: microseconds
#>    expr     min       lq     mean   median       uq     max neval
#>    none 205.729 207.9270 219.5831 211.3970 224.3855 415.171   100
#>  gzfile 207.871 210.9915 225.9019 217.7565 233.5205 274.534   100
#>     fst 227.325 229.5640 240.8948 232.3175 244.8615 374.784   100

Created on 2019-06-18 by the reprex package (v0.3.0)

wlandau commented 5 years ago

For the 800 MB dataset, reading the data back is slower than with no compression, but still faster than with the default "gzfile" compression.

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp

data <- runif(1e8)
object_size(data)
#> 800 MB

st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = data, use_cache = FALSE)
st_gzfile$set(key = "x", value = data, use_cache = FALSE)
st_fst$set(key = "x", value = data, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE),
    times = 1
)
#> Unit: seconds
#>    expr      min       lq     mean   median       uq      max neval
#>    none 2.425930 2.425930 2.425930 2.425930 2.425930 2.425930     1
#>  gzfile 4.908687 4.908687 4.908687 4.908687 4.908687 4.908687     1
#>     fst 3.112166 3.112166 3.112166 3.112166 3.112166 3.112166     1

Created on 2019-06-18 by the reprex package (v0.3.0)

For larger data:

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp

data <- runif(3e8)
object_size(data)
#> 2.4 GB

st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = data, use_cache = FALSE)
#> Repacking large object
st_gzfile$set(key = "x", value = data, use_cache = FALSE)
#> Repacking large object
st_fst$set(key = "x", value = data, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE),
    times = 1
)
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  7.194649  7.194649  7.194649  7.194649  7.194649  7.194649     1
#>  gzfile 14.287884 14.287884 14.287884 14.287884 14.287884 14.287884     1
#>     fst  8.197173  8.197173  8.197173  8.197173  8.197173  8.197173     1

wlandau commented 5 years ago

We might still want to keep #109, either instead of or in addition to #111: https://github.com/richfitz/storr/pull/109#issuecomment-503388873.

MarcusKlik commented 5 years ago

Hi @wlandau, I see you have added an option compress = "fst". The fst::compress_fst() method allows selection of either the LZ4 compressor (built for speed) or the ZSTD compressor (slower but with better compression), plus a compression setting between 0 and 100.

Would it be useful to distinguish between the two (very different) compressors?
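
For reference, both compressors (and the 0-100 level) go through the same compress_fst() call; the sizes returned below are purely illustrative:

library(fst)

x <- serialize(runif(1e6), NULL)
length(compress_fst(x, compressor = "LZ4", compression = 0))    # built for speed
length(compress_fst(x, compressor = "ZSTD", compression = 50))  # slower, better compression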

wlandau commented 5 years ago

Yes, I agree. In https://github.com/richfitz/storr/pull/111/commits/486b1eadb70ff77e00512f834fe764b1372ae8bf, I exposed both the algorithm and the compression factor to storr_rds(). Both seem to fall within the scope of storr.

wlandau commented 5 years ago

Assuming a compression factor of 0, I expect drake users to prefer LZ4. Finding the right compression factor is an intense (but interesting) optimization problem that I plan to come back to later. (I want to use an optimization method that a colleague and I are working on right now, and we need to wrap it up first.)

library(gt)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
library(storr)
library(tidyverse)

none <- storr_rds(tempfile(), "none")
gzip <- storr_rds(tempfile(), "gzip")
lz4 <- storr_rds(tempfile(), "lz4")
zstd <- storr_rds(tempfile("zstd"))

results <- NULL
for (scale in seq_len(8)) {
    data <- runif(10 ^ scale)
    set <- microbenchmark(
        none = none$set(key = "x", value = data, use_cache = FALSE),
        gzip = gzip$set(key = "x", value = data, use_cache = FALSE),
        lz4 = lz4$set(key = "x", value = data, use_cache = FALSE),
        zstd = zstd$set(key = "x", value = data, use_cache = FALSE),
        times = floor(100 ^ (1 / scale))
    ) %>%
        mutate(op = "set")
    get <- microbenchmark(
        none = none$get(key = "x", use_cache = FALSE),
        gzip = gzip$get(key = "x", use_cache = FALSE),
        lz4 = lz4$get(key = "x", use_cache = FALSE),
        zstd = zstd$get(key = "x", use_cache = FALSE),
        times = floor(100 ^ (1 / scale))
    ) %>%
        mutate(op = "get")
    new_results <- bind_rows(get, set) %>%
        mutate(size = as.numeric(object_size(data)))
    results <- bind_rows(new_results, results)
}
results <- as_tibble(results) %>%
    mutate(
        algo = as.character(expr),
        time = time / 1e9
    ) %>%
    select(-expr) %>%
    group_by(algo, op, size) %>%
    summarize(time = mean(time), reps = n())

ggplot(results) +
    geom_line(aes(x = size, y = time, group = algo, color = algo)) +
    facet_grid(op ~ ., scales = "free_y") +
    xlab("Bytes") +
    ylab("Mean seconds") +
    labs(color = "storr operation") +
    theme_gray(16)


ggplot(results) +
    geom_line(aes(x = size, y = time, group = algo, color = algo)) +
    facet_grid(op ~ ., scales = "free_y") +
    xlab("Log bytes") +
    ylab("Log mean seconds") +
    scale_x_log10() +
    scale_y_log10() +
    labs(color = "storr operation") +
    theme_gray(16)


results %>%
    spread("algo", "time") %>%
    arrange(op, size) %>%
    select(reps, size, none, gzip, lz4, zstd) %>%
    gt() %>%
    fmt_number(columns = vars(none, gzip, lz4, zstd), decimals = 3) %>%
    fmt_scientific(columns = vars(size))
#> Adding missing grouping variables: `op`
Mean time in seconds by operation and data size:

| op | reps | size (bytes) | none | gzip | lz4 | zstd |
|-----|------|--------------|-------|--------|-------|--------|
| get | 100 | 1.76 × 10² | 0.000 | 0.000 | 0.000 | 0.000 |
| get | 10 | 8.48 × 10² | 0.000 | 0.000 | 0.000 | 0.000 |
| get | 4 | 8.05 × 10³ | 0.000 | 0.000 | 0.000 | 0.000 |
| get | 3 | 8.00 × 10⁴ | 0.001 | 0.001 | 0.001 | 0.001 |
| get | 2 | 8.00 × 10⁵ | 0.004 | 0.008 | 0.005 | 0.006 |
| get | 2 | 8.00 × 10⁶ | 0.020 | 0.051 | 0.031 | 0.046 |
| get | 1 | 8.00 × 10⁷ | 0.180 | 0.443 | 0.309 | 0.441 |
| get | 1 | 8.00 × 10⁸ | 2.061 | 4.546 | 2.848 | 4.605 |
| set | 100 | 1.76 × 10² | 0.004 | 0.004 | 0.004 | 0.004 |
| set | 10 | 8.48 × 10² | 0.005 | 0.005 | 0.005 | 0.004 |
| set | 4 | 8.05 × 10³ | 0.006 | 0.006 | 0.007 | 0.005 |
| set | 3 | 8.00 × 10⁴ | 0.006 | 0.008 | 0.006 | 0.009 |
| set | 2 | 8.00 × 10⁵ | 0.013 | 0.055 | 0.014 | 0.054 |
| set | 2 | 8.00 × 10⁶ | 0.075 | 0.456 | 0.061 | 0.456 |
| set | 1 | 8.00 × 10⁷ | 0.722 | 8.892 | 0.793 | 8.798 |
| set | 1 | 8.00 × 10⁸ | 6.322 | 87.454 | 6.509 | 86.731 |

Created on 2019-06-20 by the reprex package (v0.3.0)

wlandau commented 5 years ago

One problem with this approach is that we have 3 copies of the data in memory at the same time.

  1. The original data.
  2. The serialized raw vector.
  3. The compressed serialized raw vector.

@MarcusKlik, does there exist a way to compress and write to a connection simultaneously, as with saveRDS(compress = TRUE) or gzfile()? If not, would you be open to supporting this in compress_fst() itself?
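
A small illustration of those three simultaneous allocations:

library(fst)
library(pryr)

data <- runif(1e7)                      # 1. the original data (~80 MB)
serialized <- serialize(data, NULL)     # 2. the serialized raw vector (~80 MB)
compressed <- compress_fst(serialized)  # 3. the compressed serialized raw vector
object_size(data)
object_size(serialized)
object_size(compressed)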

MarcusKlik commented 5 years ago

Hi @wlandau, in general, if you want to compress and store in a single step, you will need some existing function that uses R's C API to accomplish that or write one yourself. Any step that calls two distinct methods (from R) for compressing and storing will need two buffers...

In write_fst(), the input data is split into blocks that are processed one at a time. Therefore, only a limited amount of buffer memory is required to compress and store that block and the user won't need much additional memory to store existing vectors (it looks like saveRDS is doing something similar).

Method compress_fst is also using blocks to compress, but those are much larger than the blocks in write_fst(). Perhaps an extra argument (connection) could be provided to that method to output the results of those (larger) buffers to a connection instead of the main memory. So much like R's serialize() method?
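
For comparison, base R can already stream straight to a compressed connection via serialize(), which is essentially what saveRDS(compress = TRUE) does internally:

con <- gzfile(tempfile(), "wb")
serialize(runif(1e6), con)  # compresses while writing; no full compressed copy held in RAM
close(con)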

wlandau commented 5 years ago

> Perhaps an extra argument (connection) could be provided to that method to output the results of those (larger) buffers to a connection instead of the main memory. So much like R's serialize() method?

This is exactly what I had in mind. How much of a difference in memory usage do you think it will make? How much does the compression level affect block size? It looks like blocks can be as large as 0.8 * INT_MAX, which on my machine is about 1.7 GB. Do blocks get that large in practice for large data?

MarcusKlik commented 5 years ago

Hi @wlandau, yes indeed, the block size is set using this code and will be between 16 kB and 0.8 * INT_MAX. The maximum is only reached when the original raw vector is larger than 48 times that maximum block size (about 82 GB).
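
Checking the numbers quoted above:

block_max <- 0.8 * .Machine$integer.max  # maximum block size, in bytes
block_max / 1e9                          # ~1.7 GB
48 * block_max / 1e9                     # ~82 GB before that maximum is reached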

The thing is that because compress_fst() uses multiple threads, the output buffer needs to be allocated for each thread. So, in practice, an amount of memory is allocated that can be several times the blocksize.

To avoid large allocations, perhaps yet another argument would be useful to let the user set the maximum block size, e.g. max_buffer = 1e8. Or maybe a factor of the original vector length: max_buffer_ratio = 0.5.

With the two arguments combined (connection and max_buffer_ratio), you would be able to run the algorithm with very little overhead in terms of RAM usage. What do you think?

wlandau commented 5 years ago

> The thing is that because compress_fst() uses multiple threads, the output buffer needs to be allocated for each thread. So, in practice, an amount of memory is allocated that can be several times the blocksize.

I think I should document this tradeoff. Commits forthcoming.

> With the two arguments combined (connection and max_buffer_ratio), you would be able to run the algorithm with very little overhead in terms of RAM usage. What do you think?

I love that idea. I think it would require some work to understand the tradeoff between max_buffer_ratio and speed, but it could still help a lot. I am curious to know what @richfitz thinks (though right now he is in Toulouse for useR! 2019).

MarcusKlik commented 5 years ago

ah, lucky guy :-).

In the zstd format specification, it is stated that the internally used buffer can be up to 128 kB (but not larger). Having blocks that are orders of magnitude larger probably doesn't help compression much (the block is already far larger than the sliding window), so I suspect any block size beyond a few tens of MB adds little. Switching to a new block does lose the compressor's history of previous elements, however, and that switch incurs a cost in speed.

So it might be a good idea to significantly reduce the block sizes used in compress_fst(). That would lower RAM usage, and if the blocks are still large enough we would get the maximum possible compression anyway. The threads would also be balanced better when some parts of the data are harder to compress.

Thanks for discussing your use case!

wlandau commented 5 years ago

Excellent! Thank you for being open to these improvements.

wlandau commented 5 years ago

When I discussed this with @richfitz earlier today, he suggested that:

  1. We wait for https://github.com/fstpackage/fst/issues/204 and https://github.com/fstpackage/fst/issues/205, and
  2. We decouple the compression options (algorithm, compression level, buffer size, etc.) from the driver API.
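
One purely hypothetical shape for that decoupling (nothing like this exists in storr yet; the names below are made up for discussion only):

# hypothetical constructor: bundle compression options into a spec object
# that any driver could accept, instead of adding driver-specific arguments
compression_spec <- function(algorithm = "none", level = 0, max_buffer = 1e8) {
  list(algorithm = algorithm, level = level, max_buffer = max_buffer)
}
spec <- compression_spec("lz4", level = 0)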

richfitz commented 5 years ago

Thanks @wlandau - I'll take a shot at an abstraction layer for this once I see what the connection interface to the fst compression looks like. I think once that's done we can come up with something here that gets the performance improvements you're wanting/seeing without creating an interface that's too unwieldy.

wlandau commented 5 years ago

Awesome! Thank you for the support.

wlandau commented 5 years ago

Just realized I had a typo in the code for the zstd benchmarks from https://github.com/richfitz/storr/pull/111#issuecomment-504042967. Updated results, now including the compression level:

library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
library(storr)
library(tidyverse)

none_0 <- storr_rds(tempfile())
lz4_0 <- storr_rds(tempfile(), "lz4", 0)
lz4_25 <- storr_rds(tempfile(), "lz4", 25)
lz4_50 <- storr_rds(tempfile(), "lz4", 50)
lz4_75 <- storr_rds(tempfile(), "lz4", 75)
lz4_100 <- storr_rds(tempfile(), "lz4", 100)
zstd_0 <- storr_rds(tempfile(), "zstd", 0)
zstd_25 <- storr_rds(tempfile(), "zstd", 25)
zstd_50 <- storr_rds(tempfile(), "zstd", 50)
zstd_75 <- storr_rds(tempfile(), "zstd", 75)
zstd_100 <- storr_rds(tempfile(), "zstd", 100)

results <- NULL
for (scale in seq_len(8)) {
  data <- runif(10 ^ scale)
  set <- microbenchmark(
    none_0 = none_0$set(key = "x", value = data, use_cache = FALSE),
    lz4_0 = lz4_0$set(key = "x", value = data, use_cache = FALSE),
    lz4_25 = lz4_25$set(key = "x", value = data, use_cache = FALSE),
    lz4_50 = lz4_50$set(key = "x", value = data, use_cache = FALSE),
    lz4_75 = lz4_75$set(key = "x", value = data, use_cache = FALSE),
    lz4_100 = lz4_100$set(key = "x", value = data, use_cache = FALSE),
    zstd_0 = zstd_0$set(key = "x", value = data, use_cache = FALSE),
    zstd_25 = zstd_25$set(key = "x", value = data, use_cache = FALSE),
    zstd_50 = zstd_50$set(key = "x", value = data, use_cache = FALSE),
    zstd_75 = zstd_75$set(key = "x", value = data, use_cache = FALSE),
    zstd_100 = zstd_100$set(key = "x", value = data, use_cache = FALSE),
    times = floor(100 ^ (1 / scale))
  ) %>%
    mutate(op = "set")
  get <- microbenchmark(
    none_0 = none_0$get(key = "x", use_cache = FALSE),
    lz4_0 = lz4_0$get(key = "x", use_cache = FALSE),
    lz4_25 = lz4_25$get(key = "x", use_cache = FALSE),
    lz4_50 = lz4_50$get(key = "x", use_cache = FALSE),
    lz4_75 = lz4_75$get(key = "x", use_cache = FALSE),
    lz4_100 = lz4_100$get(key = "x", use_cache = FALSE),
    zstd_0 = zstd_0$get(key = "x", use_cache = FALSE),
    zstd_25 = zstd_25$get(key = "x", use_cache = FALSE),
    zstd_50 = zstd_50$get(key = "x", use_cache = FALSE),
    zstd_75 = zstd_75$get(key = "x", use_cache = FALSE),
    zstd_100 = zstd_100$get(key = "x", use_cache = FALSE),
    times = floor(100 ^ (1 / scale))
  ) %>%
    mutate(op = "get")
  new_results <- bind_rows(get, set) %>%
    mutate(size = as.numeric(object_size(data)))
  results <- bind_rows(new_results, results)
}
results <- as_tibble(results) %>%
  mutate(
    algo = ordered(
      gsub("_.*$", "", as.character(expr)),
      levels = c("lz4", "zstd", "none")
    ),
    compression = ordered(
      as.integer(gsub("^.*_", "", as.character(expr)))
    ),
    time = time / 1e9
  ) %>%
  select(-expr) %>%
  group_by(algo, compression, op, size) %>%
  summarize(time = mean(time), reps = n())

ggplot(results) +
  geom_line(
    aes(
      x = size,
      y = time,
      group = compression,
      linetype = compression,
      color = compression
    )
  ) +
  facet_grid(op ~ algo, scales = "free_y", labeller = label_both) +
  xlab("Bytes") +
  ylab("Mean seconds") +
  theme_gray(16) +
  scale_color_brewer(palette = "Set1")

Created on 2019-08-03 by the reprex package (v0.3.0)

wlandau commented 5 years ago

As noted in https://github.com/ropensci/drake/issues/907#issuecomment-517934993, this PR does not quite achieve the efficiency of write_fst(). I still think #111 has value for the general case, but drake would also benefit from a decorated storr: https://github.com/ropensci/drake/issues/971#issuecomment-517971052