Aircloak / aircloak

This repository contains the Aircloak Air frontend as well as the code for our Cloak query and anonymization platform
2 stars 0 forks source link

Isolator cache warmup performance #2837

Closed obrok closed 6 years ago

obrok commented 6 years ago

I deployed the isolator code to @cristianberneanu's big data cloak to check out the performance. Findings:

  1. It works OK for the 34M-row accounts table - computing the isolator property for a single column takes on the order of 10 minutes.
  2. It crashes a lot for the 5G-row cstransactions. It seems it was able to compute the property for some columns in about 5 hours each. For other columns it times out after 12 hours in the database.
    • This makes it so that this table (and transactions, which it didn't even start processing after 3 days) cannot be queried in practice, because the queries will block until isolators are computed for the columns used in the query, which never happens, because of the timeouts.
    • @cristianberneanu notes that the transactions and cstransactions tables are projected, which most likely makes it slower than a regular table of comparable size.

So it seems like currently the limit with this solution is about 1e8 rows, perhaps 1e9 rows without a projection. We don't necessarily need to do anything about that right now, except maybe documenting. In the near future (this or next milestone) we could try to detect such timeouts and react, perhaps by excluding the given column from querying. I don't think it makes sense to extend the timeout, because it would mean waiting more than 12 hours per column. This assumes we don't figure out a way to make the isolator query faster, which would of course be nice.

For the particular case of something like this transactions table we could offer a virtual table that only contains the last N months of data. Because there is an index on created_at querying that table will be much faster.

@sasa1977, @sebastian (if you're receiving emails) - WDYT?

sebastian commented 6 years ago

Can we sample large tables to speed it up? I am not sure how large the sample would need to be to reduce chances of false positives and negatives to an acceptable level (nor yet quite clear on what an acceptable level would be).

cristianberneanu commented 6 years ago

I have a suggestion on how to improve performance a bit here:

There are two queries executed for each column: one to get the count of isolating values and the second one to get the count of unique values.

  SELECT COUNT(keep) FROM (
        SELECT 1 AS keep
        FROM table
        GROUP BY column
        HAVING COUNT(DISTINCT user_id) = 1
      ) x
 SELECT COUNT(keep) FROM (
        SELECT 1 AS keep
        FROM table
        GROUP BY column
      ) x

These results can be obtained with a single query like this:

  SELECT SUM(CAST(isolating AS integer)) AS isolating_values,  COUNT(*) AS unique_values FROM (
        SELECT COUNT(DISTINCT user_id) = 1 AS isolating
        FROM table
        GROUP BY column
      ) x

For the currency column in the accounts table, this drops the execution time from 30 seconds to 18 seconds.

One problem here is that we don't support boolean conditions in the select list. So either we add support for that or we need to hack around it using an extra cast: COUNT(DISTINCT user_id) = 1 is equivalent to NOT CAST(COUNT(DISTINCT user_id) - 1 AS bool). That should work since all counts should be >= 1.

And a side note: since we ignore null user ids in the normal queries, we should probably add a similar filter to these queries as well.

obrok commented 6 years ago

I have a suggestion on how to improve performance a bit here:

This is neat, but doesn't seem to help at all with the transactions case - the problem there is that a single query takes over 12 hours.

cristianberneanu commented 6 years ago

Well, in that case two queries are going to be even worse :)

sasa1977 commented 6 years ago

The queries we issue look pretty simple to me. Since we issue these queries via the cloak engine, I wonder whether these tables are at all queryable through aircloak?

Regarding the proposed optimization, I think it's a nice trick. One question is though, whether it would help with non-sql databases, such as mongo?

I share Pawel's sentiments that this probably won't be enough. I think we'll need to introduce some kind of sampling, because a query which takes hours will make the cloak unusable.

One approach which comes to mind is to stream the entire table into the cloak and compute the counts there. An obvious downside is that we're moving a lot of data into the cloak. However, this approach opens up some interesting possibilities. First, we can compute the entire table in a single pass. More importantly, we can introduce an "on-the-fly" computation. We could set the isolated properties after e.g. 1M rows are processed. At this point, the table is already queryable, while in the background we keep processing the remaining rows (and update the isolated property occasionally).

We should also consider computing concurrently for each table, because a large table will affect the usability of all other tables which are still pending in the queue.

obrok commented 6 years ago

The queries we issue look pretty simple to me. Since we issue these queries via the cloak engine, I wonder whether these tables are at all queryable through aircloak?

  1. The queries become more complex in the case of projections/virtual tables, as is the case with the transactions table
  2. @cristianberneanu was querying these huge tables with scoping by created_at. I think querying the whole table via aircloak would be just as untenable as computing isolation

We should also consider computing concurrently for each table, because a large table will affect the usability of all other tables which are still pending in the queue.

This has the downside of potentially overloading the DB server and increasing the total time required for these computations.

I share Pawel's sentiments that this probably won't be enough. I think we'll need to introduce some kind of sampling, because a query which takes hours will make the cloak unusable.

I'm doing some simulations on how effective sampling would be, will report the results.

cristianberneanu commented 6 years ago

Also, queries over columns with large numbers of unique values seem a lot slower than queries over columns with few unique values in them. For example, for the created_at column, the query takes 160 seconds. Maybe we should declare such columns, like datetimes and foreign keys as, as isolating by default.

obrok commented 6 years ago

I made some back-of-the-envelope simulations to look at how reliable sampling would be (code here https://gist.github.com/obrok/1c299c46c250e7612b79924f2b082c86). Seems like it could work quite decently when sampling 10% of the table and treating a table as isolating if 90% of the values in the sample have just one user. The most difficult case seems to be each value having very few users, but still more than 1. In that case the sampling is often tricked into thinking all those values are unique.

cristianberneanu commented 6 years ago

The nightly cloak was deployed 10 hours ago, but some data sources are still stuck in analyzing.

cristianberneanu commented 6 years ago

The nightly cloak was deployed 10 hours ago, but some data sources are still stuck in analyzing.

This was probably because of some bugs in the isolator checks, should be fixed by #2846 .

sebastian commented 6 years ago

We should also consider computing concurrently for each table, because a large table will affect the usability of all other tables which are still pending in the queue.

This has the downside of potentially overloading the DB server and increasing the total time required for these computations.

Maybe we shouldn't run the queries concurrently, but we could consider running queries for different tables in a round robin fashion across tables and data sources in a cloak. That way a small table has a chance of becoming ready sooner, instead of being backlogged behind a giant table? I.e. if we have data source A with table a1 and a2, and data source B with tables b1 and b2 then we could run the queries in the order columns from a1, column from b1, column from a2 ... etc


Has anyone played any more with sampling? From @obrok's calculations above it seems like 10% sample rate should be fine.

obrok commented 6 years ago

I.e. if we have data source A with table a1 and a2, and data source B with tables b1 and b2 then we could run the queries in the order columns from a1, column from b1, column from a2 ... etc

If the tables have roughly the same number of columns, then this doesn't help much.

sebastian commented 6 years ago

So I discussed this a bit further with @obrok offline.

I am OK with setting an upper limit on the size of the databases our isolator mechanism can support. It needs to be documented very clearly.

However we need a workaround in that case for the situations where the number of rows exceed the limit. Potential limits I have seen be thrown around are in the 10s of millions of rows. We know we will exceed this number for both DZ Bank and Datev (we are selling to DZ Bank at the moment, and have two active projects with Datev), and we exceed this in the NYC Taxi database case too (~173 million rows) so we cannot produce a solution where there isn't a manual workaround for the cases where the number of rows exceed whatever we set as the limit. A workaround could be manually specifying which columns are isolating in the cloak configuration. This is ugly, but better than denying service.

Further questions we should investigate/answer:

a) Get a random (smallish) set of distinct values from the table:

SELECT TOP X column
FROM table
WHERE RAND() > 0.5 

b) Get isolator status for these values:

