hubverse-org / hubData

Tools for accessing and working with hubverse Hub data
https://hubverse-org.github.io/hubData/

Issues with filtering data from UnionDatasets/FileSystemDatasets when actual type_id data types in files are mixed #9

Open annakrystalli opened 1 year ago

annakrystalli commented 1 year ago

I'm having issues with filtering data from arrow Datasets on columns where the data type in the underlying source data files is mixed. In particular, the type_id column contains double data in some files and character data in others.

I've written a reprex to seek outside advice on this. The reprex contains stripped-down source code of the functions used within connect_hub() but accurately describes the overall workflow.

Data used in this example can be reviewed here (in the forecasts/ directory): https://github.com/Infectious-Disease-Modeling-Hubs/hubUtils/tree/schema-from-config/inst/testhubs/flusight

# remotes::install_github("Infectious-Disease-Modeling-Hubs/hubUtils",
#                         dependencies = TRUE, ref = "schema-from-config")

#### --- FUNCTIONS ----
# Open a single FileSystemDataset for a given file_format
open_hub_dataset <- function(model_output_dir,
                             file_format = c("csv", "parquet", "arrow"),
                             config_tasks) {
    file_format <- rlang::arg_match(file_format)
    schema <- create_hub_schema(config_tasks, format = file_format)

    switch(file_format,
           csv = arrow::open_dataset(
               model_output_dir,
               format = "csv",
               partitioning = "model",
               col_types = schema,
               unify_schemas = TRUE,
               strings_can_be_null = TRUE,
               factory_options = list(exclude_invalid_files = TRUE)
           ),
           parquet = arrow::open_dataset(
               model_output_dir,
               format = "parquet",
               partitioning = "model",
               schema = schema,
               unify_schemas = TRUE,
               factory_options = list(exclude_invalid_files = TRUE)
           ),
           arrow = arrow::open_dataset(
               model_output_dir,
               format = "arrow",
               partitioning = "model",
               schema = schema,
               unify_schemas = TRUE,
               factory_options = list(exclude_invalid_files = TRUE)
           )
    )
}

# Open multiple FileSystemDatasets and combine them into a UnionDataset
open_hub_datasets <- function(model_output_dir,
                              file_format = c("csv", "parquet", "arrow"),
                              config_tasks) {
    if (length(file_format) == 1L) {
        open_hub_dataset(
            model_output_dir,
            file_format,
            config_tasks
        )
    } else {
        cons <- purrr::map(
            file_format,
            ~ open_hub_dataset(
                model_output_dir,
                .x,
                config_tasks
            )
        )

        # Remove connections with 0 files in model-output data
        cons[
            purrr::map_lgl(
                cons,
                ~ length(.x$files) == 0L)
        ] <- NULL

        arrow::open_dataset(cons)
    }
}
library(hubUtils)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
hub_path <- system.file("testhubs/flusight", package = "hubUtils")
model_output_dir <- system.file("testhubs/flusight/forecasts", package = "hubUtils")

config_tasks <- hubUtils::read_config(hub_path, "tasks")
file_format <- hubUtils::read_config(hub_path, "admin")$file_format

# When opening individual datasets, a unifying schema is determined from the config
# file `config_tasks`. This is file_format specific.
hubUtils::create_hub_schema(config_tasks)
#> Schema
#> forecast_date: date32[day]
#> target: string
#> horizon: int32
#> location: string
#> type: string
#> type_id: string
#> value: double

hub_con <- open_hub_datasets(model_output_dir,
                  file_format,
                  config_tasks)

# For context, the type_id column contains double values in some files and character
# values in others, hence create_hub_schema() coerces the column to character.
# This works when simply collecting data: the type_id column is indeed returned as character
hub_con
#> UnionDataset
#> forecast_date: date32[day]
#> horizon: int32
#> target: string
#> location: string
#> type: string
#> type_id: string
#> value: double
#> model: string

hub_con %>%
    collect()
#> # A tibble: 292 × 8
#>    forecast_date horizon target               location type  type_id value model
#>    <date>          <int> <chr>                <chr>    <chr> <chr>   <dbl> <chr>
#>  1 2023-04-24          1 wk ahead inc flu ho… US       mean  <NA>     1033 base…
#>  2 2023-04-24          2 wk ahead inc flu ho… US       mean  <NA>     1033 base…
#>  3 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.01        0 base…
#>  4 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.025       0 base…
#>  5 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.05        0 base…
#>  6 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.1       281 base…
#>  7 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.15      600 base…
#>  8 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.2       717 base…
#>  9 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.25      817 base…
#> 10 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.3       877 base…
#> # ℹ 282 more rows

# It also works when filtering on columns which have consistent data types in the
# source data, e.g.
hub_con %>%
    filter(type == "quantile") %>%
    collect()
#> # A tibble: 276 × 8
#>    forecast_date horizon target               location type  type_id value model
#>    <date>          <int> <chr>                <chr>    <chr> <chr>   <dbl> <chr>
#>  1 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.01        0 base…
#>  2 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.025       0 base…
#>  3 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.05        0 base…
#>  4 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.1       193 base…
#>  5 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.15      495 base…
#>  6 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.2       618 base…
#>  7 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.25      717 base…
#>  8 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.3       774 base…
#>  9 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.35      822 base…
#> 10 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.4       857 base…
#> # ℹ 266 more rows

# However problems arise when any type of filtering on the type_id column is attempted:
hub_con %>%
    filter(type == "quantile", type_id == "0.5") %>%
    collect()
#> Error in `compute.arrow_dplyr_query()`:
#> ! NotImplemented: Function 'equal' has no kernel matching input types (string, double)
#> Backtrace:
#>      ▆
#>   1. ├─... %>% collect()
#>   2. ├─dplyr::collect(.)
#>   3. └─arrow:::collect.arrow_dplyr_query(.)
#>   4.   └─arrow:::compute.arrow_dplyr_query(x)
#>   5.     └─base::tryCatch(...)
#>   6.       └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   7.         └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   8.           └─value[[3L]](cond)
#>   9.             └─arrow:::augment_io_error_msg(e, call, schema = schema())
#>  10.               └─rlang::abort(msg, call = call)

# Filtering down to rows for which type_id will definitely be numeric, casting to
# double and then filtering before collecting also doesn't work
hub_con %>%
    filter(type == "quantile") %>%
    mutate(type_id = as.double(type_id)) %>%
    filter(type_id == 0.5) %>%
    collect()
#> Error in `compute.arrow_dplyr_query()`:
#> ! Invalid: Failed to parse string: 'decrease' as a scalar of type double
#> Backtrace:
#>      ▆
#>   1. ├─... %>% collect()
#>   2. ├─dplyr::collect(.)
#>   3. └─arrow:::collect.arrow_dplyr_query(.)
#>   4.   └─arrow:::compute.arrow_dplyr_query(x)
#>   5.     └─base::tryCatch(...)
#>   6.       └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   7.         └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   8.           └─value[[3L]](cond)
#>   9.             └─arrow:::augment_io_error_msg(e, call, schema = schema())
#>  10.               └─rlang::abort(msg, call = call)

# The only way to get it to work is to collect first (including some filtering on
# consistent columns) and then filter/mutate-filter to get the data required.
hub_con %>%
    filter(type == "quantile") %>%
    collect() %>%
    filter(type_id == "0.5")
#> # A tibble: 12 × 8
#>    forecast_date horizon target               location type  type_id value model
#>    <date>          <int> <chr>                <chr>    <chr> <chr>   <dbl> <chr>
#>  1 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.5      926  base…
#>  2 2023-05-01          2 wk ahead inc flu ho… US       quan… 0.5      926  base…
#>  3 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.5     1033  base…
#>  4 2023-04-24          2 wk ahead inc flu ho… US       quan… 0.5     1033  base…
#>  5 2023-04-24          1 wk ahead inc flu ho… US       quan… 0.5      967. ense…
#>  6 2023-04-24          2 wk ahead inc flu ho… US       quan… 0.5      976. ense…
#>  7 2023-05-08          1 wk ahead inc flu ho… US       quan… 0.5      945  base…
#>  8 2023-05-08          2 wk ahead inc flu ho… US       quan… 0.5      945  base…
#>  9 2023-05-08          1 wk ahead inc flu ho… US       quan… 0.5      950. ense…
#> 10 2023-05-08          2 wk ahead inc flu ho… US       quan… 0.5      916. ense…
#> 11 2023-05-01          1 wk ahead inc flu ho… US       quan… 0.5      885. ense…
#> 12 2023-05-01          2 wk ahead inc flu ho… US       quan… 0.5      866. ense…

hub_con %>%
    filter(type == "quantile") %>%
    collect() %>%
    mutate(type_id = as.double(type_id)) %>%
    filter(type_id == 0.5)
#> # A tibble: 12 × 8
#>    forecast_date horizon target               location type  type_id value model
#>    <date>          <int> <chr>                <chr>    <chr>   <dbl> <dbl> <chr>
#>  1 2023-05-01          1 wk ahead inc flu ho… US       quan…     0.5  926  base…
#>  2 2023-05-01          2 wk ahead inc flu ho… US       quan…     0.5  926  base…
#>  3 2023-04-24          1 wk ahead inc flu ho… US       quan…     0.5 1033  base…
#>  4 2023-04-24          2 wk ahead inc flu ho… US       quan…     0.5 1033  base…
#>  5 2023-04-24          1 wk ahead inc flu ho… US       quan…     0.5  967. ense…
#>  6 2023-04-24          2 wk ahead inc flu ho… US       quan…     0.5  976. ense…
#>  7 2023-05-08          1 wk ahead inc flu ho… US       quan…     0.5  945  base…
#>  8 2023-05-08          2 wk ahead inc flu ho… US       quan…     0.5  945  base…
#>  9 2023-05-01          1 wk ahead inc flu ho… US       quan…     0.5  885. ense…
#> 10 2023-05-01          2 wk ahead inc flu ho… US       quan…     0.5  866. ense…
#> 11 2023-05-08          1 wk ahead inc flu ho… US       quan…     0.5  950. ense…
#> 12 2023-05-08          2 wk ahead inc flu ho… US       quan…     0.5  916. ense…

# While this may work ok for such a small toy dataset, this isn't ideal for the
# larger datasets we are designing the software for.

Created on 2023-05-17 with reprex v2.0.2

I'm trying to understand the purpose of providing a schema to the arrow::open_dataset() function.

It appears to work on the way out, i.e. it converts the data to the types specified in the schema when extracting data, but it does not appear to have any effect on the way in, i.e. when dplyr queries are sent back to the dataset.

Is there a way to make queries aware of the transformations specified in the schema?

If not, are there any plans to do so in arrow?

I do appreciate that taking the schema into account when sending queries is a lot more work than just using it when extracting data, but it feels like a limitation that hampers the package's utility and undermines the purpose of the schema.

To get around it in our own use case we would have to enforce certain data types when writing the data, which is something we would ideally like to avoid. Any advice is greatly welcome!
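
As a point of reference, a minimal sketch of what enforcing a consistent data type at write time might look like (the file paths below are hypothetical; parquet is used here because, unlike csv, it stores column types alongside the data):

library(arrow)
library(dplyr)

# Hypothetical example: coerce type_id to character before (re)writing a
# model-output file, so every file in the hub stores the same type for that column.
read_csv_arrow("forecasts/some-model/2023-05-08-some-model.csv") %>%
    mutate(type_id = as.character(type_id)) %>%
    write_parquet("forecasts/some-model/2023-05-08-some-model.parquet")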

P.S. This behaviour is not a result of mixing file formats: it's also a problem when opening, for example, only csv files, as the dataset contains files with different type_id data types.

annakrystalli commented 1 year ago

Feedback from seeking help on the rOpenSci Slack showed that, in simplified cases, casting with a schema and then filtering mixed data based on the schema data type works fine. I've been trying to work my way up the complexity ladder from the simplified example that works to the breaking behaviour described above.

So far no joy: I cannot reproduce the errors encountered in my flusight hub example.

Here's what I've tried so far:

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
tmpdir <- tempdir()
data.frame(a = 1:10) |>
    write_csv_arrow(sink = file.path(tmpdir, "int.csv"))

data.frame(a = letters[1:10]) |>
    write_csv_arrow(sink = file.path(tmpdir, "alpha.csv"))

ds1 <- open_dataset(file.path(tmpdir, "int.csv"),
                    format = "csv", schema = schema(a = string()))
ds2 <- open_dataset(file.path(tmpdir, "alpha.csv"),
                    format = "csv", schema = schema(a = string()))

open_dataset(list(ds1, ds2)) |> filter(a == "1") |>
    collect()
#> # A tibble: 1 × 1
#>   a    
#>   <chr>
#> 1 1

# Add NAs and parquet files
data.frame(a = c(NA, 1:10)) |>
    write_csv_arrow(sink = file.path(tmpdir, "int.csv"))

data.frame(a = c(NA, NA, seq(0, 1, by = 0.1))) |>
    write.csv(file.path(tmpdir, "dbl.csv"), row.names = FALSE)

data.frame(a = c(NA, NA, letters[1:10])) |>
    write_parquet(sink = file.path(tmpdir, "alpha.parquet"))

data.frame(a = c(NA, NA, letters[1:10])) |>
    write.csv(file.path(tmpdir, "alpha.csv"), row.names = FALSE)

ds1 <- open_dataset(file.path(tmpdir,  "int.csv"),
                    format = "csv", schema = schema(a = string()))
ds2 <- open_dataset(file.path(tmpdir, "alpha.parquet"),
                    format = "parquet", schema = schema(a = string()))
ds3 <- open_dataset(file.path(tmpdir, "dbl.csv"),
                    format = "csv", schema = schema(a = string()))
ds4 <- open_dataset(file.path(tmpdir, "alpha.csv"),
                    format = "csv", schema = schema(a = string()))

open_dataset(list(ds1, ds2)) |> filter(a == "1") |>
    collect()
#> # A tibble: 1 × 1
#>   a    
#>   <chr>
#> 1 1

open_dataset(list(ds1, ds2, ds3, ds4)) |> filter(a == "1") |>
    collect()
#> # A tibble: 2 × 1
#>   a    
#>   <chr>
#> 1 1    
#> 2 1

open_dataset(list(ds1, ds2, ds3, ds4)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 1 × 1
#>   a    
#>   <chr>
#> 1 0.5

# Use folder instead of filenames and exclude invalid files
csv <- open_dataset(tmpdir,
                    format = "csv", col_types = schema(a = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = TRUE,
                    factory_options = list(exclude_invalid_files = TRUE))
parquet <- open_dataset(tmpdir,
                    format = "parquet", schema = schema(a = string()),
                    unify_schemas = TRUE,
                    factory_options = list(exclude_invalid_files = TRUE))

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 1 × 1
#>   a    
#>   <chr>
#> 1 0.5

open_dataset(list(csv, parquet)) |> filter(!is.na(a)) |>
    collect()
#> # A tibble: 41 × 1
#>    a    
#>    <chr>
#>  1 a    
#>  2 b    
#>  3 c    
#>  4 d    
#>  5 e    
#>  6 f    
#>  7 g    
#>  8 h    
#>  9 i    
#> 10 j    
#> # ℹ 31 more rows

# Add folder partitioning
library(fs)
tmpdir <- tempdir()
dir_create(tmpdir, "partition")
tmpdir <- path(tmpdir, "partition")
dir_create(tmpdir, "csv")
dir_create(tmpdir, "parquet")

data.frame(a = c(NA, 1:10)) |>
    write_csv_arrow(sink = file.path(tmpdir, "csv", "int.csv"))

data.frame(a = c(NA, NA, seq(0, 1, by = 0.1))) |>
    write.csv(file.path(tmpdir, "csv", "dbl.csv"), row.names = FALSE)

data.frame(a = c(NA, NA, letters[1:10])) |>
    write.csv(file.path(tmpdir, "csv", "alpha.csv"), row.names = FALSE)

data.frame(a = c(NA, NA, letters[1:10])) |>
    write_parquet(sink = file.path(tmpdir, "parquet", "alpha.parquet"))

data.frame(a = c(NA, 1:10)) |>
    write_parquet(sink = file.path(tmpdir, "parquet", "int.parquet"))

data.frame(a = c(NA, NA, seq(0, 1, by = 0.1))) |>
    write_parquet(file.path(tmpdir, "parquet", "dbl.parquet"))

csv <- open_dataset(tmpdir,
                    format = "csv", col_types = schema(a = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = TRUE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")

parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        model = string()),
                        unify_schemas = TRUE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 2
#>   a     model  
#>   <chr> <chr>  
#> 1 0.5   csv    
#> 2 0.5   parquet

open_dataset(list(csv, parquet)) |> filter(model == "csv", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 2
#>    a     model
#>    <chr> <chr>
#>  1 0     csv  
#>  2 0.1   csv  
#>  3 0.2   csv  
#>  4 0.3   csv  
#>  5 0.4   csv  
#>  6 0.5   csv  
#>  7 0.6   csv  
#>  8 0.7   csv  
#>  9 0.8   csv  
#> 10 0.9   csv  
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "parquet", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 2
#>    a                   model  
#>    <chr>               <chr>  
#>  1 0                   parquet
#>  2 0.1                 parquet
#>  3 0.2                 parquet
#>  4 0.30000000000000004 parquet
#>  5 0.4                 parquet
#>  6 0.5                 parquet
#>  7 0.6000000000000001  parquet
#>  8 0.7000000000000001  parquet
#>  9 0.8                 parquet
#> 10 0.9                 parquet
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 2
#>   a     model
#>   <chr> <chr>
#> 1 0.5   csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 2
#>   a     model  
#>   <chr> <chr>  
#> 1 0.5   parquet

# Additional note:
# In contrast to casting via a schema when opening the dataset,
# mutating/casting to a different data type within a query does work
# when done just prior to collecting the data
open_dataset(list(csv, parquet)) |>
    filter(!a %in% letters) |>
    mutate(a = as.double(a)) |>
    collect()
#> # A tibble: 51 × 2
#>        a model  
#>    <dbl> <chr>  
#>  1    NA csv    
#>  2    NA csv    
#>  3    NA parquet
#>  4    NA parquet
#>  5     1 csv    
#>  6     2 csv    
#>  7     3 csv    
#>  8     4 csv    
#>  9     5 csv    
#> 10     6 csv    
#> # ℹ 41 more rows

# but it does not work when trying to filter immediately after the cast,
# even though the first filtering clause ensures that all remaining
# data can be safely converted to double. It seems the second filter is
# being applied to rows that should have already been excluded by the
# first filter.
open_dataset(list(csv, parquet)) |>
    filter(!a %in% letters) |>
    mutate(a = as.double(a)) |>
    filter(a == 0.5) |>
    collect()
#> Error in `compute.arrow_dplyr_query()`:
#> ! Invalid: Failed to parse string: 'j' as a scalar of type double
#> Backtrace:
#>     ▆
#>  1. ├─dplyr::collect(...)
#>  2. └─arrow:::collect.arrow_dplyr_query(...)
#>  3.   └─arrow:::compute.arrow_dplyr_query(x)
#>  4.     └─base::tryCatch(...)
#>  5.       └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  6.         └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  7.           └─value[[3L]](cond)
#>  8.             └─arrow:::augment_io_error_msg(e, call, schema = schema())
#>  9.               └─rlang::abort(msg, call = call)
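
# A possible workaround (a sketch, not run as part of this reprex): collect
# after the mutate and apply the final filter in memory with dplyr, as in the
# flusight example in the first comment.
# open_dataset(list(csv, parquet)) |>
#     filter(!a %in% letters) |>
#     mutate(a = as.double(a)) |>
#     collect() |>
#     filter(a == 0.5)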

# Use unify_schemas = FALSE when opening individual datasets
csv <- open_dataset(tmpdir,
                    format = "csv", col_types = schema(a = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = FALSE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")

parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        model = string()),
                        unify_schemas = FALSE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 2
#>   a     model  
#>   <chr> <chr>  
#> 1 0.5   csv    
#> 2 0.5   parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 2
#>    a     model
#>    <chr> <chr>
#>  1 0     csv  
#>  2 0.1   csv  
#>  3 0.2   csv  
#>  4 0.3   csv  
#>  5 0.4   csv  
#>  6 0.5   csv  
#>  7 0.6   csv  
#>  8 0.7   csv  
#>  9 0.8   csv  
#> 10 0.9   csv  
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "parquet", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 2
#>    a     model  
#>    <chr> <chr>  
#>  1 a     parquet
#>  2 b     parquet
#>  3 c     parquet
#>  4 d     parquet
#>  5 e     parquet
#>  6 f     parquet
#>  7 g     parquet
#>  8 h     parquet
#>  9 i     parquet
#> 10 j     parquet
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 2
#>   a     model
#>   <chr> <chr>
#> 1 0.5   csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 2
#>   a     model  
#>   <chr> <chr>  
#> 1 0.5   parquet

# Use extra column in one parquet file
data.frame(a = c(NA, NA, letters[1:10]), b = "extra-parquet-column") |>
    write_parquet(sink = file.path(tmpdir, "parquet", "alpha.parquet"))

csv <- open_dataset(tmpdir,
                    format = "csv", col_types = schema(a = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = FALSE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")
parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        b = string(),
                                        model = string()),
                        unify_schemas = FALSE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 3
#>   a     model   b    
#>   <chr> <chr>   <chr>
#> 1 0.5   csv     <NA> 
#> 2 0.5   parquet <NA>
open_dataset(list(csv, parquet)) |> filter(model == "csv", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 3
#>    a     model b    
#>    <chr> <chr> <chr>
#>  1 0     csv   <NA> 
#>  2 0.1   csv   <NA> 
#>  3 0.2   csv   <NA> 
#>  4 0.3   csv   <NA> 
#>  5 0.4   csv   <NA> 
#>  6 0.5   csv   <NA> 
#>  7 0.6   csv   <NA> 
#>  8 0.7   csv   <NA> 
#>  9 0.8   csv   <NA> 
#> 10 0.9   csv   <NA> 
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "parquet", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 3
#>    a                   model   b    
#>    <chr>               <chr>   <chr>
#>  1 0                   parquet <NA> 
#>  2 0.1                 parquet <NA> 
#>  3 0.2                 parquet <NA> 
#>  4 0.30000000000000004 parquet <NA> 
#>  5 0.4                 parquet <NA> 
#>  6 0.5                 parquet <NA> 
#>  7 0.6000000000000001  parquet <NA> 
#>  8 0.7000000000000001  parquet <NA> 
#>  9 0.8                 parquet <NA> 
#> 10 0.9                 parquet <NA> 
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     model b    
#>   <chr> <chr> <chr>
#> 1 0.5   csv   <NA>
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     model   b    
#>   <chr> <chr>   <chr>
#> 1 0.5   parquet <NA>

# Use extra column in one csv file
data.frame(a = c(NA, NA, letters[1:10])) |>
    write_parquet(sink = file.path(tmpdir, "parquet", "alpha.parquet"))

data.frame(a = c(NA, NA, letters[1:10]),
           b = "extra-csv-column") |>
    write.csv(file.path(tmpdir, "csv", "alpha.csv"), row.names = FALSE)

csv <- open_dataset(tmpdir,
                    format = "csv",
                    col_types = schema(a = string(),
                                       b = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = FALSE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")

parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        b = string(),
                                        model = string()),
                        unify_schemas = FALSE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  csv    
#> 2 0.5   <NA>  parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 3
#>    a     b     model
#>    <chr> <chr> <chr>
#>  1 0     <NA>  csv  
#>  2 0.1   <NA>  csv  
#>  3 0.2   <NA>  csv  
#>  4 0.3   <NA>  csv  
#>  5 0.4   <NA>  csv  
#>  6 0.5   <NA>  csv  
#>  7 0.6   <NA>  csv  
#>  8 0.7   <NA>  csv  
#>  9 0.8   <NA>  csv  
#> 10 0.9   <NA>  csv  
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "parquet", !is.na(a)) |>
    collect()
#> # A tibble: 31 × 3
#>    a     b     model  
#>    <chr> <chr> <chr>  
#>  1 a     <NA>  parquet
#>  2 b     <NA>  parquet
#>  3 c     <NA>  parquet
#>  4 d     <NA>  parquet
#>  5 e     <NA>  parquet
#>  6 f     <NA>  parquet
#>  7 g     <NA>  parquet
#>  8 h     <NA>  parquet
#>  9 i     <NA>  parquet
#> 10 j     <NA>  parquet
#> # ℹ 21 more rows
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model
#>   <chr> <chr> <chr>
#> 1 0.5   <NA>  csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  parquet

# Use extra column in one parquet and one csv file
data.frame(a = c(NA, NA, letters[1:10]),
           b = "extra-csv-column") |>
    write.csv(file.path(tmpdir, "csv", "alpha.csv"), row.names = FALSE)

data.frame(a = c(NA, NA, letters[1:10]), b = "extra-parquet-column") |>
    write_parquet(sink = file.path(tmpdir, "parquet", "alpha.parquet"))

csv <- open_dataset(tmpdir,
                    format = "csv",
                    col_types = schema(a = string(),
                                       b = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = FALSE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")

parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        b = string(),
                                        model = string()),
                        unify_schemas = FALSE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  csv    
#> 2 0.5   <NA>  parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", is.na(a)) |>
    collect()
#> # A tibble: 4 × 3
#>   a     b                model
#>   <chr> <chr>            <chr>
#> 1 <NA>  extra-csv-column csv  
#> 2 <NA>  extra-csv-column csv  
#> 3 <NA>  <NA>             csv  
#> 4 <NA>  <NA>             csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", is.na(a)) |>
    collect()
#> # A tibble: 5 × 3
#>   a     b                    model  
#>   <chr> <chr>                <chr>  
#> 1 <NA>  extra-parquet-column parquet
#> 2 <NA>  extra-parquet-column parquet
#> 3 <NA>  <NA>                 parquet
#> 4 <NA>  <NA>                 parquet
#> 5 <NA>  <NA>                 parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model
#>   <chr> <chr> <chr>
#> 1 0.5   <NA>  csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  parquet

# Use extra column in one parquet and one csv file + unify_schemas = TRUE
csv <- open_dataset(tmpdir,
                    format = "csv",
                    col_types = schema(a = string(),
                                       b = string()),
                    strings_can_be_null = TRUE,
                    unify_schemas = TRUE,
                    factory_options = list(exclude_invalid_files = TRUE),
                    partitioning = "model")

parquet <- open_dataset(tmpdir,
                        format = "parquet",
                        schema = schema(a = string(),
                                        b = string(),
                                        model = string()),
                        unify_schemas = TRUE,
                        factory_options = list(exclude_invalid_files = TRUE),
                        partitioning = "model")

open_dataset(list(csv, parquet)) |> filter(a == "0.5") |>
    collect()
#> # A tibble: 2 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  csv    
#> 2 0.5   <NA>  parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", is.na(a)) |>
    collect()
#> # A tibble: 4 × 3
#>   a     b                model
#>   <chr> <chr>            <chr>
#> 1 <NA>  extra-csv-column csv  
#> 2 <NA>  extra-csv-column csv  
#> 3 <NA>  <NA>             csv  
#> 4 <NA>  <NA>             csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", is.na(a)) |>
    collect()
#> # A tibble: 5 × 3
#>   a     b                    model  
#>   <chr> <chr>                <chr>  
#> 1 <NA>  extra-parquet-column parquet
#> 2 <NA>  extra-parquet-column parquet
#> 3 <NA>  <NA>                 parquet
#> 4 <NA>  <NA>                 parquet
#> 5 <NA>  <NA>                 parquet
open_dataset(list(csv, parquet)) |> filter(model == "csv", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model
#>   <chr> <chr> <chr>
#> 1 0.5   <NA>  csv
open_dataset(list(csv, parquet)) |> filter(model == "parquet", a == "0.5") |>
    collect()
#> # A tibble: 1 × 3
#>   a     b     model  
#>   <chr> <chr> <chr>  
#> 1 0.5   <NA>  parquet

Created on 2023-06-06 with reprex v2.0.2