ooni / data

OONI Data CLI and Pipeline v5
https://docs.ooni.org/data

Very Wide Observation Rows + Experiment generation #17

Closed by hellais 1 year ago

hellais commented 1 year ago

As part of this PR I did a major rework of the observation generation system.

tl;dr

If you care to read more details, see below:

Very Wide Observation Rows

Each Web Connectivity measurement ends up producing observations that are all of the same type and are written to the same DB table.

This has the benefit that we don't need to look up the observations we care about across several disparate tables, but can do it all in the same one, which is incredibly fast.

A side effect is that we end up with tables that can be a bit sparse (several columns are NULL), but this doesn't seem to present major difficulties.

The biggest challenge in this approach is figuring out which observations are related to each other so that they can be packed into the same row. In order to do this I kept the original observation model in place, which gave me guarantees that the data structures were properly filled out, and then for each of them I tried to look up the relevant related ones.

Any observation that doesn't have a friend just ends up on its own database row, all alone.
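To make the idea concrete, here is a minimal sketch of what such a wide row could look like; the field names are invented for illustration and are not the actual observation schema.

```python
# Illustrative only: the real table has many more columns than this.
from dataclasses import dataclass
from typing import Optional


@dataclass
class WebObservationRow:
    measurement_uid: str  # which measurement this row was derived from
    hostname: Optional[str] = None
    ip: Optional[str] = None
    # DNS-related columns (None when no DNS observation was matched)
    dns_failure: Optional[str] = None
    dns_answer: Optional[str] = None
    # TCP-related columns
    tcp_failure: Optional[str] = None
    # TLS-related columns
    tls_failure: Optional[str] = None
    # HTTP-related columns
    http_failure: Optional[str] = None
    http_response_body_length: Optional[int] = None
```

When an observation has no matching counterparts, only its own column group is populated and the rest stay NULL, which is why the table ends up sparse.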

WAR Body writer

I worked on separating the process of archiving bodies from that of finding blocking fingerprints in them. Basically, during processing we create a WAR file containing the raw bodies and write to a dedicated database (or potentially the same one in its own table, but I need to figure out how to get that to perform well).

We are then able to separately scan through all these WAR files hunting for blockpage fingerprints, which is actually pretty fast. If we add new blockpage fingerprints we can just re-scan the WAR files looking for them and update the database column with what we found.
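As a rough illustration of the re-scanning idea (the Fingerprint model and the way bodies are read out of the archives are hypothetical stand-ins, not the actual code):

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class Fingerprint:
    name: str
    pattern: bytes  # substring to look for in a response body


def scan_bodies(
    bodies: Iterable[Tuple[str, bytes]],  # (body_id, raw_body) read from the WAR archives
    fingerprints: List[Fingerprint],
) -> List[Tuple[str, str]]:
    """Return a (body_id, fingerprint_name) pair for every matching body."""
    matches = []
    for body_id, body in bodies:
        for fp in fingerprints:
            if fp.pattern in body:
                matches.append((body_id, fp.name))
    return matches
```

Since the scan only reads the archives, it can be re-run from scratch whenever the fingerprint list changes and the matches written back to the database.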

Misc performance improvements

It turns out ClickHouse is not too happy when you do many writes per second to it. In their docs they state you shouldn't be making more than 1 request per second (https://clickhouse.com/docs/en/about-us/performance/#performance-when-inserting-data).

I encountered this issue once I had optimised the processor to the point that I was hitting this limit. The result is that the ClickHouse process starts consuming a lot of CPU and memory and eventually just starts dropping any connection attempt made to it.

To overcome this, I added to the ClickhouseConnection database abstraction the concept of a row buffer, which waits until it has accumulated a certain number of rows before flushing them to the ClickHouse connection. This worked surprisingly well and improved the overall performance of the reprocessing task by an order of magnitude.
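Here is a minimal sketch of the row-buffer idea, assuming the clickhouse-driver client; the class and method names are illustrative and not the actual ClickhouseConnection implementation.

```python
from typing import Dict, List, Tuple

from clickhouse_driver import Client  # assumes the clickhouse-driver package


class BufferedClickhouseWriter:
    def __init__(self, dsn: str, flush_rows: int = 100_000):
        self.client = Client.from_url(dsn)
        self.flush_rows = flush_rows
        self._buffer: Dict[str, List[Tuple]] = {}

    def write_row(self, table: str, row: Tuple) -> None:
        rows = self._buffer.setdefault(table, [])
        rows.append(row)
        # Only hit ClickHouse once the buffer is full, keeping the insert
        # rate far below the one-request-per-second recommendation.
        if len(rows) >= self.flush_rows:
            self.flush(table)

    def flush(self, table: str) -> None:
        rows = self._buffer.get(table)
        if not rows:
            return
        self.client.execute(f"INSERT INTO {table} VALUES", rows)
        self._buffer[table] = []

    def close(self) -> None:
        # Flush whatever is left before shutting down.
        for table in list(self._buffer):
            self.flush(table)
```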

Quite a few additional changes were made to how multiprocessing is done, along with small tweaks here and there based on iterations.

Experiment result generation

I have added support in here for generating experiment results from the Very Wide Observation Rows. Basically, we process data in batches of one day. For each day we first generate a ground truth database, which tells us what we should expect to see by looking at all the other web connectivity control measurements (in the future maybe other measurements too). The process of generating the ground truths is actually pretty expensive (it used to be the most expensive task) and takes about 80-90 seconds for a given day.

We then need to efficiently lookup the ground truths that are related to a specific measurement so that we can correlate them to what we are seeing in the data.

In the beginning I went for the most naive solution of just putting it all in a list and then doing a full scan of it for the relevant ground truths. As the ground truths for a given day can be on the order of hundreds of thousands, this obviously turned out to be incredibly expensive.

I briefly experimented with creating some hash maps over the data so that these lookups would be faster, but quickly realised I needed multiple indexes and was basically re-inventing a database. I obviously could not use ClickHouse for this purpose, because doing many small queries per second is not what it's made for.

I then realised that I actually already had a database right inside of the standard library of python: SQLite!

So I quickly put together an in-memory ground truth database to hold all the ground truths and do the lookups in.

This made things significantly faster.
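For illustration, a rough sketch of the in-memory SQLite approach; the table and column names are invented and don't mirror the real ground truth schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE ground_truth (
        hostname TEXT,
        ip TEXT,
        http_success INTEGER,
        tls_success INTEGER
    )
    """
)
conn.execute("CREATE INDEX idx_gt_hostname ON ground_truth(hostname)")
conn.execute("CREATE INDEX idx_gt_ip ON ground_truth(ip)")

# Bulk-load the day's ground truths once...
conn.executemany(
    "INSERT INTO ground_truth VALUES (?, ?, ?, ?)",
    [("example.com", "93.184.216.34", 1, 1)],
)

# ...then do indexed lookups per measurement instead of scanning a Python list.
rows = conn.execute(
    "SELECT * FROM ground_truth WHERE hostname = ? OR ip = ?",
    ("example.com", "93.184.216.34"),
).fetchall()
```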

Yet this was not enough, because when you are processing a measurement you don't actually care to look at all the ground truths for the full day, but only those relevant to that specific measurement. It's pretty easy to figure out which subset of the ground truths you care about, so I implemented a system that does some pre-filtering and reduction of the full day's ground truths into only those related to a particular measurement. Note: this part of the code was put together very quickly; it's currently a bit racy and not so nice to look at, so it needs some refactoring (the goal was just to see if it would work at all).
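Conceptually, the pre-filtering step boils down to something like the following sketch (the GroundTruth type and its fields are illustrative):

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class GroundTruth:
    hostname: str
    ip: str
    http_success: bool


def relevant_ground_truths(
    day_ground_truths: List[GroundTruth],
    measurement_hostnames: Set[str],
    measurement_ips: Set[str],
) -> List[GroundTruth]:
    # Keep only ground truths that share a hostname or IP with the
    # measurement being analysed, so later lookups touch far fewer rows.
    return [
        gt
        for gt in day_ground_truths
        if gt.hostname in measurement_hostnames or gt.ip in measurement_ips
    ]
```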

After this last improvement, the performance went up by 1 order of magnitude.

All in all I'm glad to see that it's starting to come together and it offers the prospect of being a much more efficient and iterative way of doing analysis on OONI data.

The current state of things is that experiment result generation happens at a rate of 20k results per second, mostly bottlenecked by the database writes.

A significant amount of work still needs to happen on validating the data outputs so that we can check whether the analysis logic is good (I didn't spend much time working on this after the big ground truth refactor, so it likely has some bugs).

It's nice that the results are explainable and you can easily figure out which part of the analysis code generated a particular outcome through the blocking_meta key.

Here is an example screenshot from a query on the experiment results table:

[Screenshot 2022-11-25 at 20 25 07]

Next steps

~We need to now start running actual queries to extract observations from the database and use them to generate experiment results. For the moment this is only implemented using observations generated directly from a raw measurement.~ This is now done 🥳

A bit more work also should be invested into figuring out how to achieve better performance for the process that generates the WAR bodies. At the moment the pipeline is very fast at processing measurements (according to some rough estimates it should be able to process the full dataset in just under 3 days on a single machine), but then it gets stuck at the end dealing with the backlog of all bodies that it still needs to write to disk.

The challenge in getting this to work is that I would ideally want to avoid writing the same body twice in the WAR archives, so I need something (which is now a single dedicated thread) that handles all the writes to the database and the archiving to disk. This is currently done with some IPC gymnastics and a very simple on-disk queue of bodies, but it's still too slow, because we are dumping the bodies to a compressed msgpack file and re-reading it from the consumer process.

One thing I would like to try is seeing if it's possible to just do this in the worker processes directly (so we don't have to do this intermediate dump and parse step). The issue with that is that on the one hand we can't write too quickly to the database (otherwise it gets upset, see the point above), yet on the other we also can't buffer our writes, because otherwise we don't know if a body has already been archived.

I could maybe just drop this requirement and live with the fact that some duplication of bodies will occur, but in some tests I have run, de-duplicating bodies lets us store 5x fewer bodies, so it's probably worth it in the long run.
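The de-duplication idea boils down to hashing each body and only archiving hashes we haven't seen before; here is a hedged sketch (names are illustrative, and in practice the set of seen hashes would live in the database rather than in memory):

```python
import hashlib
from typing import Set


class DedupBodyArchiver:
    def __init__(self) -> None:
        self._seen_hashes: Set[str] = set()

    def archive_body(self, body: bytes) -> str:
        body_hash = hashlib.sha1(body).hexdigest()
        if body_hash in self._seen_hashes:
            # Already archived: just return the reference, don't store it twice.
            return body_hash
        self._seen_hashes.add(body_hash)
        self._write_to_archive(body_hash, body)
        return body_hash

    def _write_to_archive(self, body_hash: str, body: bytes) -> None:
        # Placeholder for the actual WAR archive write.
        pass
```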

codecov-commenter commented 1 year ago

Codecov Report

Base: 70.40% // Head: 77.58% // Increases project coverage by +7.18% :tada:

Coverage data is based on head (d705d2d) compared to base (d8341c9). Patch coverage: 73.99% of modified lines in pull request are covered.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main      #17      +/-   ##
==========================================
+ Coverage   70.40%   77.58%   +7.18%     
==========================================
  Files          29       34       +5     
  Lines        3261     4297    +1036     
==========================================
+ Hits         2296     3334    +1038     
+ Misses        965      963       -2     
```

| [Impacted Files](https://codecov.io/gh/ooni/data/pull/17?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni) | Coverage Δ | |
|---|---|---|
| oonidata/db/speed_estimate.py | `0.00% <0.00%> (ø)` | |
| oonidata/datautils.py | `72.62% <50.00%> (-11.81%)` | :arrow_down: |
| oonidata/experiments/websites.py | `55.19% <54.54%> (+33.35%)` | :arrow_up: |
| oonidata/processing.py | `63.66% <63.14%> (+14.50%)` | :arrow_up: |
| oonidata/fingerprintdb.py | `76.34% <63.63%> (-0.43%)` | :arrow_down: |
| tests/test_ctrl.py | `65.27% <65.27%> (ø)` | |
| oonidata/db/connections.py | `70.00% <74.41%> (+3.33%)` | :arrow_up: |
| oonidata/experiments/signal.py | `77.58% <80.00%> (-3.58%)` | :arrow_down: |
| oonidata/netinfo.py | `78.91% <80.00%> (+0.14%)` | :arrow_up: |
| oonidata/cli/command.py | `88.03% <81.53%> (-7.48%)` | :arrow_down: |
| ... and [27 more](https://codecov.io/gh/ooni/data/pull/17/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni) | | |


hellais commented 1 year ago

I have now gotten the experiment generation to work based on the observation tables. After a lot of debugging and performance tweaks I managed to get it to generate web experiment results from the observation tables at ~20k measurements per second.

[Screenshot 2022-11-25 at 20 10 34]

Based on these stats, it should in theory be possible to reprocess 1 B measurements (the current dataset size) in approximately half a day.

The biggest bottleneck at the moment is actually writing the results of the experiments into the database. I think some more tweaking of the write buffers is in order to find an optimal value that maximises the writes per second (I'm currently flushing to the database every 100k rows, but I have seen big changes in performance by tweaking this value).

[Screenshot 2022-11-25 at 20 12 04]

I think I'm going to land this into my integration branch, work on cleaning up and documenting the codebase a bit, and then land it in main! 🥳

hellais commented 1 year ago

Ok I give up debugging the code coverage for today.

FWIW locally I see 84% code coverage, but then in the CI it's only 73% :(

Name                                        Stmts   Miss  Cover
---------------------------------------------------------------
bench.py                                       17     17     0%
oonidata/__init__.py                            5      2    60%
oonidata/__main__.py                            3      3     0%
oonidata/apiclient.py                          19     14    26%
oonidata/cli/__init__.py                        1      0   100%
oonidata/cli/command.py                       117     14    88%
oonidata/compat.py                             17      1    94%
oonidata/dataclient.py                        360     67    81%
oonidata/dataformat.py                        416     21    95%
oonidata/datautils.py                         179     38    79%
oonidata/db/__init__.py                         0      0   100%
oonidata/db/connections.py                     70      9    87%
oonidata/db/create_tables.py                   64     10    84%
oonidata/db/speed_estimate.py                  25     25     0%
oonidata/experiments/__init__.py                0      0   100%
oonidata/experiments/control.py               183      2    99%
oonidata/experiments/experiment_result.py      71      7    90%
oonidata/experiments/signal.py                 58     13    78%
oonidata/experiments/websites.py              279     62    78%
oonidata/fingerprintdb.py                      93     23    75%
oonidata/netinfo.py                           147     31    79%
oonidata/normalize.py                         331    143    57%
oonidata/observations.py                      713     74    90%
oonidata/processing.py                        556     49    91%
tests/__init__.py                               0      0   100%
tests/conftest.py                              45      5    89%
tests/sample_measurement.py                    42     42     0%
tests/test_cli.py                              34      2    94%
tests/test_ctrl.py                             72     25    65%
tests/test_dataclient.py                       70      0   100%
tests/test_dataformat.py                       41      0   100%
tests/test_db.py                               18      0   100%
tests/test_experiments.py                      90      0   100%
tests/test_fingerprints.py                     17      0   100%
tests/test_netinfo.py                          21      0   100%
tests/test_normalize.py                        10      0   100%
tests/test_observations.py                     56      0   100%
tests/test_processing.py                       74      2    97%
---------------------------------------------------------------
TOTAL                                        4314    701    84%

hellais commented 1 year ago

So I spent quite a bit of time debugging this. Apparently measuring coverage in multiprocessing code works differently between macOS and Linux, because the former uses spawn to start the child processes, while the latter uses fork().

pytest-cov, which is what we use, in theory should support this out of the box (see: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html), however since 16601bb93612f1b087452577fd63b99f9a894766 they dropped the multiprocessing.util.register_after_fork hook, which I believe is what causes it to not work properly when fork() is used.

Inspecting that diff I see that they instead rely on the coverage.py concurrency = multiprocessing configuration option to ensure coverage is measured properly, though it seems like the docs haven't been updated to mention this (TODO: I should probably open a PR there to update their docs).

Adding a .coveragerc with those settings leads to coverage being reported properly 🥳 .
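For reference, the relevant .coveragerc settings are along these lines (based on the pytest-cov subprocess-support docs; exact option names may vary between versions):

```
[run]
parallel = true
concurrency = multiprocessing
# sigterm flushes coverage data when worker processes are terminated
# (available in coverage.py >= 6.4)
sigterm = true
```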

There is still an issue with the end-to-end tests sometimes deadlocking, which apparently is due to a Python bug and is a known issue for pytest-cov, specifically related to some shared lock not being released when used with sqlite (which we also use). I think we should probably investigate this further at some point to understand if the deadlock can also happen in production, since it might not be pytest-cov specific.

For some reason the codecov comment has not been updated yet, but it is reporting the correct coverage information on their website: https://app.codecov.io/gh/ooni/data/pull/17.

I think we can proceed with landing this.