bsweger closed this issue 6 months ago.
Depending on where we land with this, we might need a related task for ensuring that hubUtils
will work with the new organization.
Capturing some notes from Luke Mullany about common use cases:
I think it is in fact hard to say what the most common use cases might be. However, I am thinking perhaps the submission date is first, then the targets, and the modeling team last. I suspect getting an entire set of data for a particular submission date (i.e. all models and all targets) would be a priority for many analyses. The next filter would probably be on the specific type of target. I suggest the modeling team last because, although modeling-team-specific analyses are certainly important, in the context of a "hub" the whole purpose is to leverage/compare/contrast multiple models and their forecasts. Note that all the above depends critically on all model submissions within a given "round" having identical submission dates. If not, replace the above with some logic that swaps "submission_date" for something that does identify the "round".
For many repos, partitioning on more than round may be overkill and lead to a huge number of small files. For example, on my local machine I just cloned the FluSight 1 repo, read in all 4,026 submissions (i.e. across numerous flu seasons and models), combined them into a single data.table, and created a hive-partitioned dataset using arrow (with submission date and target). This creates >3,000 files, many of which are quite small, to the point where the total amount of metadata starts to dominate relative to the actual data of interest. Granted, this is true partly because the submission dates in FluSight are not sufficiently specific to round (see my other comment). If I recreate the partition only on submission_date or only on target, the number of files and the total size of the combined partitioned dataset are obviously much smaller.
One level of partition indeed sounds about right. And definitely much easier to implement in a generalisable fashion than trying to add further partitions. Would be interesting to benchmark different queries on the two hubs (one standard partitioned and the one you repartitioned on submission date).
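For reference, a minimal sketch of how the partitioned copies compared below could be created with `arrow::write_dataset()`. This assumes a combined data frame `all_submissions` (a name not used in the thread) holding all FluSight submissions:

```r
library(arrow)

# Hypothetical: `all_submissions` is the single combined data frame of all
# 4,026 FluSight submissions described above.
write_dataset(all_submissions, "flusight1_targ/",
              partitioning = "target")
write_dataset(all_submissions, "flusight1_date/",
              partitioning = "submission_date")
write_dataset(all_submissions, "flusight1_date_targ/",
              partitioning = c("submission_date", "target"))
```

Each call writes a hive-partitioned directory tree (e.g. `target=Season onset/part-0.parquet`), which is what the benchmarks below query.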
Okay, prefacing with the fact that the FluSight repo is very small (only 29,600,000 rows), here are some initial benchmarks on the following task:

```r
open_dataset() %>%
  filter(model == "Delphi-Stat") %>%
  group_by(submission_date) %>%   # or group_by(target)
  summarize(meanvalue = mean(value, na.rm = T)) %>%
  collect()
```
We compare three partitioning options:

- partitioned by target only (`targ` in the benchmark tables)
- partitioned by submission date only (`sdate` in the benchmark tables)
- partitioned by both (`both` in the benchmark tables)

1. When the grouping variable is `target`:

```r
cmds1 = alist(
  targ = open_dataset("flusight1_targ/") %>%
    filter(model == "Delphi-Stat") %>%
    group_by(target) %>%
    summarize(meanvalue = mean(value, na.rm = T)) %>%
    collect(),
  sdate = open_dataset("flusight1_date/") %>%
    filter(model == "Delphi-Stat") %>%
    group_by(target) %>%
    summarize(meanvalue = mean(value, na.rm = T)) %>%
    collect(),
  both = open_dataset("flusight1_date_targ/") %>%
    filter(model == "Delphi-Stat") %>%
    group_by(target) %>%
    summarize(meanvalue = mean(value, na.rm = T)) %>%
    collect()
)
```

```
Unit: milliseconds
  expr       min        lq      mean    median        uq       max neval
  targ   64.7840   82.7043  198.1586   91.2229  100.0729  674.7837    10
 sdate  133.0033  136.7588  139.9850  140.2119  142.3588  147.1574    10
  both 1155.5496 1199.8241 1261.3381 1216.4838 1283.8325 1569.8637    10
```
2. When grouping variable is `submission_date`:
```r
cmds2 = alist(
targ = open_dataset("flusight1_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
sdate = open_dataset("flusight1_date/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
both = open_dataset("flusight1_date_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect()
)
```

```
Unit: milliseconds
  expr       min        lq      mean    median        uq       max neval
  targ   72.2872   83.7387  222.6143   88.8084  103.2500 1037.7222    10
 sdate  126.4035  135.8261  178.6519  139.8237  147.7848  535.2068    10
  both 1191.7232 1204.7832 1233.7476 1237.5590 1251.3735 1290.6913    10
```
So, in this admittedly very limited example, it's clear that the doubly partitioned dataset (partitioned on both target and submission date) causes some slowdown, and that the smaller of the two single partitions (i.e. target) is faster even when the grouping variable is mismatched (i.e. when grouping by submission_date, the target-partitioned dataset was still faster).
Nice!
Would be interesting to also try a query that collects forecasts for, say, the last `n` rounds for a given output type.
I believe that might be quite a common query for getting data to pass on to ensemble methods.
Yeah, that might be common; a good implementation is probably to identify the last n round indicators and then join. Something like this:
```r
get_last_n_rounds <- function(dataset, target_type, n = 5) {
  inner_join(
    open_dataset(dataset) %>% filter(target == target_type),
    open_dataset(dataset) %>%
      distinct(submission_date) %>%
      arrange(desc(submission_date)) %>%
      slice_head(n = n),
    by = "submission_date"
  )
}

cmds4 = alist(
  targ  = get_last_n_rounds("flusight1_targ/", target_type = "Season onset") %>% collect(),
  sdate = get_last_n_rounds("flusight1_date/", target_type = "Season onset") %>% collect(),
  both  = get_last_n_rounds("flusight1_date_targ/", target_type = "Season onset") %>% collect()
)
microbenchmark(list = cmds4, times = 10)
```
```
Unit: milliseconds
  expr       min        lq      mean    median        uq       max neval
  targ  469.5469  491.2216  528.6626  498.8077  536.0588  746.3325    10
 sdate  490.5871  501.6316  543.4063  515.6881  554.7503  728.3711    10
  both 1859.2467 1893.0724 2014.3022 1980.3282 2027.8139 2365.7026    10
```
But you all may have different ideas. As far as I know, arrow does not support a number of alternative ways you might consider doing this (i.e. via `n()`, `row_number()`, grouped slicing such as `slice_head(n = <n>, by = ...)`, etc.).
Again, note that I'm using "submission_date" as a stand-in for the concept of "round"; this is for illustration only, and I really mean the latter (or whatever the current hubverse parlance is for a single "round").
Actually, I think that partitioning by "round" (above, "submission date") might be the best choice, but only for retrospective storage.
If the storage is being accumulated over time, then I think it makes sense to have a separate (possibly partitioned) dataset for each round. That is, the data are accumulated for a round, and that round constitutes a dataset that could be partitioned by target. Then the next round is accumulated sometime later, and that round constitutes another dataset, which again could be partitioned by target, and so on.
And it seems we wouldn't want a different approach to storage partitioning for retrospective and prospective collection of data, right? Meaning that generating a single hive-partitioned multi-file dataset for an entire repo, partitioned by round (as in the examples in the benchmarking threads above), makes little sense.
One thought on this: for the scenario hub examples, there are few rounds and each one has a lot of data. For a typical forecast hub (at least so far), there are a lot of rounds, and relative to the entire project, each round has relatively little data.
I agree that it probably makes sense to standardize storage and partitioning structure for all hubs in the cloud.
I don't have a clear sense of the trade-offs of having multiple datasets for a hub (one per round) vs. trying to partition on round. @annakrystalli was saying she thought that "appending" a round to an existing dataset could be computationally expensive. (The entire partition would have to be rebuilt?)
From the end user perspective, I think we pretty definitely want to keep the ability to issue one "query" or "collect statement" to gather data from multiple rounds at once.
Yes @nickreich - it's that issue of appending (i.e. one can't really append, but rather needs to load, append, and re-partition) that led me to the comments above re: separate datasets per round.
Re: the trade-offs, it would be good to do some testing with an actual S3 bucket and an actual large repo. If there were substantial trade-offs (i.e. with a single round-partitioned dataset being optimal), then we might rethink our consensus that one single approach is best. That is, we might consider a different, more optimized storage structure for "archived/no-longer-active" hubs vs "currently active" hubs.
I'm not super familiar with the ins and outs of arrow-style datasets, but if we did have separate datasets per round, would that kind of set-up be compatible with a single "collect-style" argument to load data from multiple rounds? If not, what would the right strategy be to support single-argument-style queries on the data? E.g. would we need to build a user-facing function that would wrap a few collect calls for different rounds?
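One possible answer (a sketch only, not something tested in this thread): `arrow::open_dataset()` accepts a list of Dataset objects as its source, so per-round datasets could still be presented to the user as a single queryable dataset behind one `collect()`. The paths below are hypothetical:

```r
library(arrow)
library(dplyr)

# Hypothetical layout: one arrow dataset directory per round.
round_dirs <- c("active_hub/round_2022-10-19",
                "active_hub/round_2022-10-26")

# Open each round's dataset, then combine them into one logical dataset.
combined <- open_dataset(lapply(round_dirs, open_dataset))

# A single collect-style query now spans all rounds.
combined %>%
  filter(target == "Season onset") %>%
  collect()
```

A user-facing wrapper could hide the `lapply()` step, so end users would still issue one query.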
I'll try to unpick a little bit the limitations I think we are facing and how I think it's best to proceed.
While `arrow::write_dataset()` is a great tool for partitioning a dataset, especially for complex partitions, the inability to use it to append to an already partitioned dataset makes it costly for regular updates, as it would effectively require reading and re-writing the entire hub dataset to S3 every time a new file is added (or perhaps on a less frequent schedule).
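The costly cycle being described here would look roughly like this (a sketch; `hub_path` and `new_submission` are hypothetical names):

```r
library(arrow)
library(dplyr)

# Load-append-re-partition: what a "simple" update currently implies.
existing <- open_dataset(hub_path) %>% collect()  # read the ENTIRE hub back

# Bind the single new submission, then rewrite the whole partitioned dataset.
write_dataset(bind_rows(existing, new_submission),
              hub_path,
              partitioning = "round_id")
```

The full read and rewrite is what makes this expensive to run on every submission.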
Having said that, @bsweger & Matt pointed out that we could do away with `write_dataset()` and re-organise the data ourselves. How easy that is, and how much investment we'll need to make to develop the code to do it, will depend on how complex we want the repartitioning to be.
Given that each file submitted to a hub by a given team relates to a single round and includes the `round_id` as the first element of the file name, it would be relatively straightforward to simply place it in a round ID folder on S3 instead of a team folder, without any change to the file itself (actually, we would need to add the `model_id` to the file). This would be pretty easy to do.
Re-partitioning across targets, however, would require that individual submission files be read, split across multiple files according to target, and then most likely appended to existing files on S3 to avoid ending up with too many tiny files. Ensuring this is accomplished robustly will obviously require more work; more complex partitioning == more work, etc.
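A rough sketch of the per-target split for one submission file (file names, columns, and the staging directory are all hypothetical; the hard part, appending to existing files on S3 robustly, is only noted in a comment):

```r
library(arrow)
library(dplyr)

# Read one submission (one team/model, one round).
submission <- read_parquet("model-output/teamA-modelB/2022-10-19-teamA-modelB.parquet")

# Split it across hive-style target directories.
for (tg in unique(submission$target)) {
  out_dir <- file.path("s3-staging", paste0("target=", tg))
  dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)
  # NOTE: in practice this should append to / merge with existing files on S3;
  # writing one small file per submission is exactly what we want to avoid.
  write_parquet(filter(submission, target == tg),
                file.path(out_dir, "2022-10-19-teamA-modelB.parquet"))
}
```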
My suggestion moving forward would be to:
This should then give us a better idea of whether it's worth the effort to develop additional tools to support more complex regular repartitioning.
It might also be good to get a feeling for the actual costs and limits of re-partitioning the hub using `write_dataset()` as it grows (e.g. do GitHub Actions have a limit on how much data a free runner can process?).
Also +1 on the suggestion to actually start benchmarking over S3 too.
@lmullany could you point me to the repo you have been using for your benchmarking experiments so far?
Actually, I've been thinking about this a bit more, and perhaps there is a way we can "append" to a single dataset without reloading the prior data, appending the new, and rewriting the updated dataset.
@annakrystalli , here is the repo I've been working with: https://github.com/cdcepi/FluSight-forecasts
Below is a screenshot of five seasons - for purposes of playing around/testing, I just read in all datasets for these five seasons.
Let's say we have an existing single dataset containing the entire hub's data collected thus far, partitioned by round (here round is a string date, but it could be anything, i.e. "R1", "R2", etc., or "1", "2", "3", etc.). [Note: this is just some data from the Scenario Modeling Hub, not the FluSight repo mentioned throughout the benchmarking in this thread.]
It might look something like this, with say 10 completed rounds (in this case, labelled by date strings from 2021-05-02 to 2022-07-19):
If we run `open_dataset("active_hub") %>% glimpse()`, we see that there are 9 million rows across 10 parquet files:
Now, let's say we have some new data (`new_data`) from the next round, let's say "2022-10-19", and we want to add/append this to the same arrow multi-file dataset.
We can simply do:
```r
# write the new data to the same dataset
write_dataset(new_data, "active_hub", partitioning = "round")
```
and now our dataset looks like this:
and similarly, if we again run `open_dataset("active_hub") %>% glimpse()`, we get:
The single dataset has been properly appended, without re-reading or re-partitioning the existing data.
Right, but most hubs are organised under model IDs and we would want to repartition them across round IDs.
Sure, but within a round we only have to row-bind all the submissions from that single round, and then append that new round to an arrow dataset. I guess what I'm saying is that the process seems trivial regardless of whether it is an active or closed/complete hub.
perhaps I'm missing something here. We could schedule a conf call to discuss further, sometime this week?
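The per-round flow described above could be sketched as follows (the `incoming/` staging layout and file format are hypothetical):

```r
library(arrow)
library(dplyr)

# 1. Gather all team submissions for the completed round.
round_files <- list.files("incoming/2022-10-19", full.names = TRUE)

# 2. Row-bind them into a single data frame and tag the round.
new_round <- bind_rows(lapply(round_files, read_parquet))
new_round$round <- "2022-10-19"

# 3. Append the round to the existing dataset; because "round" is the
#    partitioning column and this is a new round value, write_dataset()
#    just adds a new round=2022-10-19/ directory without touching the
#    existing partitions.
write_dataset(new_round, "active_hub", partitioning = "round")
```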
It seems like this may have already been settled - but responding to Anna's comment above alluding to the possibility of partitioning by target, I wanted to note that although every hub has a concept of a "target", we don't impose any requirement that model outputs record a field called `target` (e.g., if the hub only collects forecasts for one target, dropping that into the model output files would be superfluous). So if we're aiming to build a "one solution for all hubs" kind of answer here, we likely don't want to hard-code `target` as part of that.
Thanks @elray1 !
Ideally nothing is hardcoded and any re-partitioning is configurable by admins. I was just referring to the examples we had been discussing above, with target being one example given.
@lmullany , myself and @bsweger actually have a call set up for tomorrow! I was going to suggest you should join us if you can!
Linking to notes from the chat that @lmullany, @annakrystalli, and I had this morning (Anna and Luke, please correct anything I got wrong): https://docs.google.com/document/d/1NgEZ1i_mXukoOMrgPWFKWm8xZKdFs6G1sWxeOIfvmT4/
Our small working group is coming at this problem from different angles, which is very useful (imo)
A high-level question that emerged is: what are we hoping to gain by re-organizing/munging model-output data in the cloud? What would make the additional development and maintenance of moving parts worthwhile?
A few comments:
Given the discussion this morning, I agree that simply mirroring the structure, but writing to parquet, might be pretty high on the list of candidate approaches.
If we are going to index a db, or partition for some optimization, I think round-driven queries (both of @elray1's examples) are very common (whether for an analysis/viz of a single round, a set of recent rounds, etc.).
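The "mirror the structure, but write to parquet" idea could be sketched roughly as below (the `cloud-mirror/` output path is hypothetical, and a real version would write to S3 rather than local disk):

```r
# Walk the hub's model-output tree and write a parquet file at the
# mirrored path for every csv submission, preserving the directory layout.
csv_files <- list.files("model-output", pattern = "\\.csv$",
                        recursive = TRUE, full.names = TRUE)

for (f in csv_files) {
  out <- file.path("cloud-mirror",
                   sub("\\.csv$", ".parquet", sub("^model-output/", "", f)))
  dir.create(dirname(out), recursive = TRUE, showWarnings = FALSE)
  arrow::write_parquet(read.csv(f), out)
}
```

This keeps the familiar team/model organization while getting the size and read-speed benefits of parquet.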
Just adding the link to the initial benchmarking here too: https://partition-benchmarking.netlify.app/
To add on to @elray1 's comments above about common queries. I think a common secondary analysis of forecast data involves pulling down all forecasts of one target, for all models, for perhaps all or a subset of rounds. E.g. for an analysis of accuracy of forecasts of deaths, I just want "k step ahead incident hospitalizations" (for all possible values of k) from the past 20 weeks. Because this is a secondary analysis and not some interactive web app, "instantaneous" speed is not critical.
Another comment here is that another partition we could consider would be by output_type. One advantage is that it might move us more towards a zoltar-style "one-table-per-output-type" kind of model, which could have advantages later and help us get around some of the downstream data-type challenges we have when output_type_id can take both character and numeric values (e.g., here).
Just to note that, unfortunately, I don't think splitting across output types would solve the data-type issue when accessing the data as an arrow dataset, because the dataset as a whole uses a single schema across all files. If there are data-type differences in the same column across files, you need to explicitly define a single data type for those columns, or opening the dataset fails.
Having said that, if you know you are only interested in a single output type, perhaps you could just connect to the output-type subdirectory (rather than the whole model-output dir) and set the required data type for the `output_type_id` column.
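That suggestion might look something like the sketch below (the directory layout and column set are hypothetical; note that when `open_dataset()` is given a `schema` argument it must describe all columns):

```r
library(arrow)

# Connect only to the quantile subdirectory of a hypothetical
# output-type-partitioned layout, pinning output_type_id to string so
# files with numeric-looking IDs don't produce a schema conflict.
quantiles <- open_dataset(
  "model-output-by-type/output_type=quantile/",
  schema = schema(
    origin_date    = date32(),
    target         = string(),
    output_type_id = string(),   # force one type across all files
    value          = float64()
    # ...plus whatever other columns the hub's files contain
  )
)
```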
Closing this one because we've verified that writing the model-output files as parquet (with added columns for round_id, team, and model) works with the existing hubData tools.
At this point, we need more real-world usage patterns before deciding to re-organize the data. We had talked about potentially using Zoltar query history as a data point (and thanks @matthewcornell for volunteering to help with that!), but let's wait until we get some hubverse-centric feedback about this.
In current hubverse repos, the model-output data is organized like:
The cloud work done to date mirrors this structure in AWS S3 buckets.
Our current hypothesis is that this file organization isn't aligned with how most people want to query the data. For example, when hubUtils connects to an S3 bucket and uses Arrow to present information, the performance of that operation depends on how well the query maps to how the data is physically stored. Thus, the decision about how we want to organize the data once it's in the cloud is important.