MaterializeInc / materialize

The data warehouse for operational workloads.
https://materialize.com

Data plane consistency #34

Closed jamii closed 2 years ago

jamii commented 5 years ago

Problem

https://github.com/MaterializeInc/materialize/issues/33 deals with control plane consistency. Regardless of which solution we adopt, we still have a related issue on the data plane - we don't have a notion of data plane time, and so we don't have any kind of consistency for PEEK. Consequences:


Solution A

@frankmcsherry suggested rather than allowing workers to read freely from Kafka, add an Advance command which contains a Kafka timestamp and allows workers to read up to the timestamp. This ties data plane times to control plane times, so we can now define PEEK consistency by saying that it must return results that are correct as of the last Advance before the PEEK.

Initial questions:

jamii commented 5 years ago

Kafka timestamp = Vec<(Topic, Partition, Seq)>.

It looks like the number of partitions can increase, and partitions can move around, but data can't get rebalanced between partitions.

benesch commented 5 years ago

multiple timely workers might resolve PEEK at different timestamps? (I'm not sure of this, but I think @benesch mentioned it)

Yep, this is right. I made some tiny progress towards addressing this today: the gateway node now picks a timestamp for PEEK, which gets forwarded through to the dataflow command, so the dataflow workers will wait until the probe is up-to-date before returning results. There's still a problem here, which is that the computation might be too up-to-date; we need to filter out sufficiently new datums when we scan over the arrangement.

in a replicated setting, PEEK might go back in time if executed against different servers

I think this can be fixed as part of #33, by having the global sequencer decide what the PEEK timestamp should be, rather than the gateway node.

benesch commented 5 years ago

Kafka timestamp = Vec<(Topic, Partition, Seq)>

There's also the u64 timestamp, intended to represent ms since the Unix epoch, that is attached to each message in newer versions of Kafka: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

Unfortunately Kafka makes zero guarantees about the value of this timestamp. (I.e., it can go backwards.)

frankmcsherry commented 5 years ago

@frankmcsherry suggested rather than allowing workers to read freely from Kafka, add an Advance command which contains a Kafka timestamp and allows workers to read up to the timestamp.

Another option is to use the Kafka timestamps, which can be supplied by the producing application rather than by the Kafka system, and insist that PEEK commands have an associated timestamp, or range of timestamps.

For example, one way to implement a per-key PEEK is to take the relation you wish to query and join it against a "peek stream" which contains keys that you would like to see and (importantly) timestamps attached to them. You might have a peek stream that looks like:

(key1, time1,     +1)
(key1, time1 + 1, -1)

which would have the effect of reporting the values associated with key1 at the interval [time1, time1 + 1). The subtraction removes the posted "interest" in the key so that changes at future times are not surfaced.

This is more "computation" than just barging over and sniffing around in the trace, but it uses the existing machinery to ensure that we get correct and consistent answers.
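
A minimal sketch of this shape using stock timely/differential APIs (the data and collection names here are made up, and this is not how materialized implements PEEK): the relation is semijoined against a peek collection whose interest in a key is inserted at one time and retracted at the next.

use differential_dataflow::input::Input;
use differential_dataflow::operators::Join;
use timely::dataflow::ProbeHandle;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut probe = ProbeHandle::new();

        let (mut relation, mut peeks) = worker.dataflow::<u64, _, _>(|scope| {
            // The relation being queried, as (key, value) pairs.
            let (rel_input, relation) = scope.new_collection::<(u32, u32), isize>();
            // The "peek stream": keys whose values we currently want to see.
            let (peek_input, peeks) = scope.new_collection::<u32, isize>();

            relation
                .semijoin(&peeks) // keep only keys with posted interest
                .inspect(|x| println!("peek result: {:?}", x))
                .probe_with(&mut probe);

            (rel_input, peek_input)
        });

        // Data at time 0; interest in key 1 is posted at time 0 ...
        relation.insert((1, 10));
        relation.insert((2, 20));
        peeks.insert(1);
        relation.advance_to(1); relation.flush();
        peeks.advance_to(1); peeks.flush();

        // ... and retracted at time 1, covering the interval [0, 1).
        peeks.remove(1);
        relation.advance_to(2); relation.flush();
        peeks.advance_to(2); peeks.flush();

        // Step the worker until the results for the interval are complete.
        while probe.less_than(&2) {
            worker.step();
        }
    })
    .unwrap();
}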

benesch commented 5 years ago

Another option is to use the Kafka timestamps, which can be supplied by the producing application rather than by the Kafka system, and insist that PEEK commands have an associated timestamp, or range of timestamps.

I went down this road without much success. Since Kafka makes no guarantees about timestamp order, it's quite likely that you'll see timestamps out of order. Even with a single partition topic, if you've got multiple producers, you're essentially guaranteed to have out-of-order timestamps.

So how do you figure out when to allow Timely to advance time? I couldn't manage anything workable. The closest I got was configuring a maximum allowable out-of-orderness—let's call it max skew. With a max skew of 1s, say, you know that once you've seen a record at 1:00:05, you're free to advance time to 1:00:04.

This works ok while messages are continually produced, but falls over as soon as the stream is complete, because you can never get access to the last second's worth of messages. (This is acutely felt in tests, where you're inserting some known set of data and then attempting to perform a computation with a known result over the entire stream; it's probably less problematic in a real-world setting, where most of your streams will be continually producing data and you don't care if they're missing the last second's worth of records.) So you need to send along a dummy message at 1:00:06 to allow 1:00:05 to be closed out.

Once you do that, you've introduced the notion of metadata messages [1], so you might as well go whole hog and demand that Kafka streams adhere to some timestamp-advancing protocol, where the producer periodically publishes a "timestamp closed" message, at which point the Timely computation is advanced.

Is there a clever solution that I've overlooked?

[1] At some point we were discussing using a separate Kafka topic to send timestamp metadata information. I think the problem described above still applies, in that asking producers to periodically send timestamp messages over a different Kafka topic is a pretty painful deviation from standard Kafka timestamp practice (i.e., "LOL HERE'S A TIMESTAMP YOLO").
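
To make the max-skew idea above concrete, a minimal sketch (the type and names are hypothetical, not anything in the codebase):

use std::time::Duration;

// Track the highest event timestamp seen and only allow the frontier to
// advance to max_seen - max_skew.
struct SkewWatermark {
    max_seen_ms: u64,
    max_skew_ms: u64,
}

impl SkewWatermark {
    fn new(max_skew: Duration) -> Self {
        SkewWatermark { max_seen_ms: 0, max_skew_ms: max_skew.as_millis() as u64 }
    }

    /// Observe a record's timestamp (ms since the Unix epoch).
    fn observe(&mut self, ts_ms: u64) {
        self.max_seen_ms = self.max_seen_ms.max(ts_ms);
    }

    /// The time up to which it is "safe" to advance under the skew assumption.
    /// Note the failure mode described above: if no new records arrive, this
    /// value never advances, so the last max_skew worth of data stays open.
    fn watermark(&self) -> u64 {
        self.max_seen_ms.saturating_sub(self.max_skew_ms)
    }
}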

For example, one way to implement a per-key PEEK is to take the relation you wish to query and join it against a "peek stream" which contains keys that you would like to see and (importantly) timestamps attached to them.

This is interesting! But it also seems orthogonal to the question of how to assign to incoming external data a reasonable timestamp, which is the problem I'm struggling with the most right now.

frankmcsherry commented 5 years ago

Is there a clever solution that I've overlooked?

I think this is a thing we'll need to work through in any case. Timely and differential will by default stall if one never advances the lower bounds on input timestamps; the only other remedies are (i) multi-temporal timestamps (e.g. event time x system time) or (ii) dropping data the way Kafka does with its "max out of orderedness".

One resolution that felt sane was to introduce watermarks into the streams (this is how timely does its own streams). The partitioning is a bit weird here, but one candidate solution was to have a "watermarks" stream of (topic, partition, timestamp, offset) records, indicating that in the given partition, from that offset onward, timestamp is a lower bound on all future timestamps.

I guess there are perhaps two almost-orthogonal problems:

  1. What do you do if the input has no watermarks? Invent them, or deal with unboundedly out of order data?
  2. How does one communicate watermarks when you have them (e.g. from CDC out of a database)? Can we take something like a CDC mechanism, and put the data into Kafka without losing the information about completeness of sequence numbers / timestamps?

About to give a PWL talk, but we should talk more about this!

benesch commented 5 years ago

I guess there are perhaps two almost-orthogonal problems...

Yes, sounds right to me!

  1. What do you do if the input has no watermarks? Invent them, or deal with unboundedly out of order data?

Yeah, right now we're stamping any data from Kafka with its arrival time at the receiving node. This works medium well. Where it gets icky is when you've put some data into Kafka and then peek a derived dataflow in materialized immediately, expecting to see the results of what you just put in Kafka. More likely than not, the computation won't have updated yet.

Is there some way of dealing with unboundedly out of order data?

  2. How does one communicate watermarks when you have them (e.g. from CDC out of a database)? Can we take something like a CDC mechanism, and put the data into Kafka without losing the information about completeness of sequence numbers / timestamps?

What I'm working on right now is teaching our Kafka source to handle the CockroachDB CDC timestamp watermarks (https://www.cockroachlabs.com/docs/stable/change-data-capture.html#what-is-change-data-capture), which is pretty perfectly suited to periodically advancing Timely computations. It's sort-of akin to (topic, partition, timestamp, offset), except we can entirely offload the computation of watermarks to Cockroach.

benesch commented 5 years ago

Here's a rough proposal. The basic idea is to provide strict consistency guarantees when joining streams from the same data source, while degrading to eventual consistency when joining streams across data sources. So if you join two streams of data from the same CockroachDB cluster, we should give you the correct answer, full stop. If you join streams from a MySQL server and a CockroachDB cluster, we should give you an answer that's approximately correct, where the deviation is the result of the inevitable clock skew between the MySQL server and the CockroachDB cluster.

I can imagine that a certain class of user might want to more precisely manipulate timestamps, in order to join the MySQL stream at time T_MySQL with the CockroachDB stream at time T_CRDB, because the user somehow knows that T_MySQL corresponds to the same instant in actual time as T_CRDB. (@rjnn, this would be a great product question.) My understanding is that this requires multi-temporal queries, the implementation of which is awfully fuzzy in my brain. (More specifically, multi-temporal queries seem straightforward to express in a manually-written differential dataflow, but extremely difficult to express in a Materialize SQL query; plus, most of the infrastructure assumes that every dataflow uses the same timestamp type.)

So if we want an approximate temporal link between timestamps across disparate data sources, I think that means we need to use wall clock time, and hope that users run NTP to have reasonably-synchronized clocks. Since we've loosened the bound on correctness, I think that's acceptable; correctness will be skewed in direct proportion to the clock skew.

We can declare our global timestamp type to be a u128, where the high 96 bits are the number of nanoseconds since the Unix epoch, and the low 32 bits are a potential logical component provided by one of our data sources (i.e., CockroachDB). CockroachDB timestamps map directly to this type, as does the standard representation of time in most languages, i.e., the number of milliseconds or nanoseconds since the Unix epoch.
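
A sketch of what that layout could look like (illustrative only, not an actual Materialize type):

// The high 96 bits hold nanoseconds since the Unix epoch, the low 32 bits hold
// a source-provided logical counter (e.g. the logical component of a
// CockroachDB timestamp).
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct GlobalTimestamp(u128);

impl GlobalTimestamp {
    fn new(unix_nanos: u128, logical: u32) -> Self {
        GlobalTimestamp((unix_nanos << 32) | logical as u128)
    }
    fn unix_nanos(&self) -> u128 {
        self.0 >> 32
    }
    fn logical(&self) -> u32 {
        (self.0 & 0xFFFF_FFFF) as u32
    }
}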

Then, when we have a well-behaved data source like CockroachDB, which can promise monotonic timestamps, we use the provided timestamps directly. When we don't, we stamp incoming records with their arrival time, using our own clock. Implementing this clock is somewhat tricky, as we want it to be monotonic yet be closely tied to wall clock time. We can use https://github.com/rubrikinc/kronos for inspiration, but the tl;dr is that this is possible if you tolerate some flatlining in the presence of backwards time jumps.
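
A sketch of that flatlining idea, assuming monotonicity is only needed within a single materialized process (hypothetical; not the kronos library's API):

use std::time::{SystemTime, UNIX_EPOCH};

// Never hand out a timestamp smaller than one we've already handed out,
// holding steady if the wall clock jumps backwards.
struct MonotonicClock {
    last_nanos: u128,
}

impl MonotonicClock {
    fn new() -> Self {
        MonotonicClock { last_nanos: 0 }
    }

    fn now(&mut self) -> u128 {
        let wall = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(self.last_nanos);
        // Flatline rather than go backwards.
        self.last_nanos = self.last_nanos.max(wall);
        self.last_nanos
    }
}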

Finally, there's the problem of PEEK: which timestamp do you get when you ask to read some data? I think we want three variants:

Thoughts? This was a speed-of-thought brain dump, so I'll likely have some revisions.

frankmcsherry commented 5 years ago

I wanted to pop back in here and scratch down some other candidate proposals. These are more "operational" proposals, whereas I think @benesch had slightly more semantics-ful proposals up above. We aren't going to try and define consistency here, but rather just correctness and possibly the knobs a user might be asked to use to modulate the inevitable delay in receiving the correct answer.

Some quick background: differential dataflow expects to see triples (data, time, diff) as input, where time is just payload and can be set arbitrarily. We can also change time arbitrarily if we want, or filter streams based on time. For a supplied stream of update triples, differential dataflow computes a well-defined "correct answer", but it will not start this process for a given time until it receives a signal that we will not see any further inputs at that time (we can work around this, but let's take it as a rule for the moment).

We do have the ability to modify time and, having done so, to introduce statements that a given time will no longer appear in the input, if we want to be in that business. We might want to be, to work around defects in our input sources of data.

I figured I would lay out a few progressively more sophisticated approaches. Summarized, they are:

  1. Correct received timestamp data to be in-order. Clear-ish semantics, but not what was asked for.
  2. Maintain an "out of order" window, and only correct timestamps more out of order than this window. More likely to be what was asked for, but with greater built-in latencies.
  3. Take a second stream of "timestamp progress" events, correlated to offsets in the first stream. This allows finer control, but requires someone to produce such a stream (perhaps using the rules above, but with the potential for more responsive control).
  4. Take a second stream of additional timestamps, strictly increasing, correlated to offsets in the first stream. The second time dimension allows differential dataflow to make forward progress, but still be clear about its output (e.g. that it has processed through input time t1 as of system time t2).

None of these proposals invent policy about how one should treat their confusing and ill-specified times in their clusters with different clocks, or anything like this. These are just a few flavors of mechanism that we could present as menu items that would likely be good building blocks for other people to decide how they should treat their inputs.

@benesch observes that (1.) is a lot like "eventual consistency", in that at the end of a topic, or during any sufficiently long lull, we should have the same answer no matter what we did with the timestamps (at least, as long as we fast-forward rather than drop records).

Take timestamped usize input records and, if out-of-order records are observed, fast-forward the "late" records to the local high-watermark.

This approach works great on in-order data, and may have the property that it provides a deterministic experience on a fixed input stream. The caveat here is that if we need to fast-forward the times, we'll want to make sure that e.g. we do this per-topic-partition, rather than however we might have bundled them in timely workers. If Kafka shows us from which partition a message was received (crosses-fingers) then each worker can maintain for each partition the highest timestamp it has seen, and fast-forward any times to be at least this time.

Time advances as new timestamped records show up in the input stream, which advances the high watermarks. If new records do not show up we would stall (and might need to prompt progress with bogo records).
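
A sketch of the per-partition fast-forwarding described above (hypothetical types, not actual source):

use std::collections::HashMap;

// Each worker tracks, per (topic, partition), the highest timestamp it has
// seen, and assigns late records that high-watermark instead of their own.
struct HighWatermarks {
    highs: HashMap<(String, i32), u64>,
}

impl HighWatermarks {
    fn new() -> Self {
        HighWatermarks { highs: HashMap::new() }
    }

    /// Returns the timestamp to actually assign to the record.
    fn assign(&mut self, topic: &str, partition: i32, ts: u64) -> u64 {
        let high = self.highs.entry((topic.to_string(), partition)).or_insert(0);
        *high = (*high).max(ts);
        *high // late records are fast-forwarded to the partition's high-watermark
    }
}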

Take timestamped usize input records and maintain a fixed out-of-orderedness window of some width. Again we track the high-watermark, but we only fast-forward records to the watermark minus that width.

Kafka has a parameter that lets you specify an out-of-orderness, and we could read this out (or rely on the user to specify the same value). Kafka will just drop records that are more than the out-of-order bounds, and we can rely on this / ensure that this happens at ingestion by dropping records ourselves.

Time now advances with a built-in delay of the window width, which likely introduces noticeable delay. If it is set to 10 seconds, then our output is always ten seconds behind what it could have been if we knew that the input were current, even if the input is current. As before, time advances only as new records are introduced.

This approach is meant to be a generalization of the first approach, which is essentially this with a zero width.

Take timestamped usize input records, and a second stream of progress data that indicate changes in a low watermark corresponding to sequence numbers (or Kafka offsets) in each input stream.

In this framing, each input operator now has two external sources of information, a raw native stream (perhaps Kafka, perhaps some other queue) which does not require timestamp information, and a second stream designed to correct that deficiency by relating moments in that stream with advances in timestamp lower bounds.

This allows us to generalize the approaches above, by having an external agent commit to the timestamp advances (by shoving them through Kafka) rather than by baking in constants / strategies into the operators. Perhaps the external agent uses the same strategy, but it has a degree of freedom now that better interfaces could exploit. When we have no information, perhaps we tick the lower bounds forward as the high-watermark minus width, and when we do have information we can more aggressively advance time.
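
A sketch of how such a progress stream might be consumed (hypothetical types and semantics, not a committed design):

// A side stream of progress records binds offsets in the data stream to lower
// bounds on future timestamps.
struct ProgressRecord {
    topic: String,
    partition: i32,
    offset: i64,    // from this offset onward ...
    timestamp: u64, // ... all records carry timestamps >= this lower bound
}

// Given the offset we have read up to in a partition, and the progress records
// received so far, compute how far the input frontier may advance.
fn may_advance_to(progress: &[ProgressRecord], topic: &str, partition: i32, read_offset: i64) -> u64 {
    progress
        .iter()
        .filter(|p| p.topic == topic && p.partition == partition && p.offset <= read_offset)
        .map(|p| p.timestamp)
        .max()
        .unwrap_or(0)
}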

Take timestamped usize input records, and a second stream that assigns to each record a second usize timestamp, where the second timestamp strictly increases. The timestamp used for each record is now vec![time1, time2]. Optionally, the second stream can indicate progress for the first stream.

This is the first "multitemporal" approach, and if we just look at the first coordinate it looks a lot like the prior approaches that let the user specify the time. The second coordinate exists so that we have a dimension that we can strictly advance, and thereby prompt computation to occur. This is a bit weird, so I'm happy to talk it through more, but it is something that allows us to start working on incomplete input times while still being totally clear about the interpretation of the output (roughly "this is input time time1 as of system time time2").

One potential advantage here is that we may not need deterministic second time dimensions; we would want these for 100% reproducibility, but we could instead relax our requirement to "accumulations for time1 are durable, even if the associated time2 are not". What we are revealing are partial "in progress" accumulations of the input data, which are helpful but not yet committed. Arguably, either is useful / appropriate.
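
For what it's worth, timely's product timestamps are one way to write down the (time1, time2) pair; a small sketch using the timely crate's Product type (the aliases are illustrative):

use timely::order::Product;

type InputTime = u64;  // e.g. the source-supplied timestamp (time1)
type SystemTime = u64; // strictly advancing ingestion time (time2)

// The pair (time1, time2); Product's partial order requires both coordinates
// to advance for one timestamp to be less-equal another.
type Bitemporal = Product<InputTime, SystemTime>;

fn main() {
    // An update at input time 17, ingested at system time 1_000.
    let ts: Bitemporal = Product::new(17, 1_000);
    println!("update time: {:?}", ts);
}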

frankmcsherry commented 5 years ago

Another even-farther-out-there proposal, for inputs that have deeply incomparable timestamps (e.g. Kafka and Cockroach, whose usize and u128 times aren't meant to be comparable).

Multitemporal timestamps allow us to use a type e.g. Vec<Timestamp> as our timestamp type, where Timestamp is some enum over known flavors (e.g. either usize or u128). At each input operator, we capture an appropriate timestamp and place it in a consistent position (e.g. Kafka first, Cockroach second), and in the other positions insert the low-watermarks for each of the other inputs.
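
A sketch of that timestamp type (illustrative only; a real version would also need the relevant lattice/timestamp trait implementations):

// Each coordinate of the multitemporal timestamp is one source's native time,
// placed in a fixed position (Kafka first, Cockroach second) as described above.
#[derive(Clone, PartialEq, Eq, Debug)]
enum Timestamp {
    Kafka(usize),    // e.g. a Kafka-derived time
    Cockroach(u128), // e.g. an HLC timestamp from CockroachDB
}

// The timestamp attached to each update: one entry per input source.
type MultiTime = Vec<Timestamp>;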

The result here is that we get outputs that say things like:

(data1, (k_time1, c_time1), diff1)
(data2, (k_time2, c_time2), diff2)
(data3, (k_time3, c_time3), diff3)
(data4, (k_time4, c_time4), diff4)

which we should read as changes at times (k_time, c_time), where there is informally (not actually) a linear sequence of pairs that describes how the low watermarks of the two inputs evolved.

There is a race in that Kafka low-watermark progress isn't coupled to Cockroach low-watermark progress, and so the results are not durable in that sense. We could see identical accumulations at any (k_peek, c_peek) no matter the interleaving if we had used times 0 for other systems, but this would prevent computation from getting done (the Kafka input maintains the ability to produce data at Cockroach time zero). However, this is now a knob we can play with, perhaps advancing the maintained capabilities with a materialized instruction that unblocks computation once we are sure that we've evaded any consistency concerns (e.g. no pending peeks).

cuongdo commented 4 years ago

For the demo, the blocking part of this issue is the current need to force a write to advance time. Without that, the view doesn't incorporate all results. This would look janky in a customer demo.

jamii commented 4 years ago

Notes from the meeting a few weeks back:

[image: meeting notes]

frankmcsherry commented 4 years ago

For the demo, the blocking part of this issue is the current need to force a write to advance time. Without that, the view doesn't incorporate all results. This would look janky in a customer demo.

To expand on this, when records show up from Kafka they come with timestamps. We want to respect these timestamps, because in principle they communicate important semantic meaning, like external ordering constraints or consistency properties. For example, two updates as part of a transaction may come out with the same timestamp, and we should present results that always have both updates or neither.

The caveat here is that without information from Debezium, when we see a record with timestamp time, we are certain it should have that timestamp, but we are not certain that another record will not arrive with that same timestamp. We should not process the record until we are certain there will be no further records at that timestamp, or we risk violating consistency.

The only potential sources of information we have are the Kafka stream itself, which we are transforming to be not-out-of-order, or a Debezium heartbeat that could tell us that the backing database has indeed advanced beyond this time. When more records arrive we are able to advance the timestamp to whatever time they present, which we do. We could add Debezium heartbeats (which apparently require tickling the database, as Debezium only gets upcalled when events actually happen in the database), but until this happens the "correct answer" is to do nothing.

Commit logs should have punctuation in them, to clearly indicate when a transaction / timestamp has completed.

cuongdo commented 4 years ago

This is marked as a 0.4 feature, but I'm not clear on what action is desired here. @frankmcsherry do you have thoughts regarding what might make sense for 0.4?

frankmcsherry commented 4 years ago

I think that assignment might be vestigial from an earlier time. I think this is in the "production ready" blockers pile, but it might also be a good idea to file some more specific issues instead of this one (which is maybe over-broad). We do have a fair pile of TODOs on data-plane consistency, which I think mostly fall under "fleshing out BYO consistency" (from my pov).

benesch commented 2 years ago

Calling this one superseded by the recently defined correctness properties: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/platform/ux.md#correctness. There are various epics that track the implementation of those properties (e.g., #11631).