SELECT isolator query ... FROM table
WHREE column IN (list of random column values)

We cannot release this version before we have a solution.

cristianberneanu commented 6 years ago

However we need a workaround in that case for the situations where the number of rows exceed the limit.

A more complex lookup algorithm might help a bit more than adding exceptions manually: if a query is not done in 10 minutes or so, it should be cancelled, moved at the end of the queue and sampling should be activated for it.

obrok commented 6 years ago

A more complex lookup algorithm might help a bit more than adding exceptions manually: if a query is not done in 10 minutes or so, it should be cancelled, moved at the end of the queue and sampling should be activated for it.

Good idea. If we extend this timeout to an hour it should support tables with ~10^8 rows, like the smaller table in the big_data cloak.

sasa1977 commented 6 years ago

I'll play a bit with streaming to cloak, just to get a feel of how much overhead does it require.

cristianberneanu commented 6 years ago

I did some experiments on the big_data data set. I will put my notes here, although I don't think this info is very useful or actionable:

  1. Having an index makes the computation ~3 times faster.

Processing 20 M rows without an index takes between 50 and 90 seconds (depending on the data in that column). Adding an index drops the time to between 15 and 32 seconds. But we can't require indexes for all columns beforehand, as that would be too wasteful.

  1. The computation is proportional with the number of unique values in the column.
