apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[R] Default write_dataset min_rows_per_group parameter, 1L, can lead to very bad performance (time and memory) #41057

Open · nbc opened 6 months ago

nbc commented 6 months ago

Describe the enhancement requested

I'm not sure whether this is a feature request or a bug, but when using write_dataset, the resulting dataset can end up with very small row groups, which leads to very bad performance for many queries: at least 20 times slower and 20 times more memory for my 10 GB dataset.

Setting the min_rows_per_group to something around 100000L fixes the problem.

Not all users are aware of the min_rows_per_group parameter, so setting a "good" default (if one exists) could help them a lot.

I'm not qualified enough to know whether there are drawbacks.
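
For reference, a minimal sketch of the workaround (the paths and partitioning column are placeholders, and 100000L is just the ballpark value mentioned above, not a validated default):

library(arrow)

# Ask the writer to buffer at least ~100k rows before flushing a row group
open_dataset("input.parquet") |>
  write_dataset("output_dataset",
                partitioning = "some_column",
                min_rows_per_group = 100000L)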

Component(s)

R

ZhangHuiGui commented 6 months ago

Could you provide a benchmark that reproduces this problem? That would make it easier for others to investigate.

nbc commented 6 months ago

Working on it

nbc commented 6 months ago

To reproduce this problem, you can use the code below.

library(arrow)
library(dplyr)
library(stringr) # for str_sub() below

download.file("https://static.data.gouv.fr/resources/base-sirene-des-entreprises-et-de-leurs-etablissements-siren-siret/20231214-130548/stocketablissement-utf8.parquet", "stocketablissement-utf8.parquet")

# create a partitioned dataset with default min_rows_per_group
open_dataset("stocketablissement-utf8.parquet") |>
  mutate(code = str_sub(activitePrincipaleEtablissement, 1L, 2L)) |>
  write_dataset("stock_ds", partitioning = "code")

# create a partitioned dataset with min_rows_per_group at 32000L
open_dataset("stocketablissement-utf8.parquet") |>
  mutate(code = str_sub(activitePrincipaleEtablissement, 1L, 2L)) |>
  write_dataset("stock_ds_32000", partitioning = "code", min_rows_per_group = 32000L)

I checked the duration and memory usage for this query:

library(tictoc)

# 1) query the original single parquet file
tic()
open_dataset("stocketablissement-utf8.parquet") |>
  filter(dateCreationEtablissement < "2018-02-07" & dateDebut > "2020-02-02") |>
  count() |>
  collect()
toc()

# 2) query the partitioned dataset written with the default min_rows_per_group
tic()
open_dataset("stock_ds") |>
  filter(dateCreationEtablissement < "2018-02-07" & dateDebut > "2020-02-02") |>
  count() |>
  collect()
toc()

# 3) query the partitioned dataset written with min_rows_per_group = 32000L
tic()
open_dataset("stock_ds_32000") |>
  filter(dateCreationEtablissement < "2018-02-07" & dateDebut > "2020-02-02") |>
  count() |>
  collect()
toc()

The result is:

  source                          duration (s) max_mem
1 stocketablissement-utf8.parquet         3.14    2.17
2 stock_ds                               15.9     6.54
3 stock_ds_32000                          3.38    2.83

And the problem gets worse as the file size grows (stocketablissement-utf8.parquet is 1.2 GB and holds 37 million rows).
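
The timing code above only shows elapsed time; one possible way to also capture Arrow's peak allocation (just a sketch, assuming the max_memory binding on arrow's default memory pool, and not necessarily how max_mem was measured here) is:

library(arrow)
library(dplyr)
library(tictoc)

pool <- default_memory_pool()

tic()
open_dataset("stock_ds") |>
  filter(dateCreationEtablissement < "2018-02-07" & dateDebut > "2020-02-02") |>
  count() |>
  collect()
toc()

# High-water mark of Arrow's default memory pool (bytes since the session started), in GB
pool$max_memory / 1024^3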

mapleFU commented 6 months ago

I guess that's because small RecordBatches are being written, each producing a small row group. Are you storing on local fs or S3?

nbc commented 6 months ago

I store on local fs.

I thought it was specific to R arrow, but it's not. I've just tried the same partitioning with Python and it gave the same result: a very low mean row group size (around 730):

Source file: https://static.data.gouv.fr/resources/base-sirene-des-entreprises-et-de-leurs-etablissements-siren-siret/20231214-130548/stocketablissement-utf8.parquet

import pyarrow.dataset as ds
import pyarrow as pa

# Write a hive-partitioned copy of the file, leaving all row-group settings at their defaults
ds.write_dataset(ds.dataset("stocketablissement-utf8.parquet"),
                 base_dir='stock_py_32',
                 format='parquet',
                 partitioning=ds.partitioning(
                     schema=pa.schema([('typeVoieEtablissement', pa.utf8())]),
                     flavor='hive'
                 ))

> parquet_metadata("stocketablissement-utf8.parquet") |> summarise(mean = mean(row_group_num_rows), min = min(row_group_num_rows), max = max(row_group_num_rows))
# A tibble: 1 × 3
     mean   min    max
    <dbl> <dbl>  <dbl>
1 123753. 79292 124927

> parquet_metadata("stock_py") |> summarise(mean = mean(row_group_num_rows), min = min(row_group_num_rows), max = max(row_group_num_rows))
# A tibble: 1 × 3
   mean   min   max
  <dbl> <dbl> <dbl>
1  727.     1 22805

mapleFU commented 6 months ago

It's related to the Parquet writer in the underlying C++ implementation. Also, why does a row group size of "1" show up here?

nbc commented 6 months ago

Absolutely no clue.

To be complete, here is the parquet_metadata function I use to get the metadata/row group sizes (sorry, I'm not fluent enough in pyarrow/arrow to do this with arrow directly):

parquet_metadata <- function(parquet) {
  # Accept either a single parquet file or a dataset directory
  if (dir.exists(parquet)) {
    parquet <- file.path(parquet, '**', '*.parquet')
  } else if (!file.exists(parquet)) {
    stop("parquet must be a (parquet) file or a (dataset) directory")
  }

  tryCatch(
    {
      con <- DBI::dbConnect(duckdb::duckdb())
      DBI::dbGetQuery(con, glue::glue("SELECT * EXCLUDE(key_value_metadata) FROM parquet_metadata('{parquet}')")) |>
        tibble::as_tibble()
    },
    finally = {
      DBI::dbDisconnect(con, shutdown = TRUE)
    }
  )
}

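As an aside, a rough arrow-only alternative (just a sketch: it relies on ParquetFileReader's num_rows and num_row_groups fields and only reports the average rows per row group per file, not the per-group min/max the DuckDB query returns):

library(arrow)

# Average row-group size of each parquet file under a path (single file or dataset dir)
avg_rows_per_group <- function(path) {
  files <- if (dir.exists(path)) {
    list.files(path, pattern = "\\.parquet$", recursive = TRUE, full.names = TRUE)
  } else {
    path
  }
  vapply(files, function(f) {
    reader <- ParquetFileReader$create(f)
    reader$num_rows / reader$num_row_groups
  }, numeric(1))
}

mean(avg_rows_per_group("stock_ds"))        # small with the defaults
mean(avg_rows_per_group("stock_ds_32000"))  # much larger with min_rows_per_group set
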
mapleFU commented 6 months ago

row_group_size == 1 might mean the data is not buffered and the file is written row by row. That's expected to be a bit slow, but I'd need to test it to find out where the time is actually spent.
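
If it helps, one way to probe that hypothesis (just a sketch; the output directory names are made up) is to write the same data once from an in-memory Table and once through a dataset scan, then compare the resulting row-group sizes with parquet_metadata() above:

library(arrow)

# Write from an in-memory Table: the writer receives large batches
# (one per chunk of the Table)
tbl <- read_parquet("stocketablissement-utf8.parquet", as_data_frame = FALSE)
write_dataset(tbl, "stock_from_table")

# Write from a dataset scan: the writer receives whatever batch sizes
# the scanner happens to produce
open_dataset("stocketablissement-utf8.parquet") |>
  write_dataset("stock_from_scanner")

# Then compare row_group_num_rows for the two outputs with parquet_metadata()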