apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[R] cannot mutate into a hive-partition key #44309

Open r2evans opened 1 month ago

r2evans commented 1 month ago

Describe the bug, including details regarding any error messages, version, and platform.

There are use cases where I cannot use the "normal" filtering with hive partitioning and must call open_dataset() directly on a subdirectory. Using add_filename(), it is generally not difficult to infer the string value of the partition-key variable, but ... I cannot assign it directly in a lazy mutate().

td <- tempfile(fileext=".d")
dir.create(td)
arrow::write_dataset(mtcars, td, partitioning="cyl")
list.files(td, recursive = TRUE, full.names = TRUE)
# [1] "/home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=4/part-0.parquet" "/home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet"
# [3] "/home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=8/part-0.parquet"

arrow::open_dataset(file.path(td, "cyl=6")) |>
  mutate(
    .fn = add_filename(),
    cyl = sub(".*/cyl=([^/]*)/.*", "\\1", .fn),
    cyl = if_else(cyl == .fn, NA_character_, cyl)
  ) |>
  collect()
# Error in compute.arrow_dplyr_query(x) : ℹ In index: 2.
# Caused by error in `x$Equals()`:
# ! attempt to apply non-function

arrow::open_dataset(file.path(td, "cyl=6")) |>
  mutate(
    .fn = add_filename(),
    .cyl = sub(".*/cyl=([^/]*)/.*", "\\1", .fn),
    .cyl = if_else(.cyl == .fn, NA_character_, .cyl)
  ) |>
  collect()
# # A tibble: 7 × 12
#     mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb .fn                                                              .cyl 
#   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <chr>                                                            <chr>
# 1  21    160    110  3.9   2.62  16.5     0     1     4     4 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 2  21    160    110  3.9   2.88  17.0     0     1     4     4 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 3  21.4  258    110  3.08  3.22  19.4     1     0     3     1 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 4  18.1  225    105  2.76  3.46  20.2     1     0     3     1 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 5  19.2  168.   123  3.92  3.44  18.3     1     0     4     4 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 6  17.8  168.   123  3.92  3.44  18.9     1     0     4     4 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    
# 7  19.7  145    175  3.62  2.77  15.5     0     1     5     6 /home/r2/tmp/RtmpsoiQjF/file640d75e7cb0d4.d/cyl=6/part-0.parquet 6    

The workaround is to use a different name, but ... if I need the original key name and still need to return a lazy table, this is a hard failure.

The question then is an either/or. Is it possible to:

  1. Return the key value directly in the data when opening a directory that uses hive partitioning? Or ...
  2. Assign to a variable name that is never otherwise visible or available in the object or data?
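(For option 1, and only as a point of comparison: when the parent directory is accessible, arrow's existing `partitioning` argument already surfaces the key as a lazy column. This is a hedged sketch using the real `arrow::hive_partition()` API; it does not address the constraint above that only the subdirectory can be opened. `td` is the temp directory from the reprex.)

```r
library(dplyr)
# Opening the parent (not the cyl=6 subdir) exposes `cyl` as an ordinary
# lazy column -- the behavior option 1 asks for on a subdirectory open.
arrow::open_dataset(td, partitioning = arrow::hive_partition(cyl = arrow::int32())) |>
  filter(cyl == 6) |>
  collect()
```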

The fact that cyl appears nowhere in the visible attributes confounded me for a long time, until I realized the conflict was with the partition key itself (and not a literal cyl column).

(For the record, it is not an option to return the filename and expect the caller to do the parsing, nor is it good to have a renamed keyfield in the data.)

Thanks!

Component(s)

R

r2evans commented 1 month ago

I think one of the more confusing parts of this is that if I give that part-0.parquet file to somebody else hoping they can work with the data directly and lazily (e.g., contracting, bug reports, etc.), there is no way to tell superficially which names, if any, are restricted. Not knowing that this can happen makes the error very confusing.

r2evans commented 1 month ago

Here's a HACK, no idea why it works:

arr <- arrow::open_dataset(file.path(td, "cyl=6"))
arr |>
  mutate(
    .fn = add_filename(),
    .cyl = sub(".*/cyl=([^/]*)/.*", "\\1", .fn),
    .cyl = if_else(.cyl == .fn, NA_character_, .cyl)
  ) |>
  slice_head(n = nrow(arr)) |>
  rename(cyl = .cyl) |>
  collect()

Placing slice_head(.) before rename(.) allows it to work (but not if it comes before the mutate(.)).

Unfortunately, retrieving nrow() before collect() now returns NA.

arr <- arrow::open_dataset(file.path(td, "cyl=6"))
nrow(arr)
# [1] 7

arr |>
  mutate(
    .fn = add_filename(),
    .cyl = sub(".*/cyl=([^/]*)/.*", "\\1", .fn),
    .cyl = if_else(.cyl == .fn, NA_character_, .cyl)
  ) |>
  slice_head(n = nrow(arr)) |>
  rename(cyl = .cyl) |>
  nrow()
# [1] NA

r2evans commented 1 month ago

I found a better workaround: removing the key from $schema$metadata$r$columns before attempting anything.

I don't know if this is intentional, or if the presence of keys in $r$columns and not in the data itself is an accidental inconsistency. If the former, I'm curious but I assume it's for a good reason. If the latter ... it's a bug?

arr <- arrow::open_dataset(file.path(td, "cyl=6"))
arr$schema$metadata$r$columns$cyl <- NULL
arr |>
  mutate(
    .fn = add_filename(),
    cyl = sub(".*/cyl=([^/]*)/.*", "\\1", .fn),
    cyl = if_else(cyl == .fn, NA_character_, cyl)
  ) |>
  select(-.fn) |>
  collect()
# # A tibble: 7 × 11
#     mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb cyl  
#   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <chr>
# 1  21    160    110  3.9   2.62  16.5     0     1     4     4 6    
# 2  21    160    110  3.9   2.88  17.0     0     1     4     4 6    
# 3  21.4  258    110  3.08  3.22  19.4     1     0     3     1 6    
# 4  18.1  225    105  2.76  3.46  20.2     1     0     3     1 6    
# 5  19.2  168.   123  3.92  3.44  18.3     1     0     4     4 6    
# 6  17.8  168.   123  3.92  3.44  18.9     1     0     4     4 6    
# 7  19.7  145    175  3.62  2.77  15.5     0     1     5     6 6    

To generalize this (since I'm not sharing mtcars :-), I wonder if there are unintended consequences of doing this:

arr <- arrow::open_dataset(file.path(td, "cyl=6"))
arr$schema$metadata$r$columns <- arr$schema$metadata$r$columns[ 
  intersect(names(arr$schema$metadata$r$columns), names(arr)) ]

Or perhaps even more tersely:

arr <- arrow::open_dataset(file.path(td, "cyl=6"))
arr$schema$metadata$r$columns <- arr$schema$metadata$r$columns[ names(arr) ]
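(One way to package the pruning above; `prune_r_metadata` is a hypothetical helper name, and the body simply applies the same subsetting:)

```r
# Hypothetical convenience wrapper for the pruning above: drop any entry in
# the R column metadata that has no matching field in the dataset itself.
prune_r_metadata <- function(ds) {
  cols <- ds$schema$metadata$r$columns
  ds$schema$metadata$r$columns <- cols[intersect(names(cols), names(ds))]
  ds
}

arr <- prune_r_metadata(arrow::open_dataset(file.path(td, "cyl=6")))
```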

amoeba commented 3 weeks ago

This reminds me of https://github.com/apache/arrow/issues/38724. Doing something different than what's currently done will require some work and may impact Arrow C++ and PyArrow.

At a minimum, I bet we could improve this error message,

# Error in compute.arrow_dplyr_query(x) : ℹ In index: 2.
# Caused by error in `x$Equals()`:
# ! attempt to apply non-function

r2evans commented 3 weeks ago

Yes, my issue here appears to be a dupe of #38724 and #41146. I'm glad to discover I'm not the only (or first) one to have this question, though I'm dismayed to see no obvious progress since the first opened 11 months ago.

For the record, I am using the following to get what I "need":

arrow::register_scalar_function(
  "extract_keyfield_from_path",
  function(context, filenames, key) {
    uniqfn <- unique(filenames)
    val <- setNames(sub(paste0(".*/", key, "=([^/]*)/.*"), "\\1", uniqfn, ignore.case = TRUE), uniqfn)
    val[val == uniqfn] <- NA
    unname(val[filenames])
  },
  in_type = arrow::schema(filenames = arrow::string(), key = arrow::string()),
  out_type = arrow::string(),
  auto_convert = TRUE
)

arr <- arrow::open_dataset(file.path(td, "cyl=4"))
need_keys <- setdiff(names(arr$schema$metadata$r$columns), names(arr)) # can be 0+
arr$schema$metadata$r$columns <- arr$schema$metadata$r$columns[ names(arr) ]
arr <- mutate(arr, fn = add_filename())
arr <- Reduce(function(ar, ky) mutate(ar, !!as.symbol(ky) := extract_keyfield_from_path(fn, ky)), need_keys, init = arr)
arr |>
  select(-fn) |>
  head(n=2) |>
  collect()
#      mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb    cyl
#    <num> <num> <num> <num> <num> <num> <num> <num> <num> <num> <char>
# 1:  22.8 108.0    93  3.85  2.32 18.61     1     1     4     1      4
# 2:  24.4 146.7    62  3.69  3.19 20.00     1     0     4     2      4

While far from herculean, the regex is overkill and fragile (it assumes hive-style paths, for instance).
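(A possibly less fragile alternative, still hive-style and purely a hypothetical sketch: split the path on `/` and look for a `key=` segment instead of matching the whole path with one regex:)

```r
# Hypothetical base-R helper: walk the path segments looking for "key=value"
# instead of one whole-path regex. Still assumes hive-style segments, but is
# indifferent to where in the path the key appears.
extract_key <- function(path, key) {
  segs <- strsplit(path, "/", fixed = TRUE)[[1]]
  hit  <- grep(paste0("^", key, "="), segs, value = TRUE)
  if (length(hit) == 0L) NA_character_ else sub("^[^=]*=", "", hit[[1]])
}
extract_key("/tmp/x.d/cyl=6/part-0.parquet", "cyl")  # "6"
extract_key("/tmp/x.d/part-0.parquet", "cyl")        # NA
```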

Since the hive partitioning is known at write time, and the key columns are already stored in the schema metadata, would it make sense to store the actual key value in the schema when it is not in the data? Currently $r is the only entry in arr$schema$metadata; what if instead the metadata looked like this:

str(arr$schema$metadata)
# List of 2
#  $ partitions:List of 1
#   ..$ columns:List of 1
#   .. ..$ cyl: chr "4"
#  $ r         :List of 2
#   ..$ attributes:List of 1
#   .. ..$ class: chr [1:2] "data.table" "data.frame"
#   ..$ columns   :List of 11
#   .. ..$ mpg : NULL
#   .. ..$ cyl : NULL
#   .. ..$ disp: NULL
#   .. ..$ hp  : NULL
# ...

or in the case of non-hive partitioning (which I don't use, so I'm speculating here):

str(arr$schema$metadata)
# List of 2
#  $ partitions:List of 1
#   ..$ columns:List of 1
#   .. ..$ : chr "4"
#  $ r         :List of 2
#   ..$ attributes:List of 1
#   .. ..$ class: chr [1:2] "data.table" "data.frame"
#   ..$ columns   :List of 11
#   .. ..$ mpg : NULL
#   .. ..$ cyl : NULL
#   .. ..$ disp: NULL
# ...

In this way, I'd think it would be feasible to use this from other languages as well. Since it would be a breaking change, perhaps one could opt in to the feature. While "how" that is done is left up to the architects, I don't think it's hard at all. And it knocks out three issues for the price of one ;-)

I know it's not that simple; I don't know whether other languages write attributes the same way as $r$columns does.