OHDSI / Andromeda

AsynchroNous Disk-based Representation of MassivE DAta: An R package aimed at replacing ff for storing large data objects.
https://ohdsi.github.io/Andromeda/

Add arrow as a backend for speedup of around 10-20x for PLP and other Hades packages #35

Open egillax opened 2 years ago

egillax commented 2 years ago

Hello everyone,

Recently I've been experimenting with using Apache Arrow for out-of-memory computation instead of Andromeda with RSQLite. The speedups I'm getting are quite significant. Arrow can use dplyr verbs, so the code changes required are minimal. I've already implemented it for a regular PLP workflow and tested it using simulated data (using simulatePlpData) for 10e3, 100e3 and 1e6 cohort sizes (my fork of PLP is here and of Cyclops here). There's an example script in extras/testArrow.R.

The following numbers are with all default settings and Lasso; I used the same simulated dataset and the same splitSeed for each comparison.

10e3 - Normal PLP: 6.7 minutes, Arrow PLP: 21.5 seconds - a speedup of 19x

100e3 - Normal PLP: 58.5 minutes, Arrow PLP: 5.2 minutes - a speedup of 11x

1e6 - Normal PLP: 18.9 hours, Arrow PLP: 2.5 hours - a speedup of 7.6x

My suggestion would be to add arrow as a backend. It should be quite simple. The function I used to create the arrow dataset is defined here. I also have a function to convert andromeda tables to arrow here.
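
For illustration, a minimal sketch of what such a conversion helper could look like (an assumption, not the linked implementation): stream an existing RSQLite-backed Andromeda table in batches and write each batch as a feather file, so the folder can then be opened as an arrow dataset. andromedaTableToArrowDataset and the part file names are hypothetical.

library(Andromeda)
library(arrow)

# Hypothetical helper: `tbl` is a table in a (current, RSQLite-backed) Andromeda
# object and `path` is a new folder that will hold the arrow dataset.
andromedaTableToArrowDataset <- function(tbl, path) {
  dir.create(path, recursive = TRUE, showWarnings = FALSE)
  i <- 0
  Andromeda::batchApply(tbl, function(batch) {
    i <<- i + 1
    # each batch becomes one feather file; together the files form one dataset
    arrow::write_feather(batch, file.path(path, sprintf("part-%03d.feather", i)))
  })
  arrow::open_dataset(path, format = "feather")
}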

I also saw that there has been some duckdb experimentation/discussion here. By adding arrow you'd get duckdb for free, since there is tight integration and near copy-less conversion between the two formats. So, for example, you can do something like:

arrowDataset %>% dplyr::stuff() %>% arrow::to_duckdb() %>% duckdb::stuff() %>% arrow::to_arrow()

I've not tested this though, since everything I had to do was already available in arrow.
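
For a more concrete (but equally untested here) illustration of the arrow/duckdb round trip, something along these lines should work; flightsDataset is hypothetical and assumes the nycflights13 flights table has been written to disk as a feather dataset, as in a later example in this thread:

library(arrow)
library(dplyr)

flightsDataset <- open_dataset("flights", format = "feather")

flightsDataset %>%
  filter(month == 1) %>%                              # evaluated by arrow
  to_duckdb() %>%                                     # near copy-less hand-off to duckdb
  mutate(delayRank = dense_rank(desc(dep_delay))) %>% # window function, runs in duckdb
  to_arrow() %>%                                      # hand the result back to arrow
  collect()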

Tagging various people for visibility and people I have discussed this with at some point.

@ablack3 @schuemie @jreps @rekkasa @tomseinen @lhjohn

Regards, Egill Fridgeirsson

lhjohn commented 2 years ago

Arrow sounds great! I have been struggling with the speed of our current implementation at times, especially with massive data sets.

jreps commented 2 years ago

+1 from me - SQLite is very slow; it would be great to have data manipulation done ~10x-20x faster!

ablack3 commented 2 years ago

This sounds great. I'll work on it for the next release.

msuchard commented 2 years ago

@egillax -- whether or not we change the backend of Andromeda, pretty please make a pull request to handle Arrow tables in Cyclops. A primary aim of that package is to provide the highest performance possible.

schuemie commented 2 years ago

Arrow was considered when we were looking to replace ff, but at the time I found it to be unstable. It also appears to keep all the data in memory, which is the problem we're trying to avoid with Andromeda.

egillax commented 2 years ago

Hi @schuemie,

It possibly was unstable at that time; version 1.0 didn't come out until July 2020. I've not found it unstable, and it seems to have been developing fast since then. It's never crashed on me like it seems to have done for you in 2020.

Just to clarify, all the above timings are using arrow datasets, which are for working with file-based datasets without loading them into memory. This is also something that appears not to have been around in 2020. This is all done using dplyr verbs, for which they have been adding support quickly over the last year. Aggregations were only added recently.

egillax commented 2 years ago

A little more data. I ran three versions of tidyCovariates from FeatureExtraction: the arrow implementation from my PLP fork, an in-memory version I made, and the original version, which uses Andromeda with RSQLite. I benchmarked them using bench, which measures memory allocation as well. I used simulated data with 10,000 subjects.

benchmark <- bench::mark(tidyCovariateDataArrow(plpData$covariateData),
                         tidyCovariateDataMemory(memoryData),
                         FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData), 
                         check = FALSE, filter_gc = FALSE)

# A tibble: 3 × 13
  expression                                                            min   median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time result memory                  time       gc      
  <bch:expr>                                                       <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm> <list> <list>                  <list>     <list>  
1 tidyCovariateDataArrow(plpData$covariateData)                       985ms    985ms    1.02     27.88MB   1.02       1     1      985ms <NULL> <Rprofmem [4,547 × 3]>  <bench_tm> <tibble>
2 tidyCovariateDataMemory(memoryData)                                  4.5s     4.5s    0.222     2.47GB   2.89       1    13       4.5s <NULL> <Rprofmem>              <bench_tm> <tibble>
3 FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData)    31.2s    31.2s    0.0321   505.3MB   0.0642     1     2      31.2s <NULL> <Rprofmem [12,739 × 3]> <bench_tm> <tibble>

As can be seen, the arrow implementation is faster than the in-memory version and uses the least memory of all the approaches. Another interesting thing I hadn't mentioned before is that the file size of the arrow dataset is smaller: in this case 253MB vs 577MB for RSQLite.

schuemie commented 2 years ago

Cool! That does look promising.

ablack3 commented 2 years ago

Here are some more experiments with the arrow functionality. I think this could work. I guess the directory structure would be something like:

AndromedaTempFolder/AndromedaObjectFolder/Dataset/file.feather

So we would create one folder for each table in the Andromeda object, which could contain partitions or a single file.
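
As a sketch (table names are hypothetical), the on-disk layout would look something like this:

# AndromedaTempFolder/
#   AndromedaObjectFolder/      # one folder per Andromeda object
#     covariates/               # one folder per table
#       part-0.feather          # one or more partitions
#       part-1.feather
#     covariateRef/
#       part-0.feather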

We'd be trading SQL for the dplyr interface implemented in arrow, which would likely be more limited. What would be helpful for me is to better understand the operations/functions Andromeda needs to support (e.g. datediff?). Is there anything in SQL that we need that is not available in arrow?

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(nycflights13)

write_dataset(airlines, "airlines", format = "feather")
write_dataset(flights, "flights", format = "feather")

andr <- list()
andr$airlines <- open_dataset("airlines", format = "feather")
andr$flights <- open_dataset("flights", format = "feather")

# count rows
andr$flights %>% 
  tally() %>% 
  collect()
#> # A tibble: 1 × 1
#>        n
#>    <int>
#> 1 336776

# head
andr$flights %>% 
  head(10) %>% 
  collect()
#> # A tibble: 10 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>

# joins
andr$flights %>% 
  inner_join(andr$airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 336,776 × 20
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> #   name <chr>

# joins with dataframe in R
andr$flights %>% 
  inner_join(airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 336,776 × 20
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> #   name <chr>

# I'm kind of surprised this works

# filtering joins with data in R
andr$flights %>% 
  semi_join(head(airlines, 1), by = "carrier") %>% 
  collect()
#> # A tibble: 18,460 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      810            810         0     1048           1037
#>  2  2013     1     1     1451           1500        -9     1634           1636
#>  3  2013     1     1     1452           1455        -3     1637           1639
#>  4  2013     1     1     1454           1500        -6     1635           1636
#>  5  2013     1     1     1507           1515        -8     1651           1656
#>  6  2013     1     1     1530           1530         0     1650           1655
#>  7  2013     1     1     1546           1540         6     1753           1748
#>  8  2013     1     1     1550           1550         0     1844           1831
#>  9  2013     1     1     1552           1600        -8     1749           1757
#> 10  2013     1     1     1554           1600        -6     1701           1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# filtering join with data in arrow
andr$flights %>% 
  semi_join(head(andr$airlines, 1), by = "carrier") %>% 
  collect()
#> # A tibble: 18,460 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      810            810         0     1048           1037
#>  2  2013     1     1     1451           1500        -9     1634           1636
#>  3  2013     1     1     1452           1455        -3     1637           1639
#>  4  2013     1     1     1454           1500        -6     1635           1636
#>  5  2013     1     1     1507           1515        -8     1651           1656
#>  6  2013     1     1     1530           1530         0     1650           1655
#>  7  2013     1     1     1546           1540         6     1753           1748
#>  8  2013     1     1     1550           1550         0     1844           1831
#>  9  2013     1     1     1552           1600        -8     1749           1757
#> 10  2013     1     1     1554           1600        -6     1701           1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# filter using data in arrow (not supported)
andr$flights %>% 
  filter(carrier %in% andr$airlines$carrier[1:3]) %>% 
  collect()
#> Error: Filter expression not supported for Arrow Datasets: carrier %in% andr$airlines$carrier[1:3]
#> Call collect() first to pull data into R.

# filter using values in R
andr$flights %>% 
  filter(carrier %in% airlines$carrier[1:3]) %>% 
  collect()
#> # A tibble: 51,903 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      542            540         2      923            850
#>  2  2013     1     1      558            600        -2      753            745
#>  3  2013     1     1      559            600        -1      941            910
#>  4  2013     1     1      606            610        -4      858            910
#>  5  2013     1     1      623            610        13      920            915
#>  6  2013     1     1      628            630        -2     1137           1140
#>  7  2013     1     1      629            630        -1      824            810
#>  8  2013     1     1      635            635         0     1028            940
#>  9  2013     1     1      656            700        -4      854            850
#> 10  2013     1     1      656            659        -3      949            959
#> # … with 51,893 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# sum
andr$flights %>% 
  count(carrier) %>% 
  inner_join(andr$airlines, by = "carrier") %>% 
  mutate(number_of_flights = n) %>% 
  select(name, number_of_flights) %>% 
  collect()
#> # A tibble: 16 × 2
#>    name                        number_of_flights
#>    <chr>                                   <int>
#>  1 United Air Lines Inc.                   58665
#>  2 American Airlines Inc.                  32729
#>  3 JetBlue Airways                         54635
#>  4 Delta Air Lines Inc.                    48110
#>  5 ExpressJet Airlines Inc.                54173
#>  6 Envoy Air                               26397
#>  7 US Airways Inc.                         20536
#>  8 Southwest Airlines Co.                  12275
#>  9 Virgin America                           5162
#> 10 AirTran Airways Corporation              3260
#> 11 Alaska Airlines Inc.                      714
#> 12 Endeavor Air Inc.                       18460
#> 13 Frontier Airlines Inc.                    685
#> 14 Hawaiian Airlines Inc.                    342
#> 15 Mesa Airlines Inc.                        601
#> 16 SkyWest Airlines Inc.                      32

# distinct
andr$flights %>% 
  distinct(carrier) %>% 
  left_join(andr$airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 16 × 2
#>    carrier name                       
#>    <chr>   <chr>                      
#>  1 UA      United Air Lines Inc.      
#>  2 AA      American Airlines Inc.     
#>  3 B6      JetBlue Airways            
#>  4 DL      Delta Air Lines Inc.       
#>  5 EV      ExpressJet Airlines Inc.   
#>  6 MQ      Envoy Air                  
#>  7 US      US Airways Inc.            
#>  8 WN      Southwest Airlines Co.     
#>  9 VX      Virgin America             
#> 10 FL      AirTran Airways Corporation
#> 11 AS      Alaska Airlines Inc.       
#> 12 9E      Endeavor Air Inc.          
#> 13 F9      Frontier Airlines Inc.     
#> 14 HA      Hawaiian Airlines Inc.     
#> 15 YV      Mesa Airlines Inc.         
#> 16 OO      SkyWest Airlines Inc.

# mean, min, max
andr$flights %>% 
  group_by(carrier) %>% 
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE),
            min_delay = min(dep_delay, na.rm = TRUE),
            max_delay = max(dep_delay, na.rm = TRUE)) %>% 
  left_join(andr$airlines, by = "carrier") %>% 
  select(name, mean_delay, min_delay, max_delay) %>% 
  arrange(desc(mean_delay)) %>% 
  collect()
#> # A tibble: 16 × 4
#>    name                        mean_delay min_delay max_delay
#>    <chr>                            <dbl>     <dbl>     <dbl>
#>  1 Frontier Airlines Inc.           20.2        -27       853
#>  2 ExpressJet Airlines Inc.         20.0        -32       548
#>  3 Mesa Airlines Inc.               19.0        -16       387
#>  4 AirTran Airways Corporation      18.7        -22       602
#>  5 Southwest Airlines Co.           17.7        -13       471
#>  6 Endeavor Air Inc.                16.7        -24       747
#>  7 JetBlue Airways                  13.0        -43       502
#>  8 Virgin America                   12.9        -20       653
#>  9 SkyWest Airlines Inc.            12.6        -14       154
#> 10 United Air Lines Inc.            12.1        -20       483
#> 11 Envoy Air                        10.6        -26      1137
#> 12 Delta Air Lines Inc.              9.26       -33       960
#> 13 American Airlines Inc.            8.59       -24      1014
#> 14 Alaska Airlines Inc.              5.80       -21       225
#> 15 Hawaiian Airlines Inc.            4.90       -16      1301
#> 16 US Airways Inc.                   3.78       -19       500

# date functions
library(wakefield)
#> 
#> Attaching package: 'wakefield'
#> The following object is masked from 'package:dplyr':
#> 
#>     id
#> The following object is masked from 'package:arrow':
#> 
#>     string
date_df <- tibble::tibble(date1 = date_stamp(1e5, T), date2 = date_stamp(1e5, T))
write_dataset(date_df, "date_df", format = "feather")
andr$date_df <- open_dataset("date_df", format = "feather")

# date types are preserved
andr$date_df %>% 
  collect()
#> # A tibble: 100,000 × 2
#>    date1      date2     
#>  * <date>     <date>    
#>  1 2022-01-27 2021-08-27
#>  2 2021-05-27 2021-07-27
#>  3 2021-05-27 2021-11-27
#>  4 2021-10-27 2022-04-27
#>  5 2021-05-27 2021-07-27
#>  6 2021-10-27 2021-07-27
#>  7 2022-02-27 2021-06-27
#>  8 2021-11-27 2021-10-27
#>  9 2022-01-27 2022-01-27
#> 10 2022-03-27 2022-01-27
#> # … with 99,990 more rows

# date difference does not work
andr$date_df %>% 
  mutate(date_diff = date2 - date1) %>% 
  collect()
#> Error in `handle_csv_read_error()`:
#> ! NotImplemented: Function 'subtract_checked' has no kernel matching input types (array[date32[day]], array[date32[day]])

# probably not much we can do with dates without first pulling the data into R
andr$date_df %>% 
  mutate(year = lubridate::year(date1)) %>% 
  collect()
#> Error: Expression lubridate::year(date1) not supported in Arrow
#> Call collect() first to pull data into R.

andr$date_df %>% 
  mutate(min_date = min(date1)) %>% 
  collect()
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.

# interestingly this works
andr$date_df %>% 
  mutate(date_number = as.integer(date1)) %>% 
  collect()
#> # A tibble: 100,000 × 3
#>    date1      date2      date_number
#>  * <date>     <date>           <int>
#>  1 2022-01-27 2021-08-27       19019
#>  2 2021-05-27 2021-07-27       18774
#>  3 2021-05-27 2021-11-27       18774
#>  4 2021-10-27 2022-04-27       18927
#>  5 2021-05-27 2021-07-27       18774
#>  6 2021-10-27 2021-07-27       18927
#>  7 2022-02-27 2021-06-27       19050
#>  8 2021-11-27 2021-10-27       18958
#>  9 2022-01-27 2022-01-27       19019
#> 10 2022-03-27 2022-01-27       19078
#> # … with 99,990 more rows

andr$date_df %>% 
  summarise(min_date_number = min(as.integer(date1))) %>% 
  collect()
#> # A tibble: 1 × 1
#>   min_date_number
#>             <int>
#> 1           18774

# window functions are not supported
andr$flights %>% 
  group_by(carrier) %>% 
  mutate(mean_delay = mean(dep_delay))
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.

# can we create a modified copy of an arrow table without pulling the data into R?
andr$flights %>% 
  semi_join(andr$airlines[1:4,], by = "carrier") %>% 
  write_dataset("flights2", format = "feather")

andr$flights2 <- open_dataset("flights2", format = "feather")

andr$flights2 %>% 
  collect()
#> # A tibble: 106,538 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      542            540         2      923            850
#>  2  2013     1     1      544            545        -1     1004           1022
#>  3  2013     1     1      555            600        -5      913            854
#>  4  2013     1     1      557            600        -3      838            846
#>  5  2013     1     1      558            600        -2      753            745
#>  6  2013     1     1      558            600        -2      849            851
#>  7  2013     1     1      558            600        -2      853            856
#>  8  2013     1     1      559            600        -1      941            910
#>  9  2013     1     1      559            559         0      702            706
#> 10  2013     1     1      600            600         0      851            858
#> # … with 106,528 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# check memory usage
bench::mark(andr$flights %>% write_dataset("flights2", format = "feather"),
            andr$flights %>% collect() %>% write_dataset("flights2", format = "feather"),
            check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#>   expression                                                                  
#>   <bch:expr>                                                                  
#> 1 andr$flights %>% write_dataset("flights2", format = "feather")              
#> 2 andr$flights %>% collect() %>% write_dataset("flights2", format = "feather")
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> #   mem_alloc <bch:byt>, `gc/sec` <dbl>

# seems like joins are happening outside of R
bench::mark(andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect(),
            flights %>% inner_join(airlines, by = "carrier") %>% collect(),
            check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#>   expression                                                              
#>   <bch:expr>                                                              
#> 1 andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect()
#> 2 flights %>% inner_join(airlines, by = "carrier") %>% collect()          
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> #   mem_alloc <bch:byt>, `gc/sec` <dbl>

Created on 2022-04-27 by the reprex package (v2.0.1)

leeevans commented 2 years ago

@ablack3

I haven't tried it but DuckDB can be used for SQL access to Apache Arrow tables: https://duckdb.org/2021/12/03/duck-arrow.html

schuemie commented 2 years ago

I don't think we need the SQL at all. Andromeda was set up to not rely on SQL, just on dplyr syntax (which happened to be supported through SQL when using RSQLite as the engine).

The most complicated uses of Andromeda I'm aware of are this code in FeatureExtraction and this code in CohortMethod. I'm not aware of any datediffs needed. @jreps : what are your requirements for Andromeda operations?

ablack3 commented 2 years ago

I started working on an Andromeda implementation that uses arrow (no SQL or duckdb). I have not checked in my code yet. I'm currently stuck on this function. I switched to using S3; the current implementation has S4 methods but creates the object by assigning the class attribute only and doesn't call new(). It was extending the SQLite connection class, which is S4. Anyway, I'll keep at it.

#' @param x    An [`Andromeda`] object.
#' @param i    The name of a table in the [`Andromeda`] object.
#' @param value A data frame, [`Andromeda`] table, or other 'DBI' table.
#' @export
#' @rdname Andromeda-class
"[[<-.Andromeda" <- function(x, i, value) { 
  # checkIfValid(x)
  if(!is.null(value) && !inherits(value, "data.frame") && !inherits(value, "arrow_dplyr_query")) {
    abort("value must be null, a dataframe, or an dplyr query using an existing andromeda table")
  }

  if (is.null(value)) {
    if (i %in% names(x)) {
      r <- unlink(file.path(attr(x, "path"), i), recursive = TRUE)
      if (r == 1) abort(paste("Removal of Andromeda dataset", i, "failed."))
    }
  } else {
    # .checkAvailableSpace(x)
    arrow::write_dataset(value, file.path(attr(x, "path"), i), format = "feather")
    `[[<-.list`(x, i, arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather")) # This line won't work.
     x[[i]] <- arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather") # This doesn't work either
     NextMethod(generic = "[[<-", object = x, i = i, value = arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather")) # does not work
  } 
  x
}

Seems like I need NextMethod().

ablack3 commented 2 years ago

I think I have the basic idea working for assignment.

"[[<-.Andromeda" <- function(x, i, value) {
  print("using Andromeda method")
  arrow::write_dataset(value, file.path("temp", i), format = "feather")
  value <- arrow::open_dataset(file.path("temp", i), format = "feather")
  NextMethod()
}

a <- list()
class(a) <- "Andromeda"

a[["cars"]] <- cars
#> [1] "using Andromeda method"

class(a$cars)
#> [1] "FileSystemDataset" "Dataset"           "ArrowObject"      
#> [4] "R6"

Created on 2022-05-02 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

There is a problem with loading and saving zero-row dataframes: arrow::write_dataset() does nothing with a zero-row dataframe.

df <- cars[cars$speed > 1e6,]
print(df)
#> [1] speed dist 
#> <0 rows> (or 0-length row.names)

arrow::write_dataset(df, here::here("df"), format = "parquet")

d <- arrow::open_dataset(here::here("df"), format = "parquet")
#> Error: IOError: Cannot list directory '.../RtmpvMgShC/reprex-850311752e99-bonny-fox/df'. Detail: [errno 2] No such file or directory

Created on 2022-05-03 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

I pushed my initial implementation to the arrow branch in case anyone wants to give it a try. Arrow greatly simplifies Andromeda because we only have to deal with a set of files rather than a database (e.g. indexes are not needed anymore).

I could use help thinking through how the functions in Operations.R file should be adapted to take advantage of arrow. I'm not sure the current implementations make as much sense when you have partitioned feather files. Perhaps the batches should be the partitions. Maybe we can take advantage of arrow::map_batches()

schuemie commented 2 years ago

Great work @ablack3 !

I tried playing with the new branch. One behavior that is different (arguably better) with the arrow backend is that copies are independent:

a <- Andromeda::andromeda()

b <- a

b$cars <- cars
a
# # Andromeda object
# # Physical location:  C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
# 
# Tables:

b
# # Andromeda object
# # Physical location:  C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
# 
# Tables:
# $cars (speed, dist)

However, I can't seem to actually access the tables?

a <- Andromeda::andromeda(cars = cars)
colnames(a$cars)
# NULL
ablack3 commented 2 years ago

Thanks for giving it a try! I think you found a bug. Copies are not actually independent: a and b are two different Andromeda objects that point to the same files. The 'path' attribute points to the folder where the feather files are stored.

library(Andromeda)
a <- andromeda(cars = cars)

# names is implemented by the arrow package
names(a$cars)
#> [1] "speed" "dist"

# colnames is not implemented by arrow but could be added in Andromeda
colnames(a$cars)
#> NULL

# colnames works if you pull the data into R first 
colnames(collect(a$cars))
#> [1] "speed" "dist"

b <- a
attr(a, "path") == attr(b, "path")
#> [1] TRUE

list.dirs(attr(a, "path"), full.names = F)
#> [1] ""     "cars"
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""     "cars"

b$cars <- NULL

list.dirs(attr(a, "path"), full.names = F)
#> [1] ""
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""

a
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpmPXb7A/file1274499a75dd
#> 
#> Tables:
#> $cars (speed, dist)

a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpmPXb7A/file1274499a75dd/cars/part-0.feather'. Detail: [errno 2] No such file or directory

Created on 2022-05-10 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

This is tricky. I fixed 'names.Andromeda', but if someone copies an Andromeda object I can't really update all the other Andromeda objects that refer to the same file location.

So perhaps it would be better to create a copy every time an Andromeda object is modified? However, this seems like potentially a lot of unnecessary copying of large datasets. Maybe another option is to use reference classes.
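
A minimal sketch of the reference-semantics idea (an assumption, not the branch's implementation): keep the table references in a shared environment so every copy of the object sees the same state; reference classes (R5/R6) would be a more formal version of the same thing.

newAndromeda <- function() {
  env <- new.env(parent = emptyenv())
  structure(list(), class = "Andromeda", env = env)
}

"[[<-.Andromeda" <- function(x, i, value) {
  assign(i, value, envir = attr(x, "env"))  # modify the shared environment
  x
}

"[[.Andromeda" <- function(x, i) {
  get(i, envir = attr(x, "env"), inherits = FALSE)
}

a <- newAndromeda()
b <- a
b[["cars"]] <- cars
names(attr(a, "env"))  # "cars" -- the table added through b is visible through a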

library(Andromeda)
a <- andromeda(cars = cars)
b <- a
names(a)
#> [1] "cars"

# remove the cars file
b$cars <- NULL

# the new state is reflected in both a and b
a
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#> 
#> Tables:
b
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#> 
#> Tables:
names(a)
#> character(0)

# however a still contains a reference to the file that no longer exists
length(a)
#> [1] 1
length(b)
#> [1] 0

a$cars
#> FileSystemDataset with 1 Feather file
#> speed: double
#> dist: double
#> 
#> See $metadata for additional Schema metadata

a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpwFFo6h/file1182f64ac99ff/cars/part-0.feather'. Detail: [errno 2] No such file or directory

Created on 2022-05-11 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

UPDATE: I think I have a solution for copies of the andromeda object by syncing the dataset reference with the existing files every time any method is called.
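
A hedged sketch of that syncing idea (syncAndromeda is a hypothetical name, not the code in the branch): before the object is used, rebuild the table references from whatever folders currently exist on disk.

syncAndromeda <- function(x) {
  path <- attr(x, "path")
  tableNames <- list.dirs(path, full.names = FALSE, recursive = FALSE)
  # re-open a dataset reference for every table folder that still exists
  tables <- lapply(tableNames, function(tbl)
    arrow::open_dataset(file.path(path, tbl), format = "feather"))
  names(tables) <- tableNames
  structure(tables, class = class(x), path = path)
}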

The arrow branch can now handle 0-row dataframes, but if a dplyr query produces a result that has zero rows and is then written to andromeda, the assignment fails (NULL is assigned), so that is still an issue. I created an issue about it on the arrow Jira: https://issues.apache.org/jira/browse/ARROW-16575.

ablack3 commented 2 years ago

I think the arrow branch is ready for some alpha testing and early user feedback: devtools::install_github("OHDSI/Andromeda", ref = "arrow").

What works: object creation, dplyr syntax for data manipulation, saving and loading (tests are passing). (Andromeda objects are now cleaned up only when the R session ends and not before.)

What does not work: batchApply, groupApply, appendToTable, batchTest (these implementations should be reconsidered to take advantage of the feather file format).

What has been removed or changed: everything related to indexes and the ability to set column names have been removed; user-defined attributes are now saved as json instead of rds.

If this seems like a significant improvement and the way to go I can work on implementing the apply functions. I should also create some meaningful performance tests.

ablack3 commented 2 years ago

I think this could work for appending. Feather files don't seem to support appending, but they can be read and written without copying (not sure how this works, but it is pretty cool).

library(dplyr)
library(arrow)
library(wakefield)

# create a dataframe with 10 million rows
df <- r_data_theme(n = 1e7, "survey")
df
#> # A tibble: 10,000,000 × 11
#>    ID    Item_1 Item_2 Item_3 Item_4 Item_5 Item_6 Item_7 Item_8 Item_9 Item_10
#>    <chr>  <int>  <int>  <int>  <int>  <int>  <int>  <int>  <int>  <int>   <int>
#>  1 00001      1      4      2      4      5      2      3      3      2       5
#>  2 00002      5      2      4      3      5      1      5      4      1       5
#>  3 00003      3      3      2      4      2      4      1      5      2       3
#>  4 00004      3      4      4      4      1      5      2      1      2       4
#>  5 00005      4      4      2      1      5      5      3      3      4       4
#>  6 00006      2      4      5      4      2      4      1      3      5       3
#>  7 00007      3      5      5      3      2      4      4      2      5       3
#>  8 00008      1      1      5      3      3      2      3      3      4       4
#>  9 00009      4      4      2      2      2      3      3      2      4       3
#> 10 00010      4      3      5      1      2      5      3      4      2       3
#> # … with 9,999,990 more rows

# save the data in arrow format and open a link to it in R
system.time({
  write_dataset(df, "df", "arrow")
  d <- open_dataset("df", format = "arrow")
})
#>    user  system elapsed 
#>   0.594   0.155   0.792

# append to an existing file system dataset
system.time({
  s <- Scanner$create(d)
  write_dataset(concat_tables(s$ToTable(), arrow_table(df)), "df", format = "arrow")
  d <- open_dataset("df", format = "arrow")
})
#>    user  system elapsed 
#>   0.575   0.377   0.940

nrow(d)
#> [1] 20000000

# remove the file
unlink("df", recursive = T)

Created on 2022-05-18 by the reprex package (v2.0.1)

Kind of crazy how fast arrow read/writes are.

egillax commented 2 years ago

Hi @ablack3,

I did some testing. Looks very good. I could run the tidyCovariate function on the new andromeda object by changing one word (one count() to n()) and by disabling the checks at the beginning of that function. And I get a similar speedup to what I have seen before, so significantly faster than the sqlite version.

However, the reason the checks are failing is that FeatureExtraction has its own S4 class, covariateData, which inherits from the Andromeda class. Now that you changed Andromeda to S3, maybe this inheritance is broken? At least, running the following used to give TRUE but now gives FALSE:

> covData <- FeatureExtraction:::createEmptyCovariateData(cohortId=1, aggregated=F, temporal=F)
> inherits(covData, 'Andromeda')
[1] FALSE

Also, another of the checks, FeatureExtraction::isAggregatedCovariateData(), depends on the object having colnames() defined, which used to be the case but doesn't work anymore, so maybe colnames() needs to be defined.

ablack3 commented 2 years ago

So I think there are a couple of next steps for this issue (which I think will solve all of the currently open issues on Andromeda):

  1. Adjustments need to be made in the packages that extend Andromeda (FeatureExtraction, possibly CohortMethod, ...)
  2. Assess if we actually need the Andromeda methods that have not been implemented yet and come up with implementations that take advantage of arrow (e.g. batchApply). I'm currently not sure how to do this since arrow is quite different from SQLite.

schuemie commented 2 years ago

Could you explain a bit more about what you mean by (2) ('Assess if we actually...')?

The batchApply() function is used for example in Cyclops. In general, it seems essential for a package like Andromeda to have some way to process the large data (that may not fit in memory) in smaller chunks.

egillax commented 2 years ago

With regards to batchApply(), I think it could just be a wrapper around arrow's map_batches(). That's what I did when trying it out earlier with Cyclops, and it seemed to work.
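
A minimal sketch of that wrapper (an assumption, not the eventual Andromeda implementation), handing each record batch to fun as a data frame:

batchApply <- function(tbl, fun, ...) {
  invisible(arrow::map_batches(tbl,
                               function(batch) fun(as.data.frame(batch), ...),
                               .data.frame = FALSE))
}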

Also, @ablack3, I see a possible issue with your appendToTable implementation. By converting to an arrow table I believe you are loading everything into memory. Did you try this with data bigger than memory?

ablack3 commented 2 years ago

Could you explain a bit more about what you mean by (2) ('Assess if actually...')?

I want to check that all of the remaining functions are necessary and need to be implemented on arrow, or if any of them can be deprecated. The remaining functions are batchApply, groupApply, and batchTest.

batchApply and possibly groupApply functionality could be covered by map_batches. One problem with map_batches is that "This is experimental and not recommended for production use."

Should we use map_batches? Is map_batches a drop-in replacement for batchApply? If so, can we simply export map_batches instead of wrapping it and renaming it batchApply?

The batchApply() function is used for example in Cyclops. In general, it seems essential for a package like Andromeda to have some way to process the large data (that may not fit in memory) in smaller chunks.

But batchApply does read all the data into R, just not all at once. Possibly a dumb question, but how does this save space? If I have 10GB of data and read it in 1GB at a time, don't I still have to allocate 10GB of RAM? Is the limitation the size of a single object (e.g. I can create 10 objects of 1GB each but not a single object of 10GB)? Or perhaps garbage collection is running in between reads?

Also @ablack3 I see a possible issue with your appendToTable implementation. By converting to an arrow table I believe you are loading everything in memory. Did you try this with data bigger than memory?

I think I'm lacking a good mental model for how arrow works. Where, when, and how is the data processing actually done? I understand that arrow can read in only the data that is necessary, but aggregation is actually performed in R when collect() is called, right?

There is an arrow course at the upcoming useR conference but unfortunately it is full. :/

So one thing I did learn is that arrow supports "zero copy reads" so apparently even though I convert the "Dataset" into an arrow table it is somehow read into memory without being read into memory. Black magic I guess?

library(pryr)
library(arrow)

# datasource: https://www.kaggle.com/datasets/raddar/amex-data-integer-dtypes-parquet-format?resource=download
file.size("~/Downloads/test.parquet")
#> [1] 3301659934

system.time({
  d <- open_dataset("~/Downloads/test.parquet", format = "parquet")
})
#> user  system elapsed 
#> 0.017   0.001   0.018 

object_size(d)
# 261,088 B

# append to an existing file system dataset
system.time({
  dataScanner <- arrow::Scanner$create(d)
  dataArrowTable <- dataScanner$ToTable()
})
#> user  system elapsed
#> 8.225   5.432   3.820 

class(dataArrowTable)
#> [1] "Table"        "ArrowTabular" "ArrowObject"  "R6"  

object_size(dataScanner)
#> 172,136 B

object_size(dataArrowTable)
#> 284,632 B

So it looks like reading in an Arrow table does not require much memory. But I did notice that RStudio reported the "memory used by session" increased to about 2 GB.

Aside: The torch package is on CRAN and the tarball is 42 megabytes. I thought CRAN had a size limitation of 5MB.

Sorry for all the questions and thanks for the input!

schuemie commented 2 years ago

but how does this save space?

The idea of batchApply() is that you only have small parts of the entire data in memory at one time, do some operation on it, and then no longer keep it in memory. The Cyclops example is a bit awkward in that the operation actually implies storing it in memory, just in Cyclops' internal hyper-efficient sparse-matrix memory storage.
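
For illustration, here is how the existing Andromeda::batchApply() is typically used to process data in chunks so that only one batch is held in memory at a time (andr$covariates and covariateValue are hypothetical table and column names):

total <- 0
Andromeda::batchApply(andr$covariates, function(batch) {
  # `batch` is an ordinary data frame containing only a slice of the table
  total <<- total + sum(batch$covariateValue)
}, batchSize = 100000)
total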

can we simply export map_batches instead of wrapping it and renaming it batchApply?

I prefer we wrap map_batches. That makes for a nice separation between Andromeda's interface and its implementation. Similarly, groupApply() and batchTest() should continue to be supported, and if possible, colnames() and count() as well.

egillax commented 2 years ago

+1 for wrapping map_batches into batchApply.

@ablack3 I believe the arrow table is an in-memory object. I think what you are measuring with object_size() on the table is the C++ pointer, and what you see in RStudio under memory usage by session is the actual memory usage. An arrow table actually has an nbytes() method, which I believe gives the memory usage of the underlying C++ buffers.
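
A small illustration of that difference (hedged; uses the pryr package as in the earlier example):

library(arrow)
tbl <- arrow_table(mtcars)
pryr::object_size(tbl)  # a few hundred bytes: just the R6 handle around the C++ object
tbl$nbytes()            # the size of the underlying Arrow buffers in memory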

I did some experiments with the nyc_taxi dataset referenced on the arrow website. After some initial troubles, where I had to exclude some columns that were giving me issues, I managed to crash my R session by calling collect() on it. The dataset is 37 GiB and my RAM is 32 GiB, so no surprises there. However, I also crash my session when emulating the appendToTable process of creating a scanner and converting it to a table.

I did come up with a Linux-specific way of measuring the peak memory usage of my R session process. It's option number 3 from here.

Then I can run the emulated appendToTable process on the nyc-taxi dataset:

library(dplyr)
library(arrow)

# some of the columns in the dataset were giving me errors, something about
# strings and null, so I made a new dataset without string or null columns
# ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
# 
# write_dataset(ds %>% select(-c(vendor_id, store_and_fwd_flag,
#                                payment_type, rate_code_id)), 
#               path,
#               partitioning = c('year', "month"))
path <- './new_taxi/'
new_ds <- open_dataset(path)

subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()

scanner <- Scanner$create(new_ds %>% filter(year==2017))
table <- scanner$ToTable()

arrow::concat_tables(subset, table) 

This results in about 9 GiB of peak memory use.

On the other hand, I made an alternative implementation which dumps the dataframe to be appended to a file and then creates a new dataset from the list of files of the current dataset plus the dataframe file.

library(dplyr)
library(arrow)

path <- './new_taxi/'
new_ds <- open_dataset(path)

subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()

tempFile <- tempfile()

write_parquet(subset, tempFile)

appended_ds <- open_dataset(c(new_ds$files, tempFile))

This gives me about 1 GiB peak memory usage for the process. subset$nbytes() tells me the arrow table should contain about 600 MiB, which seems to match approximately what RStudio reports under "used by session".

ablack3 commented 2 years ago

Thanks for the experiments. I'll implement batchApply.

I think my implementation was inspired by Wes McKinney's comment here:

[screenshot of Wes McKinney's comment]

egillax commented 2 years ago

I did try this today as well with feather files instead of parquet with the same results.

I don't think zero-copy means the data is still on disk. I think it means that the data is moved to the destination without any unnecessary copies. See for example this.

ablack3 commented 2 years ago

On the topic of batchApply(tbl, fun, ..., batchSize): this is essentially adding a batchSize argument to map_batches, which is a bit awkward because the size of the record batches that map_batches maps over is defined when the file is written, not when the file is read. To implement batchApply(tbl, fun, ..., batchSize) using map_batches, I had to copy the dataset to a temp location, setting the batchSize, and then use map_batches on the copy. I'm thinking a better approach would be to set the batch size when an andromeda object is created, which seems to align better with arrow. @egillax does that make sense to you?

https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
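
A hedged sketch of the copy-based approach described above (batchApplyWithSize is a hypothetical name): rewrite the dataset with the requested batch size, then map over its record batches.

batchApplyWithSize <- function(tbl, fun, ..., batchSize = 100000) {
  tmp <- tempfile()
  on.exit(unlink(tmp, recursive = TRUE))
  # rewriting controls how many rows end up in each file, and hence in each batch
  arrow::write_dataset(tbl, tmp, format = "feather", max_rows_per_file = batchSize)
  ds <- arrow::open_dataset(tmp, format = "feather")
  invisible(arrow::map_batches(ds,
                               function(batch) fun(as.data.frame(batch), ...),
                               .data.frame = FALSE))
}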

egillax commented 2 years ago

I'm not sure I understand how you were setting the batchSize when creating an andromeda object. Could you clarify that a bit?

I see there is a default batch size of 132k rows in map_batches. Is there a reason we would want to change that, or even have control of the batch size?

I also see that a feather file has something called chunk_size, with a default of 64k rows. These very similar but different terms are getting me a bit confused.

I do agree though that copying the dataset just to change the batchSize seems a bit awkward.

ablack3 commented 2 years ago

I'm not sure I understand how you were setting the batchSize when creating an andromeda object. Could you clarify that a bit?

Here is how I'm setting the size of the recordBatches.

library(arrow)
library(nycflights13)

write_dataset(airlines, "arrow/airlines", format = "feather", max_rows_per_file = 4)
ds <- open_dataset("arrow/airlines", format = "feather")
map_batches(ds, nrow, .data.frame = F)
#> [[1]]
#> [1] 4
#> 
#> [[2]]
#> [1] 4
#> 
#> [[3]]
#> [1] 4
#> 
#> [[4]]
#> [1] 4

Created on 2022-06-24 by the reprex package (v2.0.1)

I see there is a default batch size of 132k rows in map_batches. Is there a reason we would want to change that, or even have control of the batch size?

Where do you see that? I didn't find any reference to batch size in map_batches. I'm looking here.

Yeah, I'm not sure yet what the difference between chunk size and recordBatch size is. I'll spend some more time with the documentation. I do think the notion of a batch size has to do with how the data is written and stored on disk rather than how it is read. I don't think it is possible to change the batch size while reading a FileSystemDataset.

ablack3 commented 2 years ago

I think I have a solution for batchApply using a Scanner. I implemented a new version of appendToTable based on the suggestion from @egillax.

The only remaining function is groupApply I think.

All prior tests are passing, but I did have to modify the tests that involve table appending. When using appendToTable there are no longer any guarantees about row order. (I'm not sure we were really guaranteeing that row order was preserved in the SQLite implementation.) Anyway, I had to change the tests a bit to reflect that row order is not preserved.

Updates are in the arrow branch.

ablack3 commented 2 years ago

The current groupApply implementation seems to work. I'm not sure that it is the best way to accomplish batching by groups with arrow but all of the tests pass so hopefully it is ok for now.

So I'd say the full andromeda interface has been implemented with arrow, although there are a few small changes. For example, close now attempts to delete the andromeda object. I've noticed that when running tests on the Windows GitHub Actions runner the andromeda file can't be deleted (for reasons I don't understand), but this does not seem like a big deal. When R is restarted, all newly created Andromeda objects in the andromeda temp folder should be removed.

https://github.com/OHDSI/Andromeda/actions/runs/2577456192

schuemie commented 2 years ago

This is looking very promising! It would make life a lot easier if count() and colnames() could somehow be supported.

Also, it seems the current version does not delete the files from the Andromeda temp folder when R is closed?

ablack3 commented 2 years ago
library(Andromeda)

# count currently does work with FileSystemDataset objects 
andr <- andromeda(iris = iris)

andr$iris %>% 
  count(Species) %>% 
  collect()
#> # A tibble: 3 × 2
#>   Species        n
#>   <fct>      <int>
#> 1 setosa        50
#> 2 versicolor    50
#> 3 virginica     50

# colnames does not work (although `names()` does work which might be a suitable alternative)
colnames(iris)
#> [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width"  "Species"
colnames(andr$iris)
#> NULL

# colnames is not a generic function
sloop::is_s3_generic("colnames")
#> [1] FALSE

# if I make it a generic then I'm overwriting the base implementation
colnames <- function(x) UseMethod("colnames")
colnames.columns <- function(x) print(paste(x$names, collapse = ", "))

l <- structure(list(names = c("Doric", "Ionic", "Corinthian")), class = "columns")

colnames(l)
#> [1] "Doric, Ionic, Corinthian"

colnames.FileSystemDataset <- function(x) names(x)
colnames(andr$iris)
#> [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width"  "Species"

# But now Andromeda::colnames conflicts with base::colnames
colnames(iris)
#> Error in UseMethod("colnames"): no applicable method for 'colnames' applied to an object of class "data.frame"
base::colnames(iris)
#> [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width"  "Species"

Created on 2022-06-28 by the reprex package (v2.0.1)

Also, it seems the current version does not delete the files from the Andromeda temp folder when R is closed?

Each time an andromeda object is created, its location is saved in the package environment. (ref)

Then, when garbage collection runs or at the end of the R session, the files should be deleted. (ref)

This works as expected on Mac, but I noticed some strange behavior on Windows with regards to unlink, so I might be missing something. If you remove the R objects and run gc(), do they get cleaned up? What about if you restart the R session?
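
A minimal sketch of the package-level mechanism being described (an assumption; names are illustrative, not Andromeda's actual internals): record each object's path in a package environment and register a single finalizer with onexit = TRUE so it also runs when the R session ends.

andromedaPaths <- new.env(parent = emptyenv())

registerAndromedaPath <- function(path) {
  assign(path, TRUE, envir = andromedaPaths)
}

reg.finalizer(andromedaPaths, function(e) {
  # delete every registered folder when the environment is collected or R exits
  for (path in ls(e)) unlink(path, recursive = TRUE)
}, onexit = TRUE)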

schuemie commented 2 years ago

The released version of Andromeda attaches a finalizer to each Andromeda object, and therefore there is an expectation it will be called when the object is no longer referenced and garbage collection occurs.

I'm not sure why you would expect that to happen in the arrow branch. There's currently a single finalizer at the package level, which will only be called when R is shut down. I'll try to find out why that does not appear to happen on Windows, but having a solution where temp files only get cleaned up at R shutdown will not work (when opening and closing a lot of Andromeda objects in a single session). Is there a reason for the switch from per-Andromeda to per-R-session finalizer?

ablack3 commented 2 years ago

I'm not sure why you would expect that to happen in the arrow branch.

You're right. In the arrow branch I have attached the finalizer to the package environment.

I'll try to find out why that does not appear to happen on Windows, but having a solution where temp files only get cleaned up at R shutdown will not work (when opening and closing a lot of Andromeda objects in a single session). Is there a reason for the switch from per-Andromeda to per-R-session finalizer?

Thanks. I'm not sure how to attach a finalizer to each andromeda object. The reason for the switch is that I need an environment or external pointer to attach the finalizer to, and the arrow-andromeda object has neither. (See the example below.)

# This is basically the new andromeda constructor
path <- tempfile(tmpdir = Andromeda:::.getAndromedaTempFolder())
dir.create(path)
andromeda <- structure(list(), class = "Andromeda", path = path)

reg.finalizer(andromeda, function(a) invisible(unlink(a$path, recursive = TRUE)))
#> Error in reg.finalizer(andromeda, function(a) invisible(unlink(a$path, : first argument must be environment or external pointer

Created on 2022-06-29 by the reprex package (v2.0.1)

The current Andromeda finalizer used the database connection which is an external pointer.

I can attach a finalizer to a FileSystemDataset though...

arrow::write_dataset(cars, "cars")
d <- arrow::open_dataset("cars")

reg.finalizer(d, function(d) print("It works!"))
#> NULL

rm(d)
gc()
#> [1] "It works!"
#>           used (Mb) gc trigger  (Mb) limit (Mb) max used (Mb)
#> Ncells 1139383 60.9    2417757 129.2         NA  1324251 70.8
#> Vcells 2027567 15.5    8388608  64.0      32768  3676384 28.1

Created on 2022-06-29 by the reprex package (v2.0.1)

I find the behavior of reg.finalizer a little hard to predict. If I add an environment to the andromeda object and then attach the finalizer to that, I would expect the cleanup to work, but it does not.

path <- tempfile(tmpdir = Andromeda:::.getAndromedaTempFolder())
dir.create(path)
e <- new.env()
e$path <- path
andr <- structure(list(), class = "Andromeda", env = e)
reg.finalizer(attr(andr, "env"), function(e) unlink(e$path, recursive = TRUE))
#> NULL
file.exists(path)
#> [1] TRUE
rm(andr)
gc()
#>           used (Mb) gc trigger (Mb) limit (Mb) max used (Mb)
#> Ncells  768111 41.1    1313695 70.2         NA  1313695 70.2
#> Vcells 1388745 10.6    8388608 64.0      32768  2347089 18.0
file.exists(path)
#> [1] TRUE

Created on 2022-06-29 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

Another solution that I would expect to work but does not. I must be missing something.

newAndromeda <- function(){
  path <- tempfile(tmpdir = Andromeda:::.getAndromedaTempFolder())
  dir.create(path)
  ptr <- new("externalptr")
  andr <- structure(list(), class = "Andromeda2", path = path, ptr = ptr)
  cleanup <- function(ptr){
    ptr
    print("running finalizer")
    if(unlink(path, recursive = TRUE) == 1) warning("error with file deletion")
  }
  reg.finalizer(attr(andr, "ptr"), cleanup)
  andr
}
andr <- newAndromeda()
path <- attr(andr, "path")
file.exists(path)
#> [1] TRUE
rm(andr)
gc()
#>           used (Mb) gc trigger (Mb) limit (Mb) max used (Mb)
#> Ncells  767993 41.1    1324238 70.8         NA  1324238 70.8
#> Vcells 1388740 10.6    8388608 64.0      32768  2345481 17.9
file.exists(path)
#> [1] TRUE

Created on 2022-06-29 by the reprex package (v2.0.1)

schuemie commented 2 years ago

Perhaps the problem isn't that the finalizer isn't called, but that the unlinking fails (because the files are locked)?

I tried to run your example code to test this, but

andr <- structure(list(), class = "Andromeda", env = e)
# Error in dir.exists(attr(x, "path")) : invalid filename argument

throws an error. Does this assume the Andromeda in the arrow branch (which is what I was using)?

ablack3 commented 2 years ago

Does this assume the Andromeda in the arrow branch (which is what I was using)?

For these examples it should not matter. I'm using Andromeda:::.getAndromedaTempFolder() which hasn't changed. In the example below the Andromeda package is not used at all. I'm just trying to get a working prototype implementation.

How about this implementation? Create a new environment whose parent is the empty environment and add it to the Andromeda object. Attach the finalizer to the environment.

newAndromeda <- function(){
  path <- tempfile()
  dir.create(path)
  andr <- structure(list(), 
                    class = "blah",
                    path = path,
                    env = rlang::new_environment(list(path = path)))

  cleanup <- function(env){
    print("running finalizer...")
    r <- unlink(env$path, recursive = TRUE) 
    if(r == 1) warning("Problem with file deletion. Possible file lock.")
  }

  reg.finalizer(attr(andr, "env"), cleanup)
  andr
}
andr <- newAndromeda()
path <- attr(andr, "path")
file.exists(path)
#> [1] TRUE
rm(andr)
gc()
#> [1] "running finalizer..."
#>          used (Mb) gc trigger (Mb) limit (Mb) max used (Mb)
#> Ncells 541237 29.0    1195415 63.9         NA   701951 37.5
#> Vcells 983215  7.6    8388608 64.0      32768  1903618 14.6
file.exists(path)
#> [1] FALSE

Created on 2022-06-30 by the reprex package (v2.0.1)

schuemie commented 2 years ago

Looks great!

ablack3 commented 2 years ago

The arrow branch has been updated, and the cleanup test has been restored and is passing on the GitHub Actions runners.

schuemie commented 2 years ago

Could you help me understand how I can get a row count? This code no longer works:

library(dplyr)
a <- Andromeda::andromeda(cars = cars)
a$cars %>%
  count()
# FileSystemDataset (query)
# n: int32
# 
# See $.data for the source Arrow object
ablack3 commented 2 years ago

You have to add collect(), pull(), or compute() at the end of the dplyr query. collect() will return a dataframe/tibble, pull(n) will return a number, and compute() will return an in-memory Arrow table.

library(dplyr)
a <- Andromeda::andromeda(cars = cars)
a$cars %>%
  count() %>% 
  collect()
#> # A tibble: 1 × 1
#>       n
#>   <int>
#> 1    50

a$cars %>%
  count() %>% 
  pull(n)
#> [1] 50

a$cars %>%
  count() %>% 
  compute()
#> Table
#> 1 rows x 1 columns
#> $n <int64>
#> 
#> See $metadata for additional Schema metadata

Created on 2022-07-05 by the reprex package (v2.0.1)

ablack3 commented 2 years ago

The next step is getting the rest of the Hades packages working with the Andromeda arrow branch. As @egillax pointed out earlier in this thread, the change to S3 will require changes to how Andromeda is extended in other Hades packages.

Analytic packages like CohortMethod have an Andromeda object called cohortMethodData that extends FeatureExtraction's CovariateData class, which in turn extends the Andromeda class in this package. This results in wrappers around Andromeda functions like saveAndromeda:

# From FeatureExtraction
saveCovariateData <- function(covariateData, file) {
  if (missing(covariateData))
    stop("Must specify covariateData")
  if (missing(file))
    stop("Must specify file")
  if (!inherits(covariateData, "CovariateData"))
    stop("Data not of class CovariateData")

  Andromeda::saveAndromeda(covariateData, file)
}

# From CohortMethod
saveCohortMethodData <- function(cohortMethodData, file) {
  if (missing(cohortMethodData))
    stop("Must specify cohortMethodData")
  if (missing(file))
    stop("Must specify file")
  if (!inherits(cohortMethodData, "CohortMethodData"))
    stop("Data not of class CohortMethodData")

  Andromeda::saveAndromeda(cohortMethodData, file)
  writeLines("To use this CohortMethodData object, you will have to load it from file (using loadCohortMethodData).")
}

Perhaps a generic save function defined in Andromeda could be used for all extensions. If we are going to be extending Andromeda for each analytic method, then it probably makes sense for Andromeda to provide a set of generics needed by each analytic package (e.g. save, load), right?
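
A hedged sketch of what such a shared generic could look like (hypothetical names, not an existing Andromeda API):

saveAndromedaObject <- function(x, file, ...) UseMethod("saveAndromedaObject")

saveAndromedaObject.Andromeda <- function(x, file, ...) {
  Andromeda::saveAndromeda(x, file)
}

saveAndromedaObject.CovariateData <- function(x, file, ...) {
  # class-specific checks could go here, then fall through to the parent method
  NextMethod()
}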

I also want to check that we really need this kind of multi-level inheritance. Andromeda's purpose is to be a list of on-disk dataframes and we typically don't need to extend dataframes. There is a maxim "Composition over inheritance" that might apply here but I'm not sure.

I also have a more concrete question: what is the purpose of adding the package attribute to the class attribute? How does that get used?

  covariateData <- Andromeda::loadAndromeda(file)
  class(covariateData) <- "CovariateData"
  attr(class(covariateData), "package") <- "FeatureExtraction"

Adding my work on FeatureExtraction here: https://github.com/ablack3/FeatureExtraction/tree/arrow

schuemie commented 2 years ago

Hi @ablack3 . Seems we're working in parallel. I've already modified DatabaseConnector to work with the new Andromeda version (see the arrow branch), and am now also working on FeatureExtraction. I've pushed my work so far up to the arrow branch here.

I've modified the inheritance from S4 to S3, and that seems to work fine. Although I agree with your "composition over inheritance", I was being lazy, and since inheritance means I can just use the Andromeda save function to save the new object, I stuck to that. I somehow really like that we can save CovariateData objects into a single file, and I don't see a way to do that with composition.

I have most of FeatureExtraction working, but right now I'm stuck with an inability to remove tables from an Andromeda object. It happens in the tidyCovariates() function, but I can create a much simpler example:

library(dplyr)
a <- Andromeda::andromeda(cars = cars)
x <- a$cars %>%
  count() %>%
  pull()

a$cars <- NULL
# Error in `[[<-` at Andromeda/R/Object.R:231:2:
#   ! Removal of Andromeda dataset cars failed.
# Run `rlang::last_error()` to see where the error occurred.
close(a)

(The error doesn't occur when I don't do an operation like the count().)

FYI: Setting the package attribute of the class was an S4 thing. Without it, S4 classes don't work outside their package. It is not required for S3 classes (and in fact will throw a warning if I remember correctly).

schuemie commented 2 years ago

Digging a bit deeper, it seems any dplyr collect() call locks the arrow file, at least on Windows, preventing deletion. It appears the lock is applied when collect.arrow_dplyr_query calls do_exec_plan() here, and specifically when do_exec_plan() calls ExecPlan$Run() here.

It seems some of this code is changing in the current development version of the arrow package, but since that fails to build in their CI I'm hesitant to test whether that solves this problem.

ablack3 commented 2 years ago

I'll work on the file removal issue on Windows. I made a small change (unlink each file individually) and now it only fails some of the time so maybe that's progress 😄