Unique values (M) Time (s)
20 90
19.5 58
14 50
7 38
0.5 27
0.12 27

I assume that reducing the number of unique values will also reduce the memory consumption.

This has implications for columns having lots of unique values, like timestamps. Truncating those columns to minutes or even seconds should reduce the amount of memory needed and make the computation faster. Not sure if the returned result will still make sense.

  1. In some cases, the computation can be made a lot faster using LATERAL subqueries.

If most of the values are not isolating, using LATERAL subqueries makes the computation 3-5x faster (from 41 seconds to 8 seconds or from 38 seconds to 12 seconds). Otherwise, the query can become twice as slow. Since this has drawbacks, it is PostgreSQL specific and we can't really use it from the cloak, it doesn't seem very helpful.

select count(*), sum(cast(c = 1 as integer)) from (
  select count(t2.id) as c from 
     (select name from customers group by 1) as t1,
   lateral
     (select distinct id from customers where name = t1.name limit 2) AS t2
   group by t1.name
) t;
sasa1977 commented 6 years ago

I did some quick experiments with the streaming approach, and got some promising early results.

I tested locally by creating a larger performance dataset (basically the same as compliance dataset). On the notes_changes table (1.6M rows), I've gotten a speedup of about 4X.

The basic idea is to stream all the rows to cloak, and then aggregate in Elixir code.

This bring us the following benefits:

The current implementation uses only two processes: one which streams the data, and another which aggregates it into an ETS table. The streamer works concurrently to the aggregator, so the total latency is roughly max(network_latency, processing_latency). To avoid possible message queue bottlenecks, the streamer buffers at most one chunk.

The aggregator is currently sequential, but there's potential to make it concurrent, so we can aggregate different rows (and/or columns) in parallel.

The big downside of this approach is that we need to fit all the unique values of a single table in memory. This could be problematic, especially for larger values (e.g. free form long text columns). Perhaps we could try hashing the values to reduce the memory pressure (obviously, this makes the algorithm less reliable). Or we could just stop the computation if we run out of memory.

I was also playing with DETS instead of ETS. The benefit of DETS is that memory usage is constant (since we store everything to disk). Unfortunately, the performance is pretty poor, so I don't think this is a good approach.

I'll try to play a bit more with larger datasets we have in the MPI network, and see how that works. But judging by the current results, it seems that this is a promising option.

cristianberneanu commented 6 years ago

Both airdb and stage-airdb have the temp-file limit set to 512 MBs. That is pretty low and is making even some of the simplest queries error out on the nyctaxi dataset. Can I increase it on stage to see if it makes things better? There is enough disk space to be able to run multiple queries with a larger file limit.

sebastian commented 6 years ago

@cristianberneanu sure, give it a shot!

cristianberneanu commented 6 years ago

