hubverse-org / hubverse-transform

Data transform functions for hubverse model-output files

hubverse-transform: supply a parquet schema #14

Open bsweger opened 1 month ago

bsweger commented 1 month ago

Currently, hubverse-transform infers the parquet schema to apply when converting incoming model-output data to parquet. Because each file arrives and is transformed as a single unit, pyarrow has a limited amount of information to use when determining the parquet schema.

This resulted in some annoying behavior when working with the CDC's FluSight data. For example:

# when using pyarrow to read data
ArrowInvalid: Failed to parse string: 'large_increase' as a scalar of type double
# when using pandas.read_parquet method
ArrowInvalid: Failed to parse string: 'US' as a scalar of type int64

The current state is kind of workable but not ideal.
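For illustration, here is a minimal, standalone pyarrow sketch (toy data, not the actual FluSight files) of why per-file inference diverges: the same column is inferred as string in a file that contains "US" but as int64 in a file that doesn't. Supplying an explicit schema at read time would avoid the split.

import io
import pyarrow.csv as pv

# two toy model-output files: one contains the "US" location, one does not
with_us = b"location,value\nUS,10.1\n01,12.5\n"
without_us = b"location,value\n01,12.5\n02,15.0\n"

# pyarrow infers types per file, so the same column ends up with two different types
print(pv.read_csv(io.BytesIO(with_us)).schema.field("location").type)     # string
print(pv.read_csv(io.BytesIO(without_us)).schema.field("location").type)  # int64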

bsweger commented 1 month ago

I thought hubData was handling the differing schemas OK, but @elray1 reported this error message, which I suspect is related:

> flusion_table <- hub_con %>%
+     filter(output_type == "quantile", location == "US") %>%
+     hubData::collect_hub()
Error in `dplyr::collect()`:
! NotImplemented: Function 'equal' has no kernel matching input types (string, int64)
Run `rlang::last_trace()` to see where the error occurred.

> rlang::last_trace()
<error/rlang_error>
Error in `dplyr::collect()`:
! NotImplemented: Function 'equal' has no kernel matching input types (string, int64)
---
Backtrace:
    β–†
 1. β”œβ”€... %>% hubData::collect_hub()
 2. └─hubData::collect_hub(.)
 3.   └─base::tryCatch(...)
 4.     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
 5.       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
 6.         └─value[[3L]](cond)
Run rlang::last_trace(drop = FALSE) to see 2 hidden frames.
bsweger commented 1 month ago

@annakrystalli I know you've wrestled with the mis-matched schema implications of location and output_type_id.

Curious to hear your thoughts: do you think it's reasonable to force those two column types to str when we're transforming the model-output files as part of syncing to the cloud?

Location seems like a good candidate for that type of brute-force operation--it doesn't really make sense to have FIPS codes coming in as integers. I'm less sure about output_type_id (FWIW, when creating the DuckDB file, I used the union_by_name option to merge schemas, which resulted in a VARCHAR data type for output_type_id).
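For what it's worth, a rough sketch of what that brute-force cast could look like on the pyarrow side (force_string_columns is a hypothetical helper, not part of hubverse-transform):

import pyarrow as pa

def force_string_columns(table: pa.Table, columns=("location", "output_type_id")) -> pa.Table:
    """Hypothetical helper: cast the named columns to string, whatever type was inferred."""
    for name in columns:
        if name in table.column_names:
            idx = table.schema.get_field_index(name)
            casted = table.column(name).cast(pa.string())
            table = table.set_column(idx, pa.field(name, pa.string()), casted)
    return table

# example: a toy table where both columns were inferred as numeric
table = pa.table({"location": [1, 2], "output_type_id": [0.5, 0.9], "value": [10.0, 12.5]})
print(force_string_columns(table).schema)  # location: string, output_type_id: string, value: double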

elray1 commented 1 month ago

Can we generate a schema based on the contents of tasks.json? I.e., if the data type of the entries in tasks.json specifying options for location is character, we could infer that the schema should specify a character type, but if they contain integer values we could infer that the schema should specify an integer type.

Edit: Lucie pointed me to this function.
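For illustration only, here is a rough Python sketch of that inference idea. It assumes the usual tasks.json layout (rounds -> model_tasks -> task_ids, each with required/optional lists) and is not the hubData::create_hub_schema() logic:

import json
import pyarrow as pa

def infer_task_id_type(tasks_json_path: str, task_id: str) -> pa.DataType:
    """Rough sketch: pick an arrow type from the values listed for a task ID in tasks.json."""
    with open(tasks_json_path) as f:
        config = json.load(f)
    values = []
    for rnd in config["rounds"]:
        for task in rnd["model_tasks"]:
            spec = task["task_ids"].get(task_id, {})
            values += (spec.get("required") or []) + (spec.get("optional") or [])
    if any(isinstance(v, str) for v in values):
        return pa.string()   # e.g. location codes such as "US" or "01"
    if all(isinstance(v, int) for v in values):
        return pa.int64()    # e.g. horizon values
    return pa.float64()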

annakrystalli commented 1 month ago

hubData::connect_hub() does use tasks.json to create a schema. There are a few things that may be going on, but I'll need a more reproducible example to try and figure it out, i.e. the code used to create the hub_con object in @elray1's example.

The location column comes out as character when you collect, so it seems a bit strange that the filter wasn't working.

LucieContamin commented 1 month ago

I am curious about the solution; I encountered the same issue in the past and the only solution I found was to fix the input file. I created a small example using one of our example hubs for the hub_con object.

I first created two parquet files in a model-output folder that I created, one of them with a numeric location column:

hub_path <- "./example-complex-forecast-hub/"
hub_con <- hubData::connect_hub(hub_path)
test <- hubData::create_model_out_submit_tmpl(hub_con, round_id = "2023-05-27")
a <- dplyr::filter(test, output_type == "quantile", location != "US") %>% 
  dplyr::mutate(value = sample(seq(0,100, by = 0.1), nrow(.), replace = T),
                location = as.numeric(location))
b <- test %>% 
  dplyr::mutate(value = sample(seq(0,100, by = 0.1), nrow(.), replace = T))
arrow::write_parquet(a, "model-output/teama-model/2023-10-14-teama-model.parquet")
arrow::write_parquet(b, "model-output/teamb-model/2023-10-14-teamb-model.parquet")

And when I try to read the files with one of the connect functions:

hubData::connect_model_output("model-output/", file_format = "parquet") %>%
  dplyr::filter(output_type == "quantile", location == "US") %>%
  hubData::collect_hub()

I got the error:

Error in `arrow::open_dataset()`:
  ! Type error: Unable to merge: Field location has incompatible types: double vs string

I get the same error even if I remove the filter:

hubData::connect_model_output("model-output/", file_format = "parquet") %>%
   #dplyr::filter(output_type == "quantile", location != "US") %>%
   hubData::collect_hub()
Error in `arrow::open_dataset()`:
! Type error: Unable to merge: Field location has incompatible types: double vs string
Run `rlang::last_trace()` to see where the error occurred.

And when I add the schema, I get an error too (a different one, but still an error):

config_json <- hubUtils::read_config(hub_path, "tasks")
or_schema <- hubData::create_hub_schema(config_json)
hubData::connect_model_output("model-output/", file_format = "parquet", 
                              schema = or_schema) %>%
  dplyr::filter(output_type == "quantile", location == "US") %>%
  hubData::collect_hub()

Error in `dplyr::collect()`:
  ! NotImplemented: Function 'equal' has no kernel matching input types (string, double)

However, if I comment out the filter, it works:

> hubData::connect_model_output("model-output/", file_format = "parquet", 
                               schema = or_schema) %>%
  # dplyr::filter(output_type == "quantile", location == "US") %>%
   hubData::collect_hub()
# A tibble: 1,028,224 Γ— 9
   model_id  reference_date target horizon location target_end_date output_type output_type_id value
 * <chr>     <date>         <chr>    <int> <chr>    <date>          <chr>       <chr>          <dbl>
 1 teama-mo… 2023-05-27     wk in…       0 1        2022-10-22      quantile    0.01            88.6
 2 teama-mo… 2023-05-27     wk in…       1 1        2022-10-22      quantile    0.01            74.6
 3 teama-mo… 2023-05-27     wk in…       2 1        2022-10-22      quantile    0.01            44.9
 4 teama-mo… 2023-05-27     wk in…       3 1        2022-10-22      quantile    0.01            17.6
 5 teama-mo… 2023-05-27     wk in…       0 2        2022-10-22      quantile    0.01            57.1
 6 teama-mo… 2023-05-27     wk in…       1 2        2022-10-22      quantile    0.01            69.8
 7 teama-mo… 2023-05-27     wk in…       2 2        2022-10-22      quantile    0.01            69.4
 8 teama-mo… 2023-05-27     wk in…       3 2        2022-10-22      quantile    0.01            96.6
 9 teama-mo… 2023-05-27     wk in…       0 4        2022-10-22      quantile    0.01            17.4
10 teama-mo… 2023-05-27     wk in…       1 4        2022-10-22      quantile    0.01            63.8
# β„Ή 1,028,214 more rows
# β„Ή Use `print(n = ...)` to see more rows
lmullany commented 1 month ago

When you add the schema option to connect_model_output(), you are overriding the unify_schemas = TRUE that is currently being passed to arrow::open_dataset(). This is why the error message is different.

In the first case (when you are not overriding unify_schemas = TRUE), the schema unification itself fails (this failure happens even before a call to collect()), because it can't unify the different types (double and string) of the location field in the two separate parquet files.

In the second case, when you override unify_schemas = TRUE by passing the or_schema object, you are indicating what the schema should be. However, when you add a dplyr::filter(), the object that gets passed to collect() is of class arrow_dplyr_query. Such an object first gets computed in arrow:::compute.arrow_dplyr_query(), which fails because it can't compute the query on the teama dataset.

In the absence of the dplyr::filter(), the object getting passed to collect() is of class Dataset, which goes through the Scan -> Build -> Finish pipeline without any Filter(Expression) step.

You can even force the error that you see in the second scenario (with or_schema and with dplyr::filter() included) via arrow expressions, something like this:


# Create object of class `Dataset`
x = connect_model_output("model-output/", file_format = "parquet", schema = or_schema)

# No error
ScannerBuilder$create(x)$Finish()$ToTable()

# Error if you add Filter Expression
ScannerBuilder$create(x)$Filter(Expression$field_ref("location") == "US")$Finish()$ToTable()
lmullany commented 1 month ago

Just to follow on from above, you can cast to character, i.e. filter(x, as.character(location) == "US"); you can see that this type of dplyr::filter() call gets translated as:

ExecPlan with 4 nodes:
3:SinkNode{}
  2:ProjectNode{projection=[reference_date, target, horizon, location, target_end_date, output_type, output_type_id, value, model_id]}
    1:FilterNode{filter=(cast(location, {to_type=string, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}) == "US")}
      0:SourceNode{}

Or, equivalently

ScannerBuilder$create(x)$Filter(arrow:::cast(Expression$field_ref("location"), string()) == "US")$Finish()$ToTable()
annakrystalli commented 1 month ago
library(dplyr)

There's a lot going on here. I've done some experiments, which can be found in this fork and branch of example-complex-scenario-hub. I made a few changes to ensure files conformed to hub standards and could be validated.

All the code can be found in report.Rmd.

Differences in reading csvs between arrow and readr

The first issue appears to be a result of how arrow reads csvs, especially compared to readr.

Below I create two csvs, both with US removed from the location column. I then write them out using arrow and readr.

dir.create("test", showWarnings = FALSE)

arrow::read_csv_arrow(
  "model-output/HUBuni-simexamp/2021-03-07-HUBuni-simexamp.csv"
) %>%
  filter(location != "US") %>%
  slice(1:1000) %>%
  arrow::write_csv_arrow(file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv"))

readr::read_csv("model-output/HUBuni-simexamp/2021-03-07-HUBuni-simexamp.csv") %>%
  filter(location != "US") %>%
  slice(1:1000) %>%
  readr::write_csv(file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv"))
#> Rows: 838656 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#> 
#> β„Ή Use `spec()` to retrieve the full column specification for this data.
#> β„Ή Specify the column types or set `show_col_types = FALSE` to quiet this message.

CSVs can often be difficult to work with because they don't explicitly encapsulate a schema like e.g. parquet files do.

Looking at the structure of the two files created:

readr csv
origin_date,scenario_id,location,target,horizon,output_type,output_type_id,value
2021-03-07,A-2021-03-05,02,inc death,1,quantile,0.01,0
2021-03-07,A-2021-03-05,02,inc death,1,quantile,0.025,0
2021-03-07,A-2021-03-05,02,inc death,1,quantile,0.05,1
2021-03-07,A-2021-03-05,02,inc death,1,quantile,0.1,1
2021-03-07,A-2021-03-05,02,inc death,1,quantile,0.15,2
arrow csv
"origin_date","scenario_id","location","target","horizon","output_type","output_type_id","value"
2021-03-07,"A-2021-03-05","02","inc death",1,"quantile",0.01,0
2021-03-07,"A-2021-03-05","02","inc death",1,"quantile",0.025,0
2021-03-07,"A-2021-03-05","02","inc death",1,"quantile",0.05,1
2021-03-07,"A-2021-03-05","02","inc death",1,"quantile",0.1,1
2021-03-07,"A-2021-03-05","02","inc death",1,"quantile",0.15,2

You'd think arrow had done a better job of encoding the character nature of the location column.

However, arrow does NOT correctly read in location, regardless of which function was used to write the file:

arrow::read_csv_arrow(
  file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv")
)
#> # A tibble: 1,000 Γ— 8
#>    origin_date scenario_id  location target   horizon output_type output_type_id
#>    <date>      <chr>           <int> <chr>      <int> <chr>                <dbl>
#>  1 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.01 
#>  2 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.025
#>  3 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.05 
#>  4 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.1  
#>  5 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.15 
#>  6 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.2  
#>  7 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.25 
#>  8 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.3  
#>  9 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.35 
#> 10 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.4  
#> # β„Ή 990 more rows
#> # β„Ή 1 more variable: value <dbl>
arrow::read_csv_arrow(
  file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv")
)
#> # A tibble: 1,000 Γ— 8
#>    origin_date scenario_id  location target   horizon output_type output_type_id
#>    <date>      <chr>           <int> <chr>      <int> <chr>                <dbl>
#>  1 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.01 
#>  2 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.025
#>  3 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.05 
#>  4 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.1  
#>  5 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.15 
#>  6 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.2  
#>  7 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.25 
#>  8 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.3  
#>  9 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.35 
#> 10 2021-03-07  A-2021-03-05        2 inc dea…       1 quantile             0.4  
#> # β„Ή 990 more rows
#> # β„Ή 1 more variable: value <dbl>

Meanwhile, readr correctly reads in location regardless of which function was used to write the file:

readr::read_csv(file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv"))
#> Rows: 1000 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#> 
#> β„Ή Use `spec()` to retrieve the full column specification for this data.
#> β„Ή Specify the column types or set `show_col_types = FALSE` to quiet this message.
#> # A tibble: 1,000 Γ— 8
#>    origin_date scenario_id  location target   horizon output_type output_type_id
#>    <date>      <chr>        <chr>    <chr>      <dbl> <chr>                <dbl>
#>  1 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.01 
#>  2 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.025
#>  3 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.05 
#>  4 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.1  
#>  5 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.15 
#>  6 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.2  
#>  7 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.25 
#>  8 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.3  
#>  9 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.35 
#> 10 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.4  
#> # β„Ή 990 more rows
#> # β„Ή 1 more variable: value <dbl>
readr::read_csv(file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv"))
#> Rows: 1000 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#> 
#> β„Ή Use `spec()` to retrieve the full column specification for this data.
#> β„Ή Specify the column types or set `show_col_types = FALSE` to quiet this message.
#> # A tibble: 1,000 Γ— 8
#>    origin_date scenario_id  location target   horizon output_type output_type_id
#>    <date>      <chr>        <chr>    <chr>      <dbl> <chr>                <dbl>
#>  1 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.01 
#>  2 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.025
#>  3 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.05 
#>  4 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.1  
#>  5 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.15 
#>  6 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.2  
#>  7 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.25 
#>  8 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.3  
#>  9 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.35 
#> 10 2021-03-07  A-2021-03-05 02       inc dea…       1 quantile             0.4  
#> # β„Ή 990 more rows
#> # β„Ή 1 more variable: value <dbl>

Why is this relevant?

I suspect that what is going on with the hub transformations and the errors @bsweger is experiencing is that arrow might be converting location to integers on read (in at least one file where there is no "US" value?) and then writing it out as integers as well.

I have worried from the beginning about issues with the transformations if they are not schema-aware, and I think this may be an example of that. I think the best way to handle this is to make sure that the schema is preserved when writing out the parquet files, and to test for it as well.
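As a toy illustration of that approach on the pyarrow side (assuming the expected column types are known up front), the transform could cast to the expected schema before writing and then assert that the schema survives the round trip:

import pyarrow as pa
import pyarrow.parquet as pq

# the types we expect for a few hub columns (toy subset for illustration)
expected = pa.schema([
    ("location", pa.string()),
    ("output_type_id", pa.string()),
    ("value", pa.float64()),
])

# a toy table whose location/output_type_id would otherwise be written as numeric
table = pa.table({"location": [1, 2], "output_type_id": [0.5, 0.9], "value": [10.0, 12.5]})

pq.write_table(table.cast(expected), "toy-model-output.parquet")

# read back and check the schema was preserved
assert pq.read_table("toy-model-output.parquet").schema.equals(expected)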

Differences in how schemas are applied to datasets between csv and parquet

Having said all that, there seem to be additional issues with arrow datasets. All of this reminds me of edge cases I've come across before and even reached out to the arrow team about, but we were never able to reproduce the issue when building up from smaller examples… I think we may have just figured it out!

The problem seems to be with parquet files: applying the schema does not appear to work in the same way as it does for csv files.

Create a csv file with integer location column

readr::read_csv("model-output/hubcomp_examp/2021-03-07.csv") %>%
  filter(location != "US") %>%
  mutate(location = as.integer(location)) %>%
  readr::write_csv(
    file.path(
      "model-output/hubcomp-examp",
      "2021-03-07-hubcomp-examp.csv"
    )
  )
#> Rows: 838656 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#> 
#> β„Ή Use `spec()` to retrieve the full column specification for this data.
#> β„Ή Specify the column types or set `show_col_types = FALSE` to quiet this message.

Create a parquet file with integer location column

readr::read_csv("model-output/hubcomp_examp/2021-03-07.csv") %>%
  filter(location != "US") %>%
  mutate(location = as.integer(location)) %>%
  arrow::write_parquet(
    file.path(
      "model-output/hubcomp-examp",
      "2021-03-07-hubcomp-examp.parquet"
    )
  )
#> Rows: 838656 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#> 
#> β„Ή Use `spec()` to retrieve the full column specification for this data.
#> β„Ή Specify the column types or set `show_col_types = FALSE` to quiet this message.

Connect to whole hub

Dataset opens but we get the familiar error when filtering

hubData::connect_hub(".")
#> 
#> ── <hub_connection/UnionDataset> ──
#> 
#> β€’ hub_name: "Complex Scenario Hub"
#> β€’ hub_path: '.'
#> β€’ file_format: "csv(3/3)" and "parquet(5/5)"
#> β€’ file_system: "LocalFileSystem"
#> β€’ model_output_dir: "./model-output"
#> β€’ config_admin: 'hub-config/admin.json'
#> β€’ config_tasks: 'hub-config/tasks.json'
#> 
#> ── Connection schema
#> hub_connection
#> origin_date: date32[day]
#> scenario_id: string
#> location: string
#> target: string
#> horizon: int32
#> output_type: string
#> output_type_id: double
#> value: double
#> model_id: string
#> age_group: string
#> target_date: date32[day]

hubData::connect_hub(".") %>%
  filter(location == "US") %>%
  hubData::collect_hub()
#> Error in `dplyr::collect()`:
#> ! NotImplemented: Function 'equal' has no kernel matching input types
#>   (string, int32)

Connect to parquet only

Same error as above

hubData::connect_hub(".", file_format = "parquet")
#> 
#> ── <hub_connection/FileSystemDataset> ──
#> 
#> β€’ hub_name: "Complex Scenario Hub"
#> β€’ hub_path: '.'
#> β€’ file_format: "parquet(5/5)"
#> β€’ file_system: "LocalFileSystem"
#> β€’ model_output_dir: "./model-output"
#> β€’ config_admin: 'hub-config/admin.json'
#> β€’ config_tasks: 'hub-config/tasks.json'
#> 
#> ── Connection schema
#> hub_connection with 5 Parquet files
#> origin_date: date32[day]
#> scenario_id: string
#> location: string
#> target: string
#> horizon: int32
#> age_group: string
#> target_date: date32[day]
#> output_type: string
#> output_type_id: double
#> value: double
#> model_id: string

hubData::connect_hub(".", file_format = "parquet") %>%
  filter(location == "US") %>%
  hubData::collect_hub()
#> Error in `dplyr::collect()`:
#> ! NotImplemented: Function 'equal' has no kernel matching input types
#>   (string, int32)

Connect to csv only

Works!

hubData::connect_hub(".", file_format = "csv")
#> 
#> ── <hub_connection/FileSystemDataset> ──
#> 
#> β€’ hub_name: "Complex Scenario Hub"
#> β€’ hub_path: '.'
#> β€’ file_format: "csv(3/3)"
#> β€’ file_system: "LocalFileSystem"
#> β€’ model_output_dir: "./model-output"
#> β€’ config_admin: 'hub-config/admin.json'
#> β€’ config_tasks: 'hub-config/tasks.json'
#> 
#> ── Connection schema
#> hub_connection with 3 csv files
#> origin_date: date32[day]
#> scenario_id: string
#> location: string
#> target: string
#> horizon: int32
#> output_type: string
#> output_type_id: double
#> value: double
#> model_id: string

hubData::connect_hub(".", file_format = "csv") %>%
  filter(location == "US") %>%
  hubData::collect_hub()
#> # A tibble: 29,952 Γ— 9
#>    model_id      origin_date scenario_id  location target    horizon output_type
#>  * <chr>         <date>      <chr>        <chr>    <chr>       <int> <chr>      
#>  1 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  2 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  3 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  4 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  5 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  6 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  7 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  8 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#>  9 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#> 10 hubcomp_examp 2021-03-07  A-2021-03-05 US       inc death       1 quantile   
#> # β„Ή 29,942 more rows
#> # β„Ή 2 more variables: output_type_id <dbl>, value <dbl>

I suspect this is most likely a result of the fact that the schema is supplied differently when opening csv vs parquet datasets (including under the hood in connect_hub()).

config_tasks <- hubUtils::read_config(".", "tasks")
schema <- hubData::create_hub_schema(
  config_tasks,
  partitions = list(model_id = arrow::utf8())
)

For csv datasets we use the col_types argument to specify the schema:

arrow::open_dataset(
  "model-output",
  format = "csv",
  partitioning = "model_id",
  col_types = schema,
  unify_schemas = TRUE,
  strings_can_be_null = TRUE,
  factory_options = list(exclude_invalid_files = TRUE)
) %>%
  filter(location == "US") %>%
  hubData::collect_hub()
#> # A tibble: 29,952 Γ— 9
#>    model_id        origin_date scenario_id  location target  horizon output_type
#>  * <chr>           <date>      <chr>        <chr>    <chr>     <int> <chr>      
#>  1 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  2 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  3 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  4 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  5 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  6 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  7 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  8 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#>  9 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#> 10 HUBuni-simexamp 2021-03-07  A-2021-03-05 US       inc de…       1 quantile   
#> # β„Ή 29,942 more rows
#> # β„Ή 2 more variables: output_type_id <dbl>, value <dbl>

while for parquet datasets we use the schema argument to specify the schema:

arrow::open_dataset(
  "model-output",
  format = "parquet",
  partitioning = "model_id",
  schema = schema,
  unify_schemas = TRUE,
  factory_options = list(exclude_invalid_files = TRUE)
) %>%
  filter(location == "US") %>%
  hubData::collect_hub()
#> Error in `dplyr::collect()`:
#> ! NotImplemented: Function 'equal' has no kernel matching input types
#>   (string, int32)

This does rather raise the question of what the purpose of schemas for parquet files is if they are not applied in the same way as for csv files. I think this is a question for the arrow team.

The importance of validation

Note also that none of these files would pass validation. Given the brittleness of arrow, validation of data on the way in is very important. It seems like a lot of these issues are coming from hubs that have not been fully validated or have been transformed in a non-schema-aware way. There is a limit to what can be done to fix issues with the data at access time. This seems especially important for parquet files, which effectively include a schema as part of the file metadata.

hubValidations::validate_submission(
  hub_path = ".",
  file_path = file.path(
    "hubcomp-examp",
    "2021-03-07-hubcomp-examp.parquet"
  ),
  skip_submit_window_check = TRUE
)
#> βœ” example-complex-scenario-hub: All hub config files are valid.
#> βœ” 2021-03-07-hubcomp-examp.parquet: File exists at path
#>   'model-output/hubcomp-examp/2021-03-07-hubcomp-examp.parquet'.
#> βœ” 2021-03-07-hubcomp-examp.parquet: File name
#>   "2021-03-07-hubcomp-examp.parquet" is valid.
#> βœ” 2021-03-07-hubcomp-examp.parquet: File directory name matches `model_id`
#>   metadata in file name.
#> βœ” 2021-03-07-hubcomp-examp.parquet: `round_id` is valid.
#> βœ” 2021-03-07-hubcomp-examp.parquet: File is accepted hub format.
#> βœ” 2021-03-07-hubcomp-examp.parquet: Metadata file exists at path
#>   'model-metadata/hubcomp-examp.yaml'.
#> βœ” 2021-03-07-hubcomp-examp.parquet: File could be read successfully.
#> βœ” 2021-03-07-hubcomp-examp.parquet: `round_id_col` name is valid.
#> βœ” 2021-03-07-hubcomp-examp.parquet: `round_id` column "origin_date" contains a
#>   single, unique round ID value.
#> βœ” 2021-03-07-hubcomp-examp.parquet: All `round_id_col` "origin_date" values
#>   match submission `round_id` from file name.
#> βœ” 2021-03-07-hubcomp-examp.parquet: Column names are consistent with expected
#>   round task IDs and std column names.
#> ! 2021-03-07-hubcomp-examp.parquet: Column data types do not match hub schema.
#>   `location ` should be "character " not "integer ", `horizon ` should be
#>   "integer " not "double "
#> βœ– 2021-03-07-hubcomp-examp.parquet: `tbl` contains invalid values/value
#>   combinations.  Column `location` contains invalid values "2", "1", "5", "4",
#>   "6", "8", and "9".

hubValidations::validate_submission(
  hub_path = ".",
  file_path = file.path(
    "hubcomp-examp",
    "2021-03-07-hubcomp-examp.csv"
  ),
  skip_submit_window_check = TRUE
)
#> βœ” example-complex-scenario-hub: All hub config files are valid.
#> βœ” 2021-03-07-hubcomp-examp.csv: File exists at path
#>   'model-output/hubcomp-examp/2021-03-07-hubcomp-examp.csv'.
#> βœ” 2021-03-07-hubcomp-examp.csv: File name "2021-03-07-hubcomp-examp.csv" is
#>   valid.
#> βœ” 2021-03-07-hubcomp-examp.csv: File directory name matches `model_id` metadata
#>   in file name.
#> βœ” 2021-03-07-hubcomp-examp.csv: `round_id` is valid.
#> βœ” 2021-03-07-hubcomp-examp.csv: File is accepted hub format.
#> βœ” 2021-03-07-hubcomp-examp.csv: Metadata file exists at path
#>   'model-metadata/hubcomp-examp.yaml'.
#> βœ” 2021-03-07-hubcomp-examp.csv: File could be read successfully.
#> βœ” 2021-03-07-hubcomp-examp.csv: `round_id_col` name is valid.
#> βœ” 2021-03-07-hubcomp-examp.csv: `round_id` column "origin_date" contains a
#>   single, unique round ID value.
#> βœ” 2021-03-07-hubcomp-examp.csv: All `round_id_col` "origin_date" values match
#>   submission `round_id` from file name.
#> βœ” 2021-03-07-hubcomp-examp.csv: Column names are consistent with expected round
#>   task IDs and std column names.
#> βœ” 2021-03-07-hubcomp-examp.csv: Column data types match hub schema.
#> βœ– 2021-03-07-hubcomp-examp.csv: `tbl` contains invalid values/value
#>   combinations.  Column `location` contains invalid values "2", "1", "5", "4",
#>   "6", "8", and "9".

Summary

LucieContamin commented 1 month ago

Thank you all for the detailed answer!

I agree with Anna that the "best way to handle this is to make sure that the schema is preserved when writing out the parquet files and test for it as well." That's what I ended up doing too.

I am also curious about the "[...] purpose of schemas for parquet files if they are not applied in the same way as for csv files. I think this is a question for the arrow team."

bsweger commented 1 month ago

Thanks for all of the great digging and background info--this is great!

It's worth noting that this class of error is occurring on validated files. So I agree that we need to get the schemas right to the extent possible, since different downstream tools infer mixed schemas with varying results.

In the meantime, I updated the R snippet on the gist to this incantation that works with the problematic filter. This seems related to the prior note from @lmullany?

[not saying this is the ideal permanent solution, just wanted to give people a way to work with the files in their current state]

hub_path_cloud <- s3_bucket('cdcepi-flusight-forecast-hub/')
data_cloud <- connect_hub(hub_path_cloud, file_format = "parquet") %>% collect()

filtered <- data_cloud %>%
  filter(output_type == "quantile", location == "US")

Doing the collect() before filtering solved the ! NotImplemented: Function 'equal' has no kernel matching input types (string, int64) issue. I'll let the R experts weigh in on other ways this approach can break things.

bsweger commented 1 month ago

One thing I'm struggling to understand is whether or not we'd get this same class of error for a non-cloud hub that receives parquet-based submissions.

If individual teams are submitting parquet files, wouldn't we have the same issue with potentially mis-matched parquet schemas?

Does anyone know of a hubverse hub that is accepting parquet model-outputs?

LucieContamin commented 1 month ago

Yes, we will have the same issue. I encountered the same issue in SMH.

annakrystalli commented 1 month ago

It's worth noting that this class of error is occurring on validated files. So I agree that we need to get the schemas right to the extent possible, since different downstream tools infer mixed schemas with varying results.

Are they validated files as submitted or are they files that have been converted to parquet via arrow? If they are files that actually passed validation on the way in then we need to flag those and understand what's going on a bit better.

Doing the collect() before filtering solved the ! NotImplemented: Function 'equal' has no kernel matching input types (string, int64) issue. I'll let the R experts weigh in on other ways this approach can break things.

This will work but it means loading the full dataset into memory before filtering. That's really the opposite of what the workflow is designed to do so I would fix the data instead and change the snippet back when possible.

bsweger commented 1 month ago

Are they validated files as submitted or are they files that have been converted to parquet via arrow? If they are files that actually passed validation on the way in then we need to flag those and understand what's going on a bit better.

They are files that were validated when submitted as .csvs and then converted to parquet via arrow (hence the question about other hubs that receive parquet submissions).

This will work but it means loading the full dataset into memory before filtering. That's really the opposite of what the workflow is designed to do so I would fix the data instead and change the snippet back.

For sure--but while we're still sorting through how to fix the data, it's worth presenting people with a short-term option for interacting with the FluSight data (which aren't large) using R.

I'm still looking to verify whether or not we're going to have the mis-matched schema issue on parquet files submitted by modelers (i.e., no cloud transforms). @LucieContamin has weighed in; my next step is to refer to Nick's list of hubverse hubs and see if there's anything on the list that uses parquet.

annakrystalli commented 1 month ago

They are files that were validated when submitted as .csvs and then converted to parquet via arrow (hence the question about other hubs that receive parquet submissions).

I think this is related to the first issue I highlighted at the beginning of my comment, regarding arrow changing the data types of csv columns when it reads them in and then encoding that in the parquet file schema when writing out. So I think it's to do with the transformation to parquet when pushing to the cloud. Non-cloud hubs do not re-write any data post validation, so they shouldn't be affected. I think if we make the cloud transformations schema-aware, we won't get these issues.

annakrystalli commented 1 month ago

I should mention that if a parquet file's schema does not conform to the hub schema on the way in, it does not pass validation. So, once validated, a parquet file is actually more robust than a CSV file πŸ˜„!

LucieContamin commented 1 month ago

I should also mention, to clarify, that the SMH file issue was on a file that was NOT validated with hubValidations and would probably fail the validation.

bsweger commented 1 month ago

@LucieContamin Thanks for the clarification--that's helpful!

I hope it's clear that I'm not pushing back on the idea of making the cloud transforms schema aware or advocating for less-optimal methods for data access!

My goal is to fully understand the problem space and apply the most straightforward fix at the correct point in the process, so I appreciate y'all's patience as I poke around.

annakrystalli commented 1 month ago

I just wanted to note that, along with the data type issue, the transformation has also changed some location values. In the files where the column was converted to integer, the leading zeros were dropped, so some values no longer correspond to valid FIPS codes:

library(dplyr)
library(hubData)
hub_path_cloud <- s3_bucket('cdcepi-flusight-forecast-hub/')
data_cloud <- connect_hub(hub_path_cloud, file_format="parquet") %>% collect()
data_cloud$location |> unique()
#> [1] "6"  "01" "02" "04" "05" "06" "08" "09" "10" "11" "12" "13" "15" "16" "17"
#> [16] "18" "19" "20" "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31" "32"
#> [31] "33" "34" "35" "36" "37" "38" "39" "40" "41" "42" "44" "45" "46" "47" "48"
#> [46] "49" "50" "51" "53" "54" "55" "56" "72" "US" "1"  "2"  "5"  "9"  "4"  "8" 

Given this, I think the best place to supply the schema is when reading in the GitHub files rather than when writing out the cloud files, as the latter will likely still result in the leading zeros being dropped.

bsweger commented 1 month ago

Yep, that makes sense.

I was looking at the FIPS codes also and have a somewhat related question about the validation process.

How do we want/expect modelers to respond to validation warnings vs errors? When running the validation process against a parquet file without all 50 states and no string value (like "US"), the validation will fail because the 01-09 FIPS codes get their leading zero dropped and don't validate against the list of valid locations in tasks.json.

! 2024-05-12-parquet-test_data.parquet: Column data types do not match hub schema.  `origin_date ` should be "Date " not "character ",
  `location ` should be "character " not "integer ", `output_type_id ` should be "character " not "double "
βœ– 2024-05-12-parquet-test_data.parquet: `tbl` contains invalid values/value combinations.  Column `location` contains invalid values "9",
  "8", and "6".

But if you then remove all model-output entries for locations 01-09, the validation returns the "data types do not match hub schema" warning but not the error (because two-digit FIPS codes do match the list of valid locations).

It's a contrived edge case that probably isn't worth spending a lot of time on...more just an operational question. We don't expect downstream data access tools to work smoothly if hub users ignore validation errors. Do we have the same stance for warnings?

[none of this matters for people using the cloud data, if we're fixing this up in the transform--it would only impact people working with local model-output data...I was able to replicate the ! NotImplemented: Function 'equal' has no kernel matching input types (string, int64) by removing two-character FIPS codes from a model-output file in a forked copy of the RSV forecast hub]

bsweger commented 1 month ago

FWIW, I moved this issue into the board's "Up Next" column because it makes sense to have the transform be schema-aware and act accordingly.

It's also a good case for moving people to the cloud when we can, because the transform function is an opportunity to provide a degree of cleanup if teams ignore errors and warnings when submitting model-output data.

Appreciate the great convo here!

annakrystalli commented 1 month ago

But if you then remove all model-output entries for locations 01-09, the validation returns the "data types do not match hub schema" warning but not the error (because two-digit FIPS codes do match the list of valid locations).

This is expected: when matching values to expected valid values, we transform everything to character, as trying to check numbers for equality is generally dodgy. So we check that everything, transformed to character, matches combinations of values in the expanded grid of valid values.

You could argue the results of the checks are an accurate assessment of the issues, because coercing from character to integer and back to character does not change the actual value when there are no leading zeros, but does in the cases where there are. Hence fixing the data type in files without 01-09 values will fix any issues, whereas files with 01-09 values will also need the leading zeros added back.
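A tiny Python illustration of that round trip (toy values):

# coercing character -> integer -> character is lossless without a leading zero...
assert str(int("12")) == "12"
# ...but drops the leading zero otherwise, so "09" no longer matches a valid FIPS code
assert str(int("09")) == "9"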

annakrystalli commented 1 month ago

[none of this matters for people using the cloud data, if we're fixing this up in the transform--it would only impact people working with local model-output data...I was able to replicate the ! NotImplemented: Function 'equal' has no kernel matching input types (string, int64) by removing two-character FIPS codes from a model-output file in a forked copy of the RSV forecast hub]

I'd be interested to see an example of this, as my understanding is that all FIPS codes are two-character.

bsweger commented 1 month ago

TL;DR: My example is a very contrived edge case, and I agree that our time is best spent making the cloud transform schema-aware. If we want to dive into this further, probably worth switching to a synchronous convo.

To reproduce:

  1. Clone this fork of the rsv-forecast-hub repo (I forked this one because it's small and accepts parquet submissions). The fork contains a model-output file with location FIPS codes > 10 (and no "US")
  2. Verify that the above model-output file validates with a warning but no error:
    hubValidations::validate_submission(
    hub_path="~/code/rsv-forecast-hub/",
    file_path="parquet-test_data/2024-05-05-parquet-test_data.parquet")
  3. Reproduce the data issue:
    > rsv_path <- "~/code/rsv-forecast-hub"
    > rsv_con <- connect_hub(rsv_path)
    > rsv_con %>% filter(location=="US") %>% collect()
    Error in `compute.arrow_dplyr_query()`:
    ! NotImplemented: Function 'equal' has no kernel matching input types (string, int64)

Again, this is a very contrived example, and I don't think it's worth spending a ton of time on.

Essentially, in this scenario, location values are interpreted as integers when the modeler writes out the parquet file. However, two-digit integers match the list of accepted locations, so the validation process throws the warning but no error.

If the modeler had included a location value of 09, for example, parquet writes it as 9, which does not match the accepted value of 09 and fails validation.

annakrystalli commented 1 month ago

I should mention that the ! results still represent failures. Errors are more severe and stop execution of other checks, but ! failures also need to be resolved, so the file still didn't pass validation (and therefore should not have ended up in the hub). There is a bit of detail about this here: https://infectious-disease-modeling-hubs.github.io/hubValidations/articles/hub-validations-class.html

bsweger commented 1 week ago

During the recent Hubverse retreat, @annakrystalli, @LucieContamin, and I determined that the best way forward here is to port the R-based create_hub_schema() function (from the hubData package) to Python.

Then the transform package can enforce a schema when reading incoming model-output files.

As a first step, this PR refactors the transform function to ensure that it will have a more straightforward way to access a hub's hub-config directory: https://github.com/hubverse-org/hubverse-transform/pull/20
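As a rough sketch of what that enforcement could look like (hypothetical file path, and hand-written column types standing in for a future Python port of create_hub_schema()):

import pyarrow as pa
import pyarrow.csv as pv

# column types that would eventually be derived from the hub's tasks.json
hub_column_types = {
    "location": pa.string(),
    "output_type_id": pa.string(),
    "value": pa.float64(),
}

# apply the schema while reading, instead of letting pyarrow infer types per file
table = pv.read_csv(
    "model-output/teama-model/2023-10-14-teama-model.csv",  # hypothetical path
    convert_options=pv.ConvertOptions(column_types=hub_column_types),
)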