MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

Compaction invalidates results at certain times, not reflected by us #2803

Open frankmcsherry opened 4 years ago

frankmcsherry commented 4 years ago

Traces maintained in materialized all have a since compaction frontier, which describes the times for which the trace is capable of producing correct outputs. Times beyond the frontier will produce correct accumulations, and times not beyond the frontier may or may not produce correct results.

In several cases we happily use traces without consulting their since frontiers, producing results that are only correct for the upper bound of these frontiers.


In more detail:

Because we want to maintain bounded memory footprints as a computation runs, we take advantage of differential's ability to compact traces. Trace compaction is enabled by irrevocably downgrading the set of times at which differential is obliged to faithfully represent the contents of each trace, which allows it to compact updates that lie outside this set of times. The accumulations may still be correct at some times, but we may have lost some historical detail that allowed us to correctly describe updates in the past.
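As a rough illustration of what compacting to a since does, here is a minimal Python model. It assumes integer timestamps (differential's times can be partially ordered, in which case the since is a frontier rather than a single number), and the names `compact` and `accumulate` are made up for this sketch; they are not differential's API.

```python
from collections import defaultdict

def compact(updates, since):
    """Advance every update time to at least `since`, then consolidate."""
    merged = defaultdict(int)
    for data, time, diff in updates:
        merged[(data, max(time, since))] += diff
    # Drop updates whose diffs cancelled out after being moved to the same time.
    return sorted((d, t, diff) for (d, t), diff in merged.items() if diff != 0)

def accumulate(updates, at):
    """Sum the diffs of all updates with time <= `at`, per datum."""
    totals = defaultdict(int)
    for data, time, diff in updates:
        if time <= at:
            totals[data] += diff
    return {d: n for d, n in totals.items() if n != 0}

# "a" is inserted at time 1 and retracted at time 3; "b" is inserted at time 2.
updates = [("a", 1, +1), ("a", 3, -1), ("b", 2, +1)]
compacted = compact(updates, since=4)

print(compacted)                  # [('b', 4, 1)]
print(accumulate(updates, 5))     # {'b': 1}
print(accumulate(compacted, 5))   # {'b': 1}
print(accumulate(updates, 2))     # {'a': 1, 'b': 1}
print(accumulate(compacted, 2))   # {}
```

At time 5, which is beyond the since, both versions agree. At time 2 the compacted trace reports nothing, because the history of "a" and the original time of "b" have been lost.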

There is an associated policy for how to compact things at the moment, but this could change and generally the since member associated with each trace is where we should find this information.

When we perform queries like SELECT <complicated> we pick a timestamp, and work relatively hard to make sure that the timestamp will produce correct results (specifically, that it is beyond each of the since frontiers of involved materializations). Good for us!
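The check described here can be sketched as follows. This is a toy model, assuming each since frontier is a list of integer timestamps and that a time is "beyond" a frontier when it is greater than or equal to some element of it; `beyond` and `valid_query_time` are hypothetical names, not functions in materialized.

```python
def beyond(frontier, time):
    """True if `time` is greater than or equal to some element of `frontier`."""
    return any(f <= time for f in frontier)

def valid_query_time(time, sinces):
    """True if `time` is beyond the since frontier of every involved trace."""
    return all(beyond(since, time) for since in sinces)

# Since frontiers of three hypothetical materializations used by one query.
sinces = [[5], [8], [3]]

print(valid_query_time(8, sinces))  # True
print(valid_query_time(7, sinces))  # False: not beyond the frontier [8]
```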

However, at the moment this is the only case in which we do this.

  1. When we create a materialization from other materializations (and perhaps unmaterialized sources) we should set the since of this new materialization to be the upper bound of the since frontiers of the participating materializations (directly and reachable from views).

  2. When we create a sink from other materializations (and perhaps unmaterialized sources) we should be aware that we are not producing correct results at all times. We should find a way to communicate that the outputs are only correct from some since frontier forward. That isn't a concept that most other stream media speak about, so it's not exactly clear where this goes in a Kafka sink or an output file (the filename?).

  3. We probably want to start to think about attaching a since to each source. We currently assume that a source provides complete and correctly timestamped updates from the beginning of time, which is not always going to be an accurate assumption (should we ever want to read in sinks that we've produced, for example, we have very specific information about from what times the data are accurate). As with sinks above, this isn't information that Kafka or files or whatever makes clear, so we may have to learn to invent it for them, but we should be able to accept and respect that information when provided.
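For item 1, one way to picture "the upper bound of the since frontiers of the participating materializations (directly and reachable from views)" is the sketch below. It assumes integer timestamps, so the upper bound is just a maximum, and it treats anything without a recorded since (unmaterialized views and sources) as imposing no constraint of its own, matching the current assumption in item 3. The function and dictionary names are invented for the example.

```python
def derived_since(name, depends_on, since_of):
    """Upper bound of the sinces of every materialization reachable from `name`.

    depends_on: maps an object name to the names it reads from.
    since_of:   maps each materialized object to its integer since.
    """
    result = 0  # minimum timestamp: no constraint yet
    stack = list(depends_on.get(name, []))
    seen = set()
    while stack:
        dep = stack.pop()
        if dep in seen:
            continue
        seen.add(dep)
        if dep in since_of:
            # Materialized: its trace is read directly, so stop here.
            result = max(result, since_of[dep])
        else:
            # Unmaterialized view or source: look through it to its inputs.
            stack.extend(depends_on.get(dep, []))
    return result

depends_on = {
    "new_view": ["v1", "m1"],  # reads an unmaterialized view and a materialization
    "v1": ["m2", "src"],       # the view reads a materialization and a source
}
since_of = {"m1": 5, "m2": 9, "m3": 20}  # m3 is not reachable from new_view

print(derived_since("new_view", depends_on, since_of))  # 9
```

The same bound could be attached to a sink built from these inputs (item 2) as the time from which its output is correct.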

cc @ruchirK @justinj

ruchirK commented 4 years ago

So I played around with the billing demo to see if I could reproduce any weirdness w/ two sinks reflecting different [lower, upper) bounds of times that they understand:

  1. Ran the billing demo as normal with 100 messages. The billing demo automatically creates a sink for us at the beginning.
  2. Observe the number of records stored in the billing_monthly_statement dataflow's arrangements vs the number of rows that view is outputting:
materialize=> select * from mz_records_per_dataflow_global order by records;
  id  |                                name                                | records
------+--------------------------------------------------------------------+---------
 --snip--
 1674 | Dataflow: materialize.public.billing_monthly_statement             |    4159
 ---snip---
materialize=> select count(*) from billing_monthly_statement;
 count
-------
   396
(1 row)

This indicates that compaction has not yet occurred.

  3. Trigger compaction by sending a single message via the billing demo, after which we see a much smaller number of records (sending another message is optional; I think the compaction would have eventually happened):
materialize=> select * from mz_records_per_dataflow_global order by records;
  id  |                                name                                | records
------+--------------------------------------------------------------------+---------
 --snip--
 1674 | Dataflow: materialize.public.billing_monthly_statement             |     499
  4. Create a new sink after the compaction has occurred:
CREATE SINK billing_sink2 FROM `billing_monthly_statement` INTO KAFKA BROKER 'localhost:9092' TOPIC 'billing_monthly_statements' FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
  5. Verify that the sinks have different amounts of data in them.

a. First grab the sinks' actual topic names

materialize=> SELECT * from mz_catalog_names NATURAL JOIN mz_kafka_sinks;
 global_id |               name               |                             topic
-----------+----------------------------------+---------------------------------------------------------------
 u21       | materialize.public.billing_sink  | billing_monthly_statements-u21-1588086043-8610528124089188498
 u24       | materialize.public.billing_sink2 | billing_monthly_statements-u24-1588086043-8610528124089188498
(2 rows)

b. Then use kafkacat to see how many messages each has

billing_sink

altaria-2:billing Test$ kafkacat -C -b localhost:9092 -o end -t billing_monthly_statements-u21-1588086043-8610528124089188498
% Reached end of topic billing_monthly_statements-u21-1588086043-8610528124089188498 [0] at offset 4164

billing_sink2

altaria-2:billing Test$ kafkacat -C -b localhost:9092 -o end -t billing_monthly_statements-u24-1588086043-8610528124089188498
% Reached end of topic billing_monthly_statements-u24-1588086043-8610528124089188498 [0] at offset 450
  6. Reingest the data into Materialize. This was a bit tricky because I forgot we have to specify ENVELOPE DEBEZIUM even for schemas managed by the registry:
CREATE MATERIALIZED SOURCE reingest1 FROM KAFKA BROKER 'localhost:9092' TOPIC 'billing_monthly_statements-u21-1588086043-8610528124089188498' FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081' ENVELOPE DEBEZIUM;

and a similar thing for the other sink (I named that source reingest2)

  7. Now we can make a view that queries those for any inconsistencies:
CREATE VIEW check AS
SELECT
reingest1.execution_time_ms AS t1,
reingest2.execution_time_ms AS t2
FROM
reingest1,reingest2

WHERE
reingest1.month = reingest2.month AND
reingest1.client_id = reingest2.client_id AND
reingest1.cpu_num = reingest2.cpu_num AND
reingest1.memory_gb = reingest2.memory_gb;

but I didn't find anything when I queried for inconsistencies

materialize=> SELECT * from check where t1 != t2;
 t1 | t2
----+----
(0 rows)

frankmcsherry commented 4 years ago

I think this is largely addressed by #3005. It adds protective measures to ensure (modulo bugs) that we don't report invalid output updates, but it leaves open certain patterns where we do not pre-empt the protective measures and determine the right as_of frontiers ahead of time.

Still open aspects:

  2. How should we tell sinks/tails about the chosen since frontier?
  3. Should we attempt to learn about since frontiers for inputs?