One of the main causes of the OOM crashes is that the VM hosting the nyctaxi data set had only 8 GB of RAM assigned. Pascal increased the amount up to 16 GB, which made the db server able to handle more load, although the query for computing the isolating status still crashes after a while.

But the query can be re-written to require less memory usage as this:

SELECT SUM(CAST(isolating AS integer)) AS isolating_values,  COUNT(*) AS unique_values FROM (
    SELECT MAX(id) - MIN(id) = 0 AS isolating
    FROM customers
    GROUP BY birthday
) x;

This version manages to finish successfully (although it takes between 15 and 90 minutes to compute the result per-column for 173 M rows and there are 14 columns in that table). Compared with the current version, it is about 3 times faster in all cases I tested on the 20 M rows table in the big_data data set (kind of ashamed I didn't think of it sooner...).

In any case, we should move the nyctaxi data set to the acatlas2 host, as that machine is a way better fit for hosting it.

obrok commented 6 years ago
SELECT SUM(CAST(isolating AS integer)) AS isolating_values,  COUNT(*) AS unique_values FROM (
    SELECT MAX(id) - MIN(id) = 0 AS isolating
    FROM customers
    GROUP BY birthday
) x;

Important to note that this version won't work for non-integer id

obrok commented 6 years ago

Oh, and it doesn't seem to be a legal aircloak query? If so, then we would need to have a specialized version per supported data source.

sasa1977 commented 6 years ago

important to note that this version won't work for non-integer id

This could perhaps be solved by select max(id) = min(id).

Oh, and it doesn't seem to be a legal aircloak query?

We don't support it, or there's a deliberate restriction?

cristianberneanu commented 6 years ago

The original condition was max(id) = min(id), which should work for all column types. But since we don't support boolean conditions in the select list, I rewrote it as cast((max(id) - min(id)) as boolean) which is supported by our compiler, although doesn't work for strings.

In case the user id is a string, we could hash the min and max values to integers, for which we have support in our system already. But the hash function requires emulation on mongodb data sets, which means it will be slow (although it might still be usable since the data set for TeamBank is small).

Probably the best option would be to add support for boolean conditions, but that won't be a trivial change at all.

So there are some drawbacks here as well ...

sebastian commented 6 years ago

We don't want to add boolean conditions for the analyst, but adding it as part of the queries our query generation backends can generate should be fine (from an anon perspective of course).

cristianberneanu commented 6 years ago

We don't want to add boolean conditions for the analyst

Why not? They can already work around it somewhat using casts.

sebastian commented 6 years ago

Why not? They can already work around it somewhat using casts.

More context in this issue: https://github.com/Aircloak/aircloak/issues/2417

obrok commented 6 years ago

Why not? They can already work around it somewhat using casts.

If so, then we should ban that workaround :)

obrok commented 6 years ago

In case the user id is a string, we could hash the min and max values to integers, for which we have support in our system already. But the hash function requires emulation on mongodb data sets, which means it will be slow (although it might still be usable since the data set for TeamBank is small).

One of my worries in this context is that if we produce some tricky, optimized version of this query it will perform well only on a specific backend (probably postgres) and not help any on the other backends. I'm almost leaning towards producing a per-backend version of the query now, if not for the various emulation features and virtual tables, which the isolator check would need to understand.

sasa1977 commented 6 years ago

I've been playing a bit with the streaming approach on the big_data database. I've mostly experimented with the customers table (20M).

The results were not very exciting, although there are some positives. Computing isolated for the entire table took about 20 minutes. On the cloak machine, the streaming approach is memory hungry. It allocated about 10GB of memory for the customers table. During the calculation, two CPU cores were pegged to 100%.

Computing isolated with the official version (offloading the query to the database) took about 4 minutes (for all columns). Therefore, the streaming approach is about 5 times slower in this case.

I must say that I was surprised with these numbers, given the reports earlier in this thread (e.g. "for the 34M-row accounts table - computing the isolator property for a single column takes on the order of 10 minutes").

A positive thing about the streaming approach is that I didn't notice any memory or CPU pressure on the database server. In contrast, the current approach with offloaded queries pegged the CPU of the database server to 100% and used a couple of GB of extra memory.

I also noticed that in the current approach, temp memory allocated by PostgreSQL is not released until we close the connection. It looks like, at least with PostgreSQL, we might be able to reduce memory usage if we didn't use the connection pool for isolated queries.

I didn't analyze deeper, but my impression is that the bottleneck in the streaming approach is the decoding part. I experimented with plain streaming/iterating (i.e. with no additional calculations), and that was still running slow. During that test, I observed a single CPU pegged at 100%, while all others were idle.

In any case, a very rough estimate is that with streaming, we can process at the rate of about 1M rows per minute.

obrok commented 6 years ago

for the 34M-row accounts table - computing the isolator property for a single column takes on the order of 10 minutes

4 minutes and 10 minutes are of the same order in my mind :) I'm sorry if that generates confusion

obrok commented 6 years ago

Oh, I guess you do mean "for all columns" not "for each column". Perhaps @cristianberneanu's change improved things?

sasa1977 commented 6 years ago

From what I could tell, the current mechanism (offloaded query) varies a lot depending on the variance of the data. So for one column it can take much longer than for another. Perhaps some indexes were being added, or otherwise you just reported the worst case.

yoid2000 commented 6 years ago

Quick question. If we can label the column with queries that are like this:

SELECT count(DISTINCT col) FROM table

Then will the memory etc. problems go away?

obrok commented 6 years ago

Then will the memory etc. problems go away?

I don't think so. This query still requires keeping all distinct values in memory or in a tempfile.

cristianberneanu commented 6 years ago

Quick question. If we can label the column with queries that are like this: SELECT count(DISTINCT col) FROM table Then will the memory etc. problems go away?

I think so. Or, at least, should be a lot more efficient. This version only needs to keep a list of the distinct values present in that column. Although DISTINCT implies sorting in PostgreSQL, so this version:

SELECT count(*) FROM (SELECT 1 FROM table GROUP BY col) AS t

should be way faster.

The current version needs to keep a list of all the distinct values and a list of all the distinct user ids for each value. The improved version only needs to keep max user id and min user id for each distinct value.

The slowest columns are those with lots of distinct values (like timestamps with sub-second accuracy) and large user ids (like it is the case for the nyctaxi dataset).

But the hash function requires emulation on mongodb data sets, which means it will be slow (although it might still be usable since the data set for TeamBank is small).

Also, the current version is partially emulated already on mongodb data sets as global aggregators can't be properly offloaded to the mongo aggregation pipeline.

yoid2000 commented 6 years ago

So a couple of thoughts. First, I think if we have the number of users, the number of rows, and the number of distinct column values, then in many cases we can determine isolating or not from just this information. In cases where we can't we additionally need to estimate the correlation between value and user (still thinking about this).

Second, have we considered just taking a small random set of values (10 or 20 or them), and then seeing how many distinct users are associated with each value? (Maybe this is a different form of sampling than what @obrok played with?)

obrok commented 6 years ago

in many cases ... In cases where we can't

IMO this is very problematic, because it's twice the amount of work in terms of implementation and maintenance and we still have to solve the hard case anyway. A simplification that only works in a limited number of cases like this can be considered, once we have the general case solved to a satisfying degree.

Second, have we considered just taking a small random set of values (10 or 20 or them), and then seeing how many distinct users are associated with each value? (Maybe this is a different form of sampling than what @obrok played with?)

How would you like to find these values? Would these be 10 or 20 random rows or 10 or 20 random values from a list of DISTINCT?

