estuary / connectors

Connectors for capturing data from external data sources

materialization: materialize CDC format #49

Closed jgraettinger closed 3 weeks ago

jgraettinger commented 2 years ago

Background

Materialize is an interesting materialization target we'd like to explore.

It has an idealized change-data-capture format that is rooted in the way timely dataflow works. Here's a longer blog post with more background. tl;dr is that Materialize wants to consume updates like:

update (record0, time0, +2)
update (record1, time0, +1)
update (record2, time0, +1)
progress (time0, 3)
update (record1, time1, -1)
update (record2, time1, +1)
progress (time1, 2)
update (record0, time2, -1)
update (record2, time2, -1)
progress (time2, 2)
progress (time3, 0)

Materialize innately wants consistent differential delta updates of complete rows. record0, record1, etc. are restatements of a record as it existed at a point in time, and the +1 / -1 are updates of that record's count in a multi-set -- which is differential dataflow's fundamental abstraction. Given a consistent and correct CDC stream of this kind, Materialize is able to avoid storing the source table in memory and operate only on the change stream, which I understand to be the desired ideal.
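For concreteness, here's a rough Go rendering of those two message kinds as described above (the types and field names are illustrative only, not an actual Materialize API):

```go
package cdc

import "encoding/json"

// Update restates a complete record as it existed at a logical time, along
// with a multiset delta: +1 adds one copy of the record, -1 retracts one.
type Update struct {
	Record json.RawMessage // the full row: record0, record1, ...
	Time   uint64          // logical timestamp: time0, time1, ...
	Diff   int64           // multiset delta: +2, +1, -1, ...
}

// Progress closes out a timestamp: it asserts that exactly Count updates were
// sent at Time, and that no further updates at Time will follow.
type Progress struct {
	Time  uint64
	Count uint64
}
```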

However, CDC solutions generally don't provide that, either because of weak consistency semantics or because the needed detail isn't available in the logical log. For example:

A Flow + Materialize solution therefore looks interesting for a few reasons:

Assumptions

1) We don't want to pass through a first-class notion of Flow's internal CDC log time, because it's not helpful to tell Materialize about timepoints in the distant past. In other words, time0, time1, etc. should always be ~now. This is because Materialize (and timely dataflow as a whole) has a global understanding of the marching progress of time. You can't usefully tell it timepoints of a months-old capture that's just been stood up as a new source. This may be wrong - I recall that hyper-times are possible in Timely Dataflow, but that support may not be in Materialize just yet.

2) Materialize sources are all pull-oriented in nature. A successful integration would require a push API where Flow tells Materialize of update and progress messages, and receives ACKs back in reply. An ACK means that Materialize has retained all CDC records through the ACK and is able to replay them on its startup, as happens internally with current native sources. I assume this is feasible. The API could be either streaming -- which would require explicit ACKs -- or synchronous, such as an HTTP PUT with the ACK conveyed by an OK response.
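To make assumption 2) a bit more concrete, here's a hypothetical Go interface for the kind of push API being imagined; nothing like it exists in Materialize today, and every name here is invented:

```go
package cdc

import "context"

// PushClient is a hypothetical client of a Materialize push endpoint. A nil
// error from Ack would mean Materialize has durably retained every update and
// progress message sent so far, and can replay them on its own restart.
type PushClient interface {
	// Update stages one differential update: a complete record, a logical
	// time, and a multiset delta.
	Update(ctx context.Context, record []byte, time uint64, diff int64) error
	// Progress declares that exactly count updates were sent at time, and
	// that no more will follow at that time.
	Progress(ctx context.Context, time uint64, count uint64) error
	// Ack blocks until Materialize acknowledges everything sent so far.
	Ack(ctx context.Context) error
}
```

A streaming variant would interleave explicit ACK messages instead of a blocking Ack call; a synchronous variant collapses all of this into a single HTTP PUT whose OK response is the ACK.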

3) Breaking Change / Contentious if Materialize sees a progress at time1, then updates at time2, and then another progress at time1 -- it must treat the second progress at time1 as a roll-back and discard the intermediary time2 updates.

If 1) above is correct, then this is going to be required by any exactly-once integration with a streaming system, because that system a) must align its own transaction boundaries with Materialize "progress" updates for correctness, but also b) dynamically sizes its transaction boundaries based on stream behavior (like data stalls).

If 1) is incorrect, then the stream itself could encode repeatable times sent as update / progress. However, this still precludes an important optimization: being able to roll up / compact a larger set of differential updates into a smaller set that's actually sent to Materialize.

To illustrate the specific concern, suppose that Materialize instead decided to discard all but the last-received update of a record at timeT. This is reasonable at first blush, because if transactions are consistent in their selection of timeT, then a failed transaction would presumably be followed by a re-statement of the record at timeT. However, this fails to account for the second, post-recovery transaction being smaller than the transaction that failed. In other words, a transaction could read through offset N of a stream and then fail after sending updates at timeT. A next transaction could read through offset M (< N) and also send updates at timeT -- but it would not re-state the records between M and N, which were already retained from the failed transaction and will be sent again once a later transaction reads past offset M, so those records are effectively sent twice.
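A tiny simulation of that failure mode, under the hypothetical keep-only-the-last-update policy (offsets, record names, and counts are all invented for illustration):

```go
package main

import "fmt"

func main() {
	// One record per stream offset; transactions A and B both use timeT.
	stream := []string{"r0", "r1", "r2", "r3", "r4"}
	counts := map[string]int{} // how many times each record ultimately gets applied

	// Transaction A reads through offset N=4, sends updates at timeT, then fails.
	// Under "keep only the last-received update of a record at timeT", each
	// record it stated is retained until a later statement replaces it.
	for _, r := range stream[:5] {
		counts[r] = 1
	}
	// Transaction B (the retry) reads only through offset M=2 (< N) and also
	// sends updates at timeT. Its statements of r0..r2 replace A's, but r3 and
	// r4 are never re-stated, so A's copies of them survive.
	for _, r := range stream[:3] {
		counts[r] = 1
	}
	// A later transaction finally reads offsets 3..4 and sends r3 and r4 again
	// at a new timestamp, so they end up applied twice.
	for _, r := range stream[3:5] {
		counts[r]++
	}
	fmt.Println(counts) // map[r0:1 r1:1 r2:1 r3:2 r4:2]
}
```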

Desired Outcome

We can PoC a materialization connector and end-to-end example which drives a proposed / currently-non-existent Materialize push API. It would:

Connector implementation sketch

The connector supports only delta-updates, and assumes (& eventually verifies) that source collections are ~schematized as:

{"record":{"the":"record"},"delta":-2}

State

The connector uses driver checkpoints managed in recovery logs, and doesn't rely on transactions within Materialize (not supported).

The driver checkpoint is:

{
  progress: {timestamp, num-records},
  next: timestamp,
}

progress is an intent to send a progress update, which will be written once the current transaction has been committed to the materialization's recovery log. On startup, the first thing the connector does is tell Materialize of the "progress" in its recovered checkpoint. This ensures we cannot fail to deliver "progress" if a txn committed, and is the same technique used for ACK intents within Gazette's exactly-once semantics.

next is the timestamp to use in the next transaction. The rationale for committing it into the driver state, rather than having each transaction independently pick the current time, is that "update" messages of a failed transaction are then guaranteed to be repeated verbatim by a successor transaction. If assumption 3) doesn't pan out, then Materialize would at least discard the earlier "updates" which are explicitly repeated (TODO: this is unsatisfying and still needs design work).
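A rough Go rendering of that driver checkpoint, restating the two fields' roles as comments (field names are illustrative):

```go
package cdc

// ProgressIntent is a progress update the connector intends to deliver to
// Materialize once the current transaction has committed to the
// materialization's recovery log.
type ProgressIntent struct {
	Timestamp  uint64 `json:"timestamp"`
	NumRecords uint64 `json:"num-records"`
}

// DriverCheckpoint is the connector state committed into the recovery log.
// On startup the connector first replays Progress to Materialize, so a
// committed transaction can never fail to have its progress delivered.
type DriverCheckpoint struct {
	Progress ProgressIntent `json:"progress"`
	// Next is the timestamp the next transaction will use. Committing it ahead
	// of time means a failed transaction's updates are repeated verbatim by
	// its successor.
	Next uint64 `json:"next"`
}
```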

Transaction lifecycle:

snowzach commented 2 years ago

After looking at this and reviewing the Materialize CDC format, I come away with the following question: is the Materialize time value really important as an actual time, or is it more important that it's just a monotonically increasing value? It seems to me that if we could come up with a function of our checkpoint value + key ranges that would produce an increasing value, we could use that in place of the Materialize time value. The only issue I can think of is that multiple key ranges working on the same checkpoint might force a re-ordering in Materialize if they come in out of order.

For example, if you have 4 shards working on the same materialization, each shard would get time 1, 2, 3, and 4, and each could provide the progress update for its relevant time (1-4). If shard 3 finished before shard 2, though, then when shard 2 pushed its update it would make Materialize re-order those updates (even though in theory that's not necessary). I think that's part of the Materialize magic, though, that it can apply those efficiently.
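Purely to illustrate that idea (this is not how Flow actually assigns anything), a function that folds a checkpoint sequence number and a shard index into one increasing value could look like:

```go
package main

import "fmt"

// shardTime maps a (checkpoint sequence, shard index) pair to a distinct,
// monotonically increasing value usable in place of a Materialize time.
// With 4 shards, checkpoint 0 yields times 1..4, checkpoint 1 yields 5..8, etc.
func shardTime(checkpointSeq, shardIndex, totalShards uint64) uint64 {
	return checkpointSeq*totalShards + shardIndex + 1
}

func main() {
	for cp := uint64(0); cp < 2; cp++ {
		for shard := uint64(0); shard < 4; shard++ {
			fmt.Printf("checkpoint %d, shard %d -> time %d\n", cp, shard, shardTime(cp, shard, 4))
		}
	}
}
```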

So the questions to be answered, I think, are:

  1. Can the time value in Materialize be used only for ordering (and not actually be tied to a real time)? (And does it matter what the time really is?)
  2. Can we produce that number across shards such that each checkpoint from the source corresponds to increasing values across shards in the materialization?
  3. Is there any sort of significant performance penalty if the time values, despite always increasing, come in out of order?
jgraettinger commented 2 years ago

Okay, I spent a bunch of time spelunking through the codebase and engineering design docs of materialize, and learned a lot. Top of mind:

A sketch that's taking shape in my head:

Okay, now the harder parts:

(I think we can get started with just the above, writing raw data directly into a Gazette journal it creates on demand as part of its Apply RPC. However, to do this 100% correctly, we need a bit more):

jgraettinger commented 2 years ago

Next steps

There's some lingering "can this even work?" risk in the plan above that we can cheaply verify:

If we can demonstrate a capability to feed materialize in this way, I have confidence in the rest of the design sketch.

snowzach commented 2 years ago

@jgraettinger I currently have it writing a static file in Avro OCF format in the Debezium envelope and having it read by Materialize. It is working, but not totally as I would expect. I need to research that more and see if it's a problem with my format or something else I am doing. (Essentially I'm writing a bunch of values and creating a materialized view with the average of those values, and I'm getting the last value instead of the average.)

snowzach commented 2 years ago

Unfortunately I don't think a pipe is going to work for this case. As part of opening the OCF file/pipe, it appears to open it and then immediately close it, ending with a broken pipe error. Perhaps it's just reading the schema before re-opening the file. Not sure, but at any rate I tried a combination of things to get it to continue reading and it doesn't seem to work. I've tried immediately re-opening the pipe, but it's disconnected at that point and not pulling anything else. I've also tried the Materialize TAIL option, as well as not including it, but it doesn't seem to help.

The only other thing I could think to try is emulating a Kafka broker/server to feed it with data. This is probably not a trivial solution, but it could be useful for other destinations such as ClickHouse, which I know also prefers to read data from Kafka.

jgraettinger commented 2 years ago

Yea, it opens the file to read the Avro OCF header, and will close it again on drop, before opening it for real to read data.

It may still be possible to get this working by going a little lower level. Specifically, the writer to the pipe could handle the returned SIGPIPE error by closing its descriptor and immediately opening it again for writing. Essentially it restarts itself as soon as the reader goes away. Not 💯 this can work but I don't see why not.
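A small Go sketch of that restart-on-broken-pipe idea (the path and framing are hypothetical; note that in Go a write to a broken pipe on an ordinary descriptor surfaces as an EPIPE error rather than a fatal SIGPIPE):

```go
package main

import (
	"errors"
	"os"
	"syscall"
)

// writeReopeningOnEPIPE writes frames to a named pipe and, whenever the reader
// goes away (the write fails with EPIPE), closes its descriptor and
// immediately re-opens the pipe for writing -- the restart behavior described
// above. Illustrative only; error handling is simplified.
func writeReopeningOnEPIPE(path string, frames [][]byte) error {
	f, err := os.OpenFile(path, os.O_WRONLY, 0) // blocks until a reader opens the FIFO
	if err != nil {
		return err
	}
	defer func() { f.Close() }()

	for i := 0; i < len(frames); {
		if _, err := f.Write(frames[i]); err != nil {
			if errors.Is(err, syscall.EPIPE) {
				// The reader closed its end: restart by re-opening the FIFO,
				// which again blocks until the reader comes back.
				f.Close()
				if f, err = os.OpenFile(path, os.O_WRONLY, 0); err != nil {
					return err
				}
				continue // retry the same frame
			}
			return err
		}
		i++
	}
	return nil
}

func main() {
	// Hypothetical FIFO created beforehand with `mkfifo /tmp/materialize.ocf`.
	_ = writeReopeningOnEPIPE("/tmp/materialize.ocf", [][]byte{[]byte("example frame\n")})
}
```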

snowzach commented 2 years ago

This actually appears to be a pretty decent starting point for speaking the Kafka wire protocol: https://github.com/travisjeffery/jocko. I've already got it to the point where it's trying to communicate and decoding the first message. I just need to follow the code and feed it everything it needs at this point.
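For anyone following along, a bare-bones Go sketch of the very first step of that approach, independent of jocko: accept a connection and decode the common Kafka request header. A real broker emulation would also have to handle the newer flexible ("tagged") header versions and then dispatch on api_key.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// readRequestHeader reads one Kafka protocol request frame from conn and
// decodes the v1 request header: a 4-byte size prefix, then api_key (int16),
// api_version (int16), correlation_id (int32), and a length-prefixed
// client_id string.
func readRequestHeader(conn net.Conn) error {
	var size int32
	if err := binary.Read(conn, binary.BigEndian, &size); err != nil {
		return err
	}
	frame := make([]byte, size)
	if _, err := io.ReadFull(conn, frame); err != nil {
		return err
	}
	apiKey := int16(binary.BigEndian.Uint16(frame[0:2]))
	apiVersion := int16(binary.BigEndian.Uint16(frame[2:4]))
	correlationID := int32(binary.BigEndian.Uint32(frame[4:8]))
	clientIDLen := int16(binary.BigEndian.Uint16(frame[8:10]))
	clientID := "" // a length of -1 means a null client_id
	if clientIDLen > 0 {
		clientID = string(frame[10 : 10+int(clientIDLen)])
	}
	fmt.Printf("api_key=%d version=%d correlation_id=%d client_id=%q\n",
		apiKey, apiVersion, correlationID, clientID)
	return nil
}

func main() {
	// Pretend to be a broker on the usual Kafka port and decode the first
	// request a client sends (typically ApiVersions, api_key 18).
	ln, err := net.Listen("tcp", "127.0.0.1:9092")
	if err != nil {
		panic(err)
	}
	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	_ = readRequestHeader(conn)
}
```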

melody413 commented 8 months ago

Hi, I am migrating DynamoDB to Postgres using Estuary, but the Postgres connector is not working well. How do I set the Postgres address (host and port)? The Postgres server is running on my local PC.

Please help me asap. Thanks

dyaffe commented 8 months ago

Hi @melody413,

We're happy to help you here! We're a cloud service, so we generally connect to endpoints that can be accessed via the internet. If you have a database that's on localhost, the best bet is to set up an SSH tunnel as described here in our docs.

Let me know if you have any questions!

-Dave

williamhbaker commented 3 weeks ago

I'm going to close this out since I think the original intent has been accomplished through the implementation of the Kafka read gateway for Flow, and additional work on that is tracked separately.