Aircloak / aircloak

This repository contains the Aircloak Air frontend as well as the code for our Cloak query and anonymization platform.

Find simple fix to KIT timing attack #4881

Closed. yoid2000 closed this issue 3 years ago.

yoid2000 commented 3 years ago

@sebastian @cristianberneanu @edongashi

The KIT (Karlsruhe Institute of Technology) micro-timing attack for the Dogwood bounty is going to be published.

I'm wondering if there might be an easy fix to the attack that we could deploy to show that the attack doesn't work.

The attack works by detecting a difference in execution time between queries where the data returned from the database to the cloak contains either one user or zero users.

The difference in execution time is roughly 1 ms.

The theory is that most of this difference is due to how the cloak processes the two answers. If there are zero rows, then the cloak immediately knows that the answer should be suppressed. If there is at least one row, then the cloak has to do some computation to determine if suppression should take place or not. Presumably this computation takes upwards of 1ms, and that accounts for the difference.

My question is whether you think this theory might be correct (i.e. there is indeed computation that might take upwards of 1ms to run).

And if so, is there a potentially easy fix? For instance, running a dummy LCF computation that is roughly equivalent to the real one that would have taken place?

edongashi commented 3 years ago

Could the following be the empty rows check? https://github.com/Aircloak/aircloak/blob/58f78a65e754b7dca8f23d683cd0407d85d369ac/cloak/lib/cloak/query/aggregator.ex#L247-L263

The computation of either branch is fairly simple. I suppose the function call may incur a tiny difference...
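
For illustration, the shape is roughly the following (the function heads and arities are my guesses, not the actual aggregator code; only make_non_empty_buckets is a real name from the file). Both branches are cheap, but only the non-empty one goes through the extra call:

# Hypothetical sketch -- names and arities are made up.
defp make_buckets([], _anonymizer), do: []

defp make_buckets(rows, anonymizer) do
  # the non-empty path does the extra work (bucket construction, logging, ...)
  make_non_empty_buckets(rows, anonymizer)
end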

yoid2000 commented 3 years ago

Hmmmm. make_non_empty_buckets() calls the logger:

https://github.com/Aircloak/aircloak/blob/58f78a65e754b7dca8f23d683cd0407d85d369ac/cloak/lib/cloak/query/aggregator.ex#L265-L271

Is it possible that the logger takes up that time? Does anyone know what the Logger setting is for attack.aircloak?

If this is the reason, then the fix might be as easy as adding a Logger call at line 250...
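
Something like this, purely as a sketch (I'm guessing at the function head and the log message; only make_non_empty_buckets and the fact that it calls the Logger are taken from the file):

# Sketch: give the empty branch a Logger call mirroring the one in
# make_non_empty_buckets, so both paths do roughly the same logging work.
defp make_buckets([], _anonymizer) do
  # placeholder message -- the point is to pay the same Logger cost as the
  # non-empty branch, not to match the exact text
  Logger.debug("Aggregating groups ...")
  []
end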

edongashi commented 3 years ago

I see another potential difference here https://github.com/Aircloak/aircloak/blob/58f78a65e754b7dca8f23d683cd0407d85d369ac/cloak/lib/cloak/query/aggregator.ex#L173-L176

Edit: I think this isn't related...

yoid2000 commented 3 years ago

Is aggregate_lcf_buckets() called before the LCF decision or after? (From the name of the function, it sounds like the thing that decides how to merge LCF'd buckets. This attack results in LCF either way.)

edongashi commented 3 years ago

The execution order goes like this: https://github.com/Aircloak/aircloak/blob/58f78a65e754b7dca8f23d683cd0407d85d369ac/cloak/lib/cloak/query/aggregator.ex#L44-L49

yoid2000 commented 3 years ago

I checked the logs between queries with one user and with zero users. I'll put the (edited) logs here, and then in the next comment write more.

Lines prepended with *** are the log events in the one-user trace that don't appear in the zero-user trace. (I also put a couple of elapsed timing notes.)

Zero users:

 07:32:35.701 [debug] query_id=<qid> Running statement `select count(DISTINCT lastname)
from accounts
where lastname like 'abcdefgh'` ...
 07:32:35.702 [debug] query_id=<qid> Query <qid> state changed to: parsing...
 07:32:35.774 [debug] query_id=<qid> Query <qid> state changed to: compiling...
 07:32:35.794 [debug] query_id=<qid> Query <qid> state changed to: awaiting_data...
 07:32:35.795 [debug] query_id=<qid> Offloading query ...: `SELECT "__ac_regular_stats"."__ac_count_duid","__ac_distinct_stats0"."__ac_count_distinct","__ac_distinct_stats0"."__ac_noise_factor_count","__ac_distinct_stats0"."__ac_noise_factor_sum","__ac_distinct_stats0"."__ac_noise_factor_min","__ac_distinct_stats0"."__ac_noise_factor_max","__ac_distinct_stats0"."__ac_noise_factor_stddev","__ac_regular_stats"."__ac_grouping_id","__ac_regular_stats"."__ac_min_uid","__ac_regular_stats"."__ac_max_uid","__ac_regular_stats"."__ac_min_uid" AS "__ac_nlc__1","__ac_regular_stats"."__ac_max_uid" AS "__ac_nlc__2" FROM (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",COUNT("__ac_uid_grouping"."uid") AS "__ac_count_duid",MIN("__ac_uid_grouping"."uid") AS "__ac_min_uid",MAX("__ac_uid_grouping"."uid") AS "__ac_max_uid" FROM (SELECT "accounts"."uid" AS "uid",0 AS "__ac_grouping_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'abcdefgh')) GROUP BY "accounts"."uid") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_regular_stats" INNER JOIN (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",SUM("__ac_uid_grouping"."__ac_count_distinct") AS "__ac_count_distinct",COUNT("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_count",SUM("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_sum",MIN("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_min",MAX("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_max",STDDEV("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_stddev" FROM (SELECT "__ac_distinct_values"."__ac_grouping_id" AS "__ac_grouping_id","__ac_distinct_values"."__ac_user_id" AS "__ac_user_id",COUNT("__ac_distinct_values"."__ac_target") AS "__ac_count_distinct",(CAST("__ac_distinct_values"."__ac_user_id" IS NOT NULL AS integer)*CAST(COUNT("__ac_distinct_values"."__ac_target") AS bigint)) AS "__ac_noise_factor" FROM (SELECT 0 AS "__ac_grouping_id","accounts"."lastname" AS "__ac_target",CASE WHEN (MIN("accounts"."uid") = MAX("accounts"."uid")) THEN MIN("accounts"."uid") ELSE NULL END AS "__ac_user_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'abcdefgh')) GROUP BY "accounts"."lastname") AS "__ac_distinct_values" GROUP BY "__ac_distinct_values"."__ac_grouping_id", "__ac_distinct_values"."__ac_user_id") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_distinct_stats0" ON ("__ac_distinct_stats0"."__ac_grouping_id" = "__ac_regular_stats"."__ac_grouping_id")` ...
 07:32:35.795 [debug] query_id=<qid> Selecting data ...
 07:32:35.965 [debug] query_id=<qid> Anonymizing query result ...
 07:32:35.966 [debug] query_id=<qid> Executing SQL query: SELECT "__ac_regular_stats"."__ac_count_duid","__ac_distinct_stats0"."__ac_count_distinct","__ac_distinct_stats0"."__ac_noise_factor_count","__ac_distinct_stats0"."__ac_noise_factor_sum","__ac_distinct_stats0"."__ac_noise_factor_min","__ac_distinct_stats0"."__ac_noise_factor_max","__ac_distinct_stats0"."__ac_noise_factor_stddev","__ac_regular_stats"."__ac_grouping_id","__ac_regular_stats"."__ac_min_uid","__ac_regular_stats"."__ac_max_uid","__ac_regular_stats"."__ac_min_uid" AS "__ac_nlc__1","__ac_regular_stats"."__ac_max_uid" AS "__ac_nlc__2" FROM (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",COUNT("__ac_uid_grouping"."uid") AS "__ac_count_duid",MIN("__ac_uid_grouping"."uid") AS "__ac_min_uid",MAX("__ac_uid_grouping"."uid") AS "__ac_max_uid" FROM (SELECT "accounts"."uid" AS "uid",0 AS "__ac_grouping_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'abcdefgh')) GROUP BY "accounts"."uid") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_regular_stats" INNER JOIN (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",SUM("__ac_uid_grouping"."__ac_count_distinct") AS "__ac_count_distinct",COUNT("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_count",SUM("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_sum",MIN("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_min",MAX("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_max",STDDEV("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_stddev" FROM (SELECT "__ac_distinct_values"."__ac_grouping_id" AS "__ac_grouping_id","__ac_distinct_values"."__ac_user_id" AS "__ac_user_id",COUNT("__ac_distinct_values"."__ac_target") AS "__ac_count_distinct",(CAST("__ac_distinct_values"."__ac_user_id" IS NOT NULL AS integer)*CAST(COUNT("__ac_distinct_values"."__ac_target") AS bigint)) AS "__ac_noise_factor" FROM (SELECT 0 AS "__ac_grouping_id","accounts"."lastname" AS "__ac_target",CASE WHEN (MIN("accounts"."uid") = MAX("accounts"."uid")) THEN MIN("accounts"."uid") ELSE NULL END AS "__ac_user_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'abcdefgh')) GROUP BY "accounts"."lastname") AS "__ac_distinct_values" GROUP BY "__ac_distinct_values"."__ac_grouping_id", "__ac_distinct_values"."__ac_user_id") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_distinct_stats0" ON ("__ac_distinct_stats0"."__ac_grouping_id" = "__ac_regular_stats"."__ac_grouping_id")
     15ms
 07:32:35.981 [debug] query_id=<qid> Terminating streamer process
 07:32:35.981 [debug] query_id=<qid> Initializing anonymizers ...
 07:32:35.981 [debug] query_id=<qid> Query <qid> state changed to: processing...
 07:32:35.981 [debug] query_id=<qid> Processing low count users ...
 07:32:35.981 [debug] query_id=<qid> Aggregating groups ...
 07:32:35.981 [debug] query_id=<qid> Query <qid> state changed to: post_processing...
 07:32:35.981 [info] query_id=<qid> JSON_LOG {"execution_time":280,"query_id":"<qid>","status":"Successful.","type":"query_complete"}
 07:32:35.981 [info] query_id=<qid> query finished
 07:32:35.982 [info] query_id=<qid> sending query result to Air
 07:32:35.984 [info] query_id=<qid> query result sent

One user:

 07:34:22.326 [debug] query_id=<qid> Running statement `select count(DISTINCT lastname)
from accounts
where lastname like 'Leblanc'` ...
 07:34:22.327 [debug] query_id=<qid> Query <qid> state changed to: parsing...
 07:34:22.398 [debug] query_id=<qid> Query <qid> state changed to: compiling...
 07:34:22.418 [debug] query_id=<qid> Query <qid> state changed to: awaiting_data...
 07:34:22.419 [debug] query_id=<qid> Offloading query ...: `SELECT "__ac_regular_stats"."__ac_count_duid","__ac_distinct_stats0"."__ac_count_distinct","__ac_distinct_stats0"."__ac_noise_factor_count","__ac_distinct_stats0"."__ac_noise_factor_sum","__ac_distinct_stats0"."__ac_noise_factor_min","__ac_distinct_stats0"."__ac_noise_factor_max","__ac_distinct_stats0"."__ac_noise_factor_stddev","__ac_regular_stats"."__ac_grouping_id","__ac_regular_stats"."__ac_min_uid","__ac_regular_stats"."__ac_max_uid","__ac_regular_stats"."__ac_min_uid" AS "__ac_nlc__1","__ac_regular_stats"."__ac_max_uid" AS "__ac_nlc__2" FROM (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",COUNT("__ac_uid_grouping"."uid") AS "__ac_count_duid",MIN("__ac_uid_grouping"."uid") AS "__ac_min_uid",MAX("__ac_uid_grouping"."uid") AS "__ac_max_uid" FROM (SELECT "accounts"."uid" AS "uid",0 AS "__ac_grouping_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'Leblanc')) GROUP BY "accounts"."uid") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_regular_stats" INNER JOIN (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",SUM("__ac_uid_grouping"."__ac_count_distinct") AS "__ac_count_distinct",COUNT("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_count",SUM("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_sum",MIN("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_min",MAX("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_max",STDDEV("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_stddev" FROM (SELECT "__ac_distinct_values"."__ac_grouping_id" AS "__ac_grouping_id","__ac_distinct_values"."__ac_user_id" AS "__ac_user_id",COUNT("__ac_distinct_values"."__ac_target") AS "__ac_count_distinct",(CAST("__ac_distinct_values"."__ac_user_id" IS NOT NULL AS integer)*CAST(COUNT("__ac_distinct_values"."__ac_target") AS bigint)) AS "__ac_noise_factor" FROM (SELECT 0 AS "__ac_grouping_id","accounts"."lastname" AS "__ac_target",CASE WHEN (MIN("accounts"."uid") = MAX("accounts"."uid")) THEN MIN("accounts"."uid") ELSE NULL END AS "__ac_user_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'Leblanc')) GROUP BY "accounts"."lastname") AS "__ac_distinct_values" GROUP BY "__ac_distinct_values"."__ac_grouping_id", "__ac_distinct_values"."__ac_user_id") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_distinct_stats0" ON ("__ac_distinct_stats0"."__ac_grouping_id" = "__ac_regular_stats"."__ac_grouping_id")` ...
 07:34:22.419 [debug] query_id=<qid> Selecting data ...
 07:34:22.630 [debug] query_id=<qid> Anonymizing query result ...
 07:34:22.632 [debug] query_id=<qid> Executing SQL query: SELECT "__ac_regular_stats"."__ac_count_duid","__ac_distinct_stats0"."__ac_count_distinct","__ac_distinct_stats0"."__ac_noise_factor_count","__ac_distinct_stats0"."__ac_noise_factor_sum","__ac_distinct_stats0"."__ac_noise_factor_min","__ac_distinct_stats0"."__ac_noise_factor_max","__ac_distinct_stats0"."__ac_noise_factor_stddev","__ac_regular_stats"."__ac_grouping_id","__ac_regular_stats"."__ac_min_uid","__ac_regular_stats"."__ac_max_uid","__ac_regular_stats"."__ac_min_uid" AS "__ac_nlc__1","__ac_regular_stats"."__ac_max_uid" AS "__ac_nlc__2" FROM (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",COUNT("__ac_uid_grouping"."uid") AS "__ac_count_duid",MIN("__ac_uid_grouping"."uid") AS "__ac_min_uid",MAX("__ac_uid_grouping"."uid") AS "__ac_max_uid" FROM (SELECT "accounts"."uid" AS "uid",0 AS "__ac_grouping_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'Leblanc')) GROUP BY "accounts"."uid") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_regular_stats" INNER JOIN (SELECT "__ac_uid_grouping"."__ac_grouping_id" AS "__ac_grouping_id",SUM("__ac_uid_grouping"."__ac_count_distinct") AS "__ac_count_distinct",COUNT("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_count",SUM("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_sum",MIN("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_min",MAX("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_max",STDDEV("__ac_uid_grouping"."__ac_noise_factor") AS "__ac_noise_factor_stddev" FROM (SELECT "__ac_distinct_values"."__ac_grouping_id" AS "__ac_grouping_id","__ac_distinct_values"."__ac_user_id" AS "__ac_user_id",COUNT("__ac_distinct_values"."__ac_target") AS "__ac_count_distinct",(CAST("__ac_distinct_values"."__ac_user_id" IS NOT NULL AS integer)*CAST(COUNT("__ac_distinct_values"."__ac_target") AS bigint)) AS "__ac_noise_factor" FROM (SELECT 0 AS "__ac_grouping_id","accounts"."lastname" AS "__ac_target",CASE WHEN (MIN("accounts"."uid") = MAX("accounts"."uid")) THEN MIN("accounts"."uid") ELSE NULL END AS "__ac_user_id" FROM "accounts" WHERE ("accounts"."uid" IS NOT NULL AND ("accounts"."lastname" = 'Leblanc')) GROUP BY "accounts"."lastname") AS "__ac_distinct_values" GROUP BY "__ac_distinct_values"."__ac_grouping_id", "__ac_distinct_values"."__ac_user_id") AS "__ac_uid_grouping" GROUP BY "__ac_uid_grouping"."__ac_grouping_id") AS "__ac_distinct_stats0" ON ("__ac_distinct_stats0"."__ac_grouping_id" = "__ac_regular_stats"."__ac_grouping_id")
      16ms
 *** 07:34:22.648 [debug] query_id=<qid> Query <qid> state changed to: ingesting_data...
      2ms
 07:34:22.650 [debug] query_id=<qid> Terminating streamer process
 07:34:22.651 [debug] query_id=<qid> Initializing anonymizers ...
 07:34:22.651 [debug] query_id=<qid> Processing low count users ...
 07:34:22.651 [debug] query_id=<qid> Aggregating groups ...
 *** 07:34:22.651 [debug] query_id=<qid> Query <qid> state changed to: processing...
 07:34:22.651 [debug] query_id=<qid> Query <qid> state changed to: post_processing...
 07:34:22.651 [info] query_id=<qid> JSON_LOG {"execution_time":325,"query_id":"<qid>","status":"Successful.","type":"query_complete"}
 07:34:22.651 [info] query_id=<qid> query finished
 07:34:22.651 [info] query_id=<qid> sending query result to Air
 07:34:22.654 [info] query_id=<qid> query result sent

yoid2000 commented 3 years ago

The most suspicious extra line in the one-user trace is the log message:

state changed to: ingesting_data...

Note that this consumes an extra 2ms, which is roughly the average difference observed in the attack queries.

edongashi commented 3 years ago

When we get a query end without any rows, we don't update the ingesting_data state at all. I think empty queries go straight to :done. The state updater calls a GenServer, which could be expensive. (Edit: wrong.)

https://github.com/Aircloak/aircloak/blob/41d4d27640c7315830eca4c0d038c19bf9ddbfb5/cloak/lib/cloak/query/db_emulator.ex#L113-L122

I also fear that the actual row fetching and mapping from the database takes time...

yoid2000 commented 3 years ago

Given the exploit that Edon found, this really does need to be fixed. Jesus.

How about if we always go into :ingesting_data, and we always explicitly try to read data from the DB?

Then we only go to :done state when the DB gives us whatever end-of-data that it normally gives during ingestion.

This would not only make the log messages identical for the zero-user and one-user cases, but would also insert extra delay into the execution, which hopefully would make the attack less effective, or even ineffective.

sebastian commented 3 years ago

How about putting a lower bound on query execution time? Say have a timer that ensures a query never completes in less than 5ms? That way we don't need to deal with complex changes to the execution path.

edongashi commented 3 years ago

The presence of :ingesting_data update is clearly visible outside of cloak and must be fixed. Also the idea of rounding runtime sounds good. Maybe round up to nearest multiple of x ms?

sebastian commented 3 years ago

The presence of :ingesting_data update is clearly visible outside of cloak and must be fixed.

Yes, that's clearly the case.

Also the idea of rounding runtime sounds good. Maybe round up to nearest multiple of x ms?

I have played with this idea in the past. One could make the runtime fall onto a grid of money-rounded, growing time intervals. This would help make the system more robust against timing attacks more generally. However, in this instance it seems all we really need is a lower bound?

edongashi commented 3 years ago

In this instance it seems all we really need is a lower bound?

My suspicion is that the air query lifecycle updater gets one extra message for non-empty queries, causing the delay. If that's the case, then we don't need any further fixes.

yoid2000 commented 3 years ago

How about putting a lower bound on query execution time? Say have a timer that ensures a query never completes in less than 5ms? That way we don't need to deal with complex changes to the execution path.

This may well be more complex than whatever change to the execution path is needed. Let's at least understand what changes would be needed...

Also in general, putting a lower bound doesn't necessarily fix the problem. If the attack can come up with a pair of queries that just happens to take longer than X msec, whatever X is, then they can still run the attack.

The presence of :ingesting_data update is clearly visible outside of cloak and must be fixed.

Right. Not just a matter of timing.

There seem to be two possible sources of delay:

  1. A round trip to the backend database by the cloak to ingest data.
  2. Some extra processing in the Air to deal with the additional log message.

It seems to me that if we fix the first one, then we automatically fix the second one. And a sounder fix to the first one would be to not only add the debug message, but to literally check the backend DB to see if there is more data (even though we "know" that there isn't).

Why not at least look at what the changes would look like?

@cristianberneanu you have some thoughts?

yoid2000 commented 3 years ago

One could make the runtime fall onto a grid of money-rounded, growing time intervals.

I missed this. It would probably work in general. There is still the danger that an attack exploits the midpoint between two such intervals, so that query1 falls into one time interval while query2 falls into the next.

But it does seem to me that this would likely be a way more complex fix than changing the ingestion execution path...

sebastian commented 3 years ago

But it does seem to me that this would likely be a way more complex fix than changing the ingestion execution path...

Depending on how complex we need it to be, maybe.

Do we need each execution stage to take a money-rounded amount of time? If so, then it's somewhat more complex, but not necessarily overly so. If we only care about the total query execution time (given that we should be outputting rounded time values in the log anyway), then it's not complex at all. It would then be a matter of recording a timestamp at the start of the query and sleeping for the difference to the nearest allowed time bucket before returning the results.
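
As a rough sketch of that last variant (the module and function names here are made up, and the 5ms grid is just an example, not necessarily what we'd pick):

# Sketch of padding total query time onto a grid -- illustrative only.
defmodule Cloak.Query.TimePad do
  @grid_ms 5

  # Runs `query_fun`, then sleeps so that the total elapsed time is rounded
  # up to the next multiple of @grid_ms before the result is returned.
  def run_padded(query_fun) do
    started_at = System.monotonic_time(:millisecond)
    result = query_fun.()
    elapsed = System.monotonic_time(:millisecond) - started_at
    target = ceil(elapsed / @grid_ms) * @grid_ms
    Process.sleep(target - elapsed)
    result
  end
end

Caveat, as noted above: a pair of queries whose natural runtimes straddle a grid boundary would still be distinguishable, so the grid step would have to comfortably exceed the timing difference we are trying to hide.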

sebastian commented 3 years ago

but to literally check the backend DB to see if there is more data (even though we "know" that there isn't).

Can you elaborate on this?

We tend to stream data. The database tells us when there isn't any more data. What would the check be? Rerunning the query? Just forcing some arbitrary round trip?

yoid2000 commented 3 years ago

We tend to stream data. The database tells us when there isn't any more data. What would the check be? Rerunning the query? Just forcing some arbitrary round trip?

No no, nothing so complex as that.

I imagine we currently do something like this:

query_result = makeQuery()
if query_result.amount_of_data != None:
    state.set('ingesting_data')
    while data_to_stream:
        data += stream_more_data()
    data.process()
data.post_process()
etc.

I'm suggesting we do this instead:

query_result = makeQuery()
state.set('ingesting_data')
while data_to_stream:
    data += stream_more_data()
data.process()
data.post_process()
etc.

In other words, we simply don't do the optimization of checking in advance as to whether there is data to stream or not.

If there is no data, then the while data_to_stream loop never actually receives any data. But, it should go to the database and ask for data so that it is doing nearly the same amount of work it would have done if there was one row to retrieve.

Of course maybe it isn't this simple. I'm just imagining it works as I say above, but maybe it doesn't...
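
In Elixir terms, and treating this purely as a sketch (the plumbing names are borrowed from db_emulator and the streamer, not an actual patch), it might amount to:

# Always report :ingesting_data and always drive the streamer, even when it
# yields no rows, so both cases make the same round trip to the database.
state_updater.(nil, :ingesting_data)

rows =
  query
  |> DataSource.Streamer.rows()
  |> Enum.to_list()            # assuming rows/1 returns a lazy stream; empty list if no rows

process_rows(rows)             # hypothetical downstream processing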

cristianberneanu commented 3 years ago

The current handling of the ingesting_data lifecycle step was done deliberately, so that it separates database execution time from cloak execution time for reporting purposes (so that customers know whether the database is slow or the cloak is slow). I am not against merging the two.

Money-rounding total time sounds good to me, as it might handle more complicated cases better (longer running queries, multiple anonymizing queries, costly joins, etc.)

edongashi commented 3 years ago

@cristianberneanu my hack in this commit changes it so that we update the status before entering the stream loop. I suppose there won't be a big time difference between these two callsites?

We absolutely need to report the :ingesting_data status even if there is no data.

cristianberneanu commented 3 years ago

But wouldn't that end up sending the ingest_data message twice? I would instead change https://github.com/Aircloak/aircloak/blob/master/cloak/lib/cloak/query/db_emulator.ex#L105-L106 to:

state_updater.(nil, :ingesting_data)
%Query{query | subquery?: not query.emulated? and query.type != :anonymized, where: Query.offloaded_where(query)}
|> DataSource.Streamer.rows()

and drop reporting from streaming completely.

edongashi commented 3 years ago

I have swapped reporter for nil when calling process_chunks as a minimally intrusive hack to test if this actually makes a difference.

edongashi commented 3 years ago

Hmm, if we drop reporter at the root then we assume the connection will be successful. It can fail here https://github.com/Aircloak/aircloak/blob/aa84ecdcc8db9f01df08efaddebade60f5a969b3/cloak/lib/cloak/data_source/connection.ex#L43-L51

cristianberneanu commented 3 years ago

Why does that matter? If the connection fails, the entire pipeline is stopped.

edongashi commented 3 years ago

Okay, if that's not an issue then this can be cleaned up by a lot. reporter is being passed around so much that it's annoying...

cristianberneanu commented 3 years ago

Yeah, I remember we added a lot of complexity in the pipeline just so that we report the ingestion state on data arrival instead of before issuing the request.

sebastian commented 3 years ago

Considering this fixed and ready to be closed.