That + what @cristianberneanu said gave me an outline of an idea... It would require 2 scans, but should have less memory pressure. We take N random values defined as random selections from the list of distinct values (first scan). This number can be fairly large, but constant (let's say 10k). Then we scope the table by those values and compute the isolation as today, in the database (what @yoid2000 said). This step should have a bounded memory footprint, because in principle we should need only one integer in memory per value (the count of users for that value). An extra benefit here is that as an improvement we can combine results from many such computations to get greater accuracy over time.

@cristianberneanu, @sasa1977 - does this make sense or am I missing something?

obrok commented 6 years ago

I guess the problem would be that we would either need to fetch all values into the cloak or somehow sample them in the DB (after doing DISTINCT).

yoid2000 commented 6 years ago

IMO this is very problematic, because it's twice the amount of work in terms of implementation and maintenance and we still have to solve the hard case anyway. A simplification that only works in a limited number of cases like this can be considered, once we have the general case solved to a satisfying degree.

If counting distinct values in the DB is acceptable, then I'll go ahead and think through the rest of it. With the intent that there is a general solution that doesn't require any prohibitively expensive queries...

cristianberneanu commented 6 years ago

I am more in favor of implementing the min/max optimization first and retesting. That, coupled with increased RAM on airdb/stage and moving the nyctaxi dataset to acatlas2, might completely eliminate the OOM issues. If performance is still not acceptable, we can experiment with truncating timestamps at second intervals and hashing text user ids or using count(distinct col) instead.

sasa1977 commented 6 years ago

Taking a large-ish constant sample of distinct values looks good to me. It should be tested how this performs in practice. I also wonder can this be done in mongodb?

sasa1977 commented 6 years ago

coupled with increased RAM on airdb/stage and moving the nyctaxi dataset to acatlas2, might completely eliminate the OOM issues

In practice, this means that we might require extra resources on the database server machine. When we ship our system to clients, we need to warn them that without enough available RAM on the db server, starting our system might lead to the crash of their database.

sebastian commented 6 years ago

Unless we are confident the procedure "will always work" we have to provide a means of manually classifying columns. When the manual classifications are given we should not perform automatic checks against the DB. If we were to not offer such an alternative there would be no way of fully starting our system in cases where the isolation queries cannot be executed successfully. I'll create an issue for this.

yoid2000 commented 6 years ago

I am more in favor of implementing the min/max optimization first and retesting.

This is fine with me. Nevertheless I'll sketch what I'm thinking in case we need/prefer it.

cristianberneanu commented 6 years ago

because in principle we should need only one integer in memory per value (the count of users for that value).

Counting the users per value requires keeping a list of all the distinct users for that value.

I also wonder can this be done in mongodb?

Mongo has a sample operation, although we don't support it currently.

yoid2000 commented 6 years ago

Here is more or less what I have in mind. Let U be number of distinct users. Let R be number of rows. Let V be number of distinct values per column. So:

SELECT count(*) AS R FROM table

SELECT count(DISTINCT uid) AS U FROM table

SELECT count(DISTINCT col) AS V FROM table

  1. If U/V<0.5, then we can label the column as non-isolating.
  2. If U/V>0.5 AND R/U=1, then we can label the column as isolating.
  3. If R/V>10, then we can (with high confidence) label the column as non-isolating.

(These constants, 0.5, and 10, can be adjusted based on experimentation.)

If none of these are true, then we need to know if there is a strong correlation between values and users. For this we do queries of the sort:

SELECT count(DISTINCT col) AS V2 FROM table 
WHERE <roughly half of users are selected>
SELECT count(DISTINCT uid) AS U2 FROM table 
WHERE <the same roughly half users are selected>

Perfect correlation (every distinct value is assigned to one user) would result in (U2/U)=(V2/V) being true. The lower the correlation, the higher the value of V2/V.

With some thought/experimentation, we can compute where the isolating/non-isolating threshold should be for values of R, U, V, U2, and V2. My intuition is that this would give us a perfectly acceptable measure.

obrok commented 6 years ago

If none of these are true, then we need to know if there is a strong correlation between values and users. For this we do queries of the sort:

IMO this would only be worth it if we never had to go to the "worst case" and only required R, U, V to find the answer. Computing R, U, V is already about as complex as the other approaches we're considering here.

cristianberneanu commented 6 years ago

Computing R, U, V is already about as complex as the other approaches we're considering here

But one (big) advantage here is they can be computed for all columns with a single sequential scan, if there is enough memory available.