cmu-delphi / covidcast-indicators

Back end for producing indicators and loading them into the COVIDcast API.
https://cmu-delphi.github.io/delphi-epidata/api/covidcast.html

[fb-survey] Archive handling prevents repairing busted aggregation runs #150

Closed · krivard closed this 4 years ago

krivard commented 4 years ago

I'm currently working on porting the current automation (little-a) scripts for the facebook pipeline to use the new codebase. This involves running the pipeline on a small number of qualtrics files, checking the results, fixing something, then re-running the pipeline with the same settings. "Archive" is necessary for generating the correct CID outputs, but throws everything down the oubliette if you try to repair anything.

Steps to reproduce:

  1. Run the facebook pipeline with archive as one of the output settings
  2. Imagine the results fail validation in some way
  3. Make repairs
  4. Re-run the pipeline

Expected result:

  1. Updated output files

Actual result:

  1. Crash due to there being 0 output rows

The new codebase correctly stores a cumulative record of CIDs in the archive, and correctly anti-joins the input responses against that list. The problem seems to be in what it does after that. There are two discrepancies between the new codebase and the reference codebase here:

The new codebase is substantially faster to run than the reference codebase when just loading 2-3 qualtrics files (8 minutes vs 70), and that's probably why -- unzipping and zipping millions of survey responses takes a while. But this does make it impossible to recover from an aggregation run that succeeded enough to update the CID list in the archive, but didn't succeed enough that the resulting csv files are good.

A couple possibilities:

Thoughts @amartyabasu @capnrefsmmat ?

capnrefsmmat commented 4 years ago

Hmm. A few thoughts:

  1. Storing more than just the most recent data file in the RDS is probably a good idea. For our 7-day smoothed signals, we need to have at least the past 7 days of data available to correctly report them.
  2. If compression is slow, we can actually turn it off; apparently readr defaults to no compression because space is cheap and compression is slow, but Taylor overrode that default to specify bz2 compression, which is not known to be particularly fast.
  3. We could store all data in the RDS and combine it with the new data, but then, before aggregating, filter out everything observed before start_date - 7 days (rough sketch below). Then the cost would be just loading from the RDS and the initial subset, not the cost of re-aggregating it all.
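
For concreteness, a minimal sketch of what (3) could look like, using illustrative names (archive_path, start_date, and the toy columns are stand-ins, not the pipeline's actual variables), with compression left off per (2):

library(dplyr)
library(readr)

# toy archive on disk; the real one would hold every response seen so far
archive_path <- tempfile(fileext = ".rds")
write_rds(tibble(Date = as.Date("2020-05-01") + 0:20, token = as.character(1:21)),
          archive_path, compress = "none")

start_date <- as.Date("2020-05-20")
new_data   <- tibble(Date = start_date + 0:1, token = c("100", "101"))

archive  <- read_rds(archive_path)
combined <- bind_rows(archive, new_data)

# only the window the 7-day signals need gets re-aggregated; the rest is just stored
to_aggregate <- filter(combined, Date >= start_date - 7)

write_rds(combined, archive_path, compress = "none")
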
capnrefsmmat commented 4 years ago

I've implemented this strategy in 08ed60d30461d47556cfe316e4403b37a6ce4ab4.

That is, what we do is

  1. Load the new data, excluding previously seen tokens.
  2. Merge it with the data in the archive.
  3. Produce the outputs using the merged data (over the time window requested in params.json).
  4. Save the new merged data (not just the new data) into the archive.

However, I have merely written the code, not tested it. I'll do a test run on the 3 weeks of real data I have available, by pretending it's the whole time period and running day-by-day.

capnrefsmmat commented 4 years ago

True to my caveat, it doesn't work correctly yet, because I'm incorrectly merging the archived data with the new data.

capnrefsmmat commented 4 years ago

Pushed a new commit that fixes the obvious problem. Basically, the strategy is to save everything in the archive. On each run, we

  1. load the archive
  2. load the specified input data
  3. reject tokens in input data that were present in the archive and remove those rows
  4. combine the two data sets into one. Write this as the new archive
  5. subset to the parts we need for estimation/individual data writing/CIDs and continue

This could have an edge case: the input data might contain a token already present in the archive, but from a second survey submission that was started before the submission already in the archive. Is this a case we need to guard against?

krivard commented 4 years ago

Yes. There are ~40 survey responses each day that have to replace previous days' responses, and they occur in borderline regions (sample size near 100) often enough to cause problems.

This is handled by the archive loading code in the reference codebase but it is extremely slow.

capnrefsmmat commented 4 years ago

I can probably reorder operations to solve this, and avoid terminal slowness by only considering the possibility of backfill over a fixed window instead of over all time.

capnrefsmmat commented 4 years ago

I've implemented a new strategy in 8adc8fba. The archive will contain 14 days of previous data (by default; configurable) plus all tokens ever seen.

Each day, we

  1. load the archive
  2. load the specified input data
  3. merge the archived data and input data, taking the first occurrence of each token (this handles backfill of previously seen tokens)
  4. compare against the previously seen tokens and drop any responses with tokens seen earlier (i.e. any token seen more than 14 days ago)
  5. subset to the parts we need for estimation/individual data writing/CIDs and continue

Steps 3 and 4 mean we can't backfill data more than 7 days old -- we won't have enough data to do the right 7-day average for them -- but that's fine.

Step 3 may be slow but at least is bounded, since the archive will contain at most 14 days of data. Step 4 can become arbitrarily slow, because we must check against all prior tokens, but that is the price we must pay. If this becomes an obstacle, I think the first speedup option would be to use data.table to store the input and archive data with the token as key, so it can do the join efficiently.
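
A rough sketch of that data.table idea, with keyed joins on token (toy data; the column names are stand-ins, not the pipeline's actual schema):

library(data.table)

archive <- data.table(token = c("a", "b", "c"),
                      date  = as.Date("2020-08-01") + 0:2, q1 = 1:3)
input   <- data.table(token = c("b", "d"),
                      date  = as.Date("2020-08-10") + 0:1, q1 = 4:5)
seen    <- data.table(token = c("a", "b", "c", "x"))   # every token ever seen

# step 3: merge archive and input, keeping the earliest row per token
combined <- rbindlist(list(archive, input), use.names = TRUE)
setorder(combined, date)
combined <- unique(combined, by = "token")

# step 4: drop tokens that were seen before but are no longer in the 14-day archive
stale  <- seen[!archive, on = "token"]      # anti-join: seen, but not in the current archive
result <- combined[!stale, on = "token"]    # anti-join: drop those responses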

krivard commented 4 years ago

Okay, running the full back-catalog of data takes ~22 minutes, so 🔥 nicely done.

However, when I run from 1 April to yesterday to build an archive file, then run what would have run this morning, I get some small discrepancies vs what's coming out of the reference codebase. Eyeball analysis suggests the new codebase is excluding some records that get counted by the reference codebase.

I've put the archive file, the qualtrics files, and the weights files on Box in the delphi-qualtrics folder, in case you wind up with free hours before Wednesday and want to take a look. The column datatype changes should all be available in the fb-package branch now.

capnrefsmmat commented 4 years ago

My guess is that the complete case filter (which requires some minimum number of questions to be filled in) has an off-by-one, so we occasionally exclude a very-partially-completed survey that the prior pipeline would have kept. Just a guess based on the diff. I can probably check this later, and if I'm right, add a couple of test cases for it.
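
Purely as a hypothetical illustration of the kind of off-by-one I mean (the column name and threshold here are invented, not the pipeline's):

library(dplyr)

responses <- tibble(id = 1:3, n_answered = c(1, 2, 7))

filter(responses, n_answered >  2)   # strict: drops the id == 2 response
filter(responses, n_answered >= 2)   # lenient: keeps it -- the boundary case at issue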

We'll have to remember to alert the API list in advance that the smoothed signals will be reissued (but hey, they can get the old issues from the API for free!).

But given all the work that went into matching, I'm just happy it's so close...

krivard commented 4 years ago

I fixed a problem where we were prematurely removing low-population ZIPs before aggregations were run; this seems to have fixed maybe 10% of the discrepancies. New discrepancies file here.

The first few pages suggest the remaining issues are largely focused on value and stderr for megacounties, but this does not bear out. A quick analysis on just the raw files, counting differences in excess of 1%:

                        total   megacounties
value                    2785            618
stderr                   2959            455
sample size              3117            318
value & stderr           2460            432
value & sample size      2316            282
stderr & sample size     2709            295
all 3                    2227            272

capnrefsmmat commented 4 years ago

Hmm. Counting differences in excess of 1% means we won't detect a sample size change of 1 for any location, since sample_size >= 100. Since we report to multiple decimal places, you could use a pretty small threshold. Also, if you could exclude smoothed signals from the diff, that'd be great (since we expect those to differ).

Looking at the diffs, I see:

That's suggestive, but I don't know what it suggests. There seem to be much more extensive differences in weighted files than in unweighted files. Not sure what to make of this -- I don't know of a single cause that could affect both weighted and unweighted, but would affect the weighted more.

Also, I pushed a fix that updates msa_list.csv to contain Puerto Rican MSAs, since we had already made that change to covid-19/geographical_scope/ but not propagated it to this indicator. I verified that the zips file matches.

krivard commented 4 years ago

I pulled your fix, then remembered that I'd neglected to re-generate the archive file after fixing the low-population ZIPs removal. Re-ran, then re-ran the analysis using a 0.1% (0.001 proportional) difference threshold and added a callout for weighted/unweighted signals.

Changes                 total   megacounties   weighted   unweighted
Any column               4513            828       4059          454
value                    3814            806       3360          454
stderr                   4479            804       4048          431
sample size              4051            374       4051            0
value & stderr           3783            782       3352          431
value & sample size      3356            352       3356            0
stderr & sample size     4040            373       4040            0

This almost certainly points to a problem with how we're handling survey weights.

krivard commented 4 years ago

Okay, I put together a test environment with a shorter round-trip running time to see if we can speed this process up a bit. It includes two qualtrics-like files and their weights, with the intention that you run with one file to generate the archive, then run with the second to test loading it. You need Box access to download it: 19-aug-comparison-tests.tgz

I wound up backing off to just running each pipeline while loading both files and no archive handling. Here's the diff table for that:

Changes                 total   megacounties   weighted   unweighted
Any column               2034            832          0         2034
Column 2                 2015            826          0         2015
Column 3                 2022            822          0         2022
Column 4                 1612            410          0         1612
Columns 2 & 3            2003            816          0         2003
Columns 2 & 4            1593            404          0         1593
Columns 3 & 4            1610            410          0         1610
Columns 2 & 3 & 4        1591            404          0         1591

So we currently have no problems with weighted figures after all. My current hypothesis is that the additional diffs from before are due to archive drift: we've been fixing small bugs for 8 weeks but not regenerating the archive files each time to avoid clobbering history. That was a concern before we activated data versioning in the API, but is less of a big deal now that past issues can be retrieved from the API at will. If this hypothesis is correct, it means that even once we get both pipelines to generate the same output for the same data, we'll still have a discontinuity when we shift to the new pipeline. We'll want to reissue all data going back to 6 April at that time, and be very clear with users about the changeover.

In the meantime, I'd prefer to track down the diffs above, since those did occur while running both pipelines on the same data with the same settings. In the process of debugging the above, I wound up adding a thing to both pipelines that outputs which ResponseIDs went into each geo on each day, just to check set membership. The branch for that is here[1]. There are no differences in the response sets included in each bucket between the old and new codebases.

[1] the branch notably does not include the normalization updates. Adding those in does not change the diff set, which I do not understand.

capnrefsmmat commented 4 years ago

I think your point [1] makes sense -- the estimation code repeatedly rescales weights to sum to 1, so I think the normalization by day should not matter, at least in the new pipeline. But maybe I'm wrong. If we're looking for a difference between weighted and unweighted estimates, normalization by day is one:

weights_cid = function (rows, wt=cid.weights, to=LAST_WEIGHTS) {
  rows %>>%
    dplyr::inner_join(wt, by=c("token"="cid")) %>>%
    ("after joining to cid weights" ? nrow(.)) %>>%
    dplyr::filter(Date <= to) %>>%
    dplyr::group_by(Date) %>>%
    dplyr::mutate(weight=weight/sum(weight)) %>>%
    dplyr::ungroup()
}

weights_1 = function (rows) {
  rows %>>%
    dplyr::mutate(weight=1)
}

Note that weights_cid rescales every day's weights to sum to 1, but weights_1 does not.

I don't see how this would change the estimates, because of the repeated rescaling, but maybe I'm looking in the wrong spot in the old pipeline? prepare_NOSHARE_WITHSMALL_aggregation in aggregations-setup.R would seem to be unaffected by this, but maybe something else is?
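
As a sanity check on the rescaling point: a weighted mean is unchanged when all of a group's weights are rescaled together, which is why normalizing each day's weights to sum to 1 shouldn't move that day's estimate on its own:

x <- c(0, 1, 0, 1, 1)
w <- c(2, 5, 1, 3, 4)

weighted.mean(x, w)            # 0.8
weighted.mean(x, w / sum(w))   # 0.8 -- the scaling cancels in the ratio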

Maybe instead of writing out all geo IDs used to calculate a response, we should write out the entire weight vector, so we can compare those between the two pipelines and identify the differences...

krivard commented 4 years ago

Something definitely looks fishy with the weight vectors. The architecture is different so it's hard to tell if I inserted the writes in the wrong place, but while the reference codebase generates two unique weights per ResponseID-geo (presumably for with and without the participation weights), the new codebase generates substantially more than that:

$ head test/weights_per_assignment.txt reference/weights_per_assignment.txt 
==> test/weights_per_assignment.txt <==
     16 [redacted-1] 10900
     16 [redacted-1] 346
     16 [redacted-1] 42095
     16 [redacted-1] pa
     32 [redacted-2] 40060
     31 [redacted-2] 431
     30 [redacted-2] 51087
     32 [redacted-2] va
     16 [redacted-3] 430
     16 [redacted-3] 47260

==> reference/weights_per_assignment.txt <==
      2 [redacted-1] 10900
      2 [redacted-1] 346
      2 [redacted-1] 42095
      2 [redacted-1] pa
      2 [redacted-2] 40060
      2 [redacted-2] 431
      1 [redacted-2] 51087
      2 [redacted-2] va
      1 [redacted-3] 430
      1 [redacted-3] 47260

here's what I added to aggregate.R:

mixed_weights <- mix_weights(ind_df[[var_weight]] * ind_df$weight_in_location,
                             s_mix_coef, params$s_weight)

write_csv(tibble(ResponseId=ind_df$ResponseId,geo=target_geo,weight=mixed_weights),"weight_vectors.csv",append=TRUE)

sample_size <- sum(ind_df$weight_in_location)

new_row <- compute_fn(
  response = ind_df[[metric]],
  weight = mixed_weights,
  sample_size = sample_size)

(beware you have to disable parallel and it takes 30 minutes)

capnrefsmmat commented 4 years ago

How was weights_per_assignment.txt generated?

Note that aggregate.R loops over all indicators, so you're going to get a CSV output separately for raw_cli, raw_ili, raw_wcli, ... The old pipeline does all estimation in one big pass.

krivard commented 4 years ago

Cleaning/normalizing/deduplicating:

reference/weight_vectors.csv: reference
        sed 's/,\([a-z]*\)\([0-9]*\),/,\1,\2,/' ${REF}/facebook/prepare-extracts/weight_vectors.csv | \
        sed 's/,statefips,/,state,/;s/,fips,/,county,/;s/,,/,msa,/;' | \
        sed -f states.sed | \
        sed 's/,0*\([1-9]\)/,\1/g' | \
        awk 'BEGIN{FS=OFS=","} {print $$1,$$3,sprintf("%0.9f",$$4)}' | \
        LC_ALL=C sort -u >$@

weight_vectors: test reference/weight_vectors.csv
        sed '/,usa,/d' weight_vectors.csv | \
        sed 's/,0*\([1-9]\)/,\1/g' | \
        awk 'BEGIN{FS=OFS=","}{print $$1,$$2,sprintf("%0.9f",$$3)}' | \
        LC_ALL=C sort -u >test/weight_vectors.csv

Followed by, e.g.,

$ awk -F, '{print $1,$2}' reference/weight_vectors.csv | uniq -c > reference/weights_per_assignment.txt
krivard commented 4 years ago

Hitting each pair multiple times is fine, but all the weighted variants should be using the same weight vector, no?

Oh sorry, this would probably help:

$ diff -y test/weight_vectors.csv reference/weight_vectors.csv  | head
[redacted-1],10900,0.001028191                           <
[redacted-1],10900,0.001244623                           <
[redacted-1],10900,0.001729508                           <
[redacted-1],10900,0.002521956                           <
[redacted-1],10900,0.004911434                             [redacted-1],10900,0.004911434
[redacted-1],10900,0.005632392                             [redacted-1],10900,0.005632392
[redacted-1],10900,0.000759465                           <
[redacted-1],10900,0.000770435                           <
[redacted-1],10900,0.000836708                           <
[redacted-1],10900,0.000858530                           <
capnrefsmmat commented 4 years ago

Yes, they should be the same weight vectors. raw_cli should have weight vectors that are just WeightInLocation (rescaled); raw_wcli should have weight vectors that depend on both the weights and WeightInLocation.
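
Illustratively (toy numbers, not the pipeline's code), the two weight vectors for one location's responses relate like this:

weight_in_location <- c(1, 1, 0.5, 1)       # share of each response assigned to this location
survey_weight      <- c(0.8, 1.2, 1.0, 2.0)

w_raw_cli  <- weight_in_location / sum(weight_in_location)
w_raw_wcli <- (survey_weight * weight_in_location) /
                sum(survey_weight * weight_in_location)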

If the difference is only in weighted aggregations, and not unweighted, that suggests a problem in how we join weights. For example, the old pipeline does

weights_cid = function (rows, wt=cid.weights, to=LAST_WEIGHTS) {
  rows %>>%
    dplyr::inner_join(wt, by=c("token"="cid")) %>>%
    ("after joining to cid weights" ? nrow(.)) %>>%
    dplyr::filter(Date <= to) %>>%   # <= note this filter
    dplyr::group_by(Date) %>>%
    dplyr::mutate(weight=weight/sum(weight)) %>>%
    dplyr::ungroup()
}

The new pipeline does not do any filtering; join_weights in weights.R loads all the weights. That means that a weight from August 4 can be matched to a survey from the future. But that shouldn't be a problem; the new pipeline does remove duplicated tokens (in merge_responses in responses.R), so the original submission of that token would be the only one to survive. Unless there's a weird corner case.

Not sure what else could be going on. Something is up with how we join and process the weights, apparently resulting in more responses being included with weights than before.

krivard commented 4 years ago

If the difference is only in weighted aggregations, and not unweighted

It's actually the other way around (weighted is fine; unweighted is broken) which is ... pretty spooky.

That means that a weight from August 4 can be matched to a survey from the future.

Subtle distinction: we don't care if an individual weight gets matched to a survey from the future, but we want to make sure we're not publishing weighted figures before the weights for that day have been published. This is generally not possible with the raw figures (though the filter there makes doubly sure of it), but the smoothed figures will impute data beyond the weights horizon -- I believe there's another filter at write time to make that happen.

capnrefsmmat commented 4 years ago

Oh right, I keep getting turned around. Given that uniform weights are a special case of weights in general... I have no hypotheses to explain this. Unless it's what I noted above, that the old pipeline normalizes weights by day but does not normalize uniform weights by day. That should not matter, but stranger things have happened.

My other hypotheses have been things like "responses with missing weights are being dropped even from the uniform weights case", which I don't think is true. I can't think of many other ways that unweighted estimates should be different.

krivard commented 4 years ago

I annotated the weights vector file with the sensor name, and the additional weights all come from smoothed signals, which makes sense. Smoothed signals are excluded from the table, so whatever it is, it's not the weight vectors.

krivard commented 4 years ago

I annotated the weights vector file with the metric value, and am now only printing for the raw_cli indicator. Eyeballing the diff, I found at least one instance where the percentage was wrong (!!; 50 instead of 0) and a couple of places where the weight vector was slightly off. I'm re-running the reference copy to drop the wcli lines to make a more automated analysis easier.

capnrefsmmat commented 4 years ago

By "metric value" you mean PercentCLI in the old pipeline and new_row$val in the new pipeline?

Note that filter_WITHSMALL applies the Jeffreys correction to the entire data frame, and in the new pipeline, summarize_indicators_day calls post_fn near the end to do the same thing. Probably best to ensure your output is consistent between the two. And if everything matches before and not after the Jeffreys correction, then we have a hint.
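
For reference, a Jeffreys-style adjustment for a binomial percentage has roughly this shape (a sketch only, not the pipeline's actual filter_WITHSMALL / post_fn code):

jeffreys_percentage <- function(pct, sample_size) {
  x <- pct / 100 * sample_size          # implied count of "yes" responses
  p <- (x + 0.5) / (sample_size + 1)    # Jeffreys-smoothed proportion
  list(val = 100 * p,
       se  = 100 * sqrt(p * (1 - p) / sample_size))
}

jeffreys_percentage(0, 100)   # value nudged above 0 so the stderr is nonzero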

krivard commented 4 years ago

Found it -- we were missing an if/elsif/else block in the mixing weights, which was only relevant for regions with exactly 100 or 101 samples.

Next for me:

krivard commented 4 years ago

Current state: raw_cli matches; raw community signals do not.

New test environment up in Box; corresponding branch is 24-aug-comparison-tests; in the old codebase the run script is here and corresponding branch here.

@brookslogan, can you assist?

krivard commented 4 years ago

Remaining fixes resolved: