MaterializeInc / materialize


[Epic] Table support for push sources #22836

Open chaas opened 8 months ago

chaas commented 8 months ago

Product outcome

Materialize supports push sources via full integrations with a handful of CDC partners. In practice, this means supporting the way these CDC tools write data: by pushing their changelogs into tables in Materialize.

Discovery

Working group Slack: #wg-push-sources

Notion wg landing page: https://www.notion.so/materialize/Push-sources-1a8b7370961040018f2bcfa619d541e9#4fe16e20fafe4cf9904b4d90240108fd The landing page includes summaries of each of the calls with potential CDC partners and discovery into which features we're missing.

We've distilled out the required table features across the majority of these CDC tools.

Work items

### Minimum viable features
- [ ] Address the global write lock: large updates need to lock at least the destination table, which can grow arbitrarily large and can thus take an arbitrarily long time to scan through and figure out which rows we need to update.
- [ ] Support basic `ALTER TABLE` in order to support upstream schema changes
- [ ] Decide whether to support (`INSERT ... ON CONFLICT` and primary keys) or `MERGE`
- [ ] (Likely) Support [`MERGE`](https://www.postgresql.org/docs/current/sql-merge.html)
- [ ] (Unlikely) Support `INSERT ... ON CONFLICT` to make it possible to upsert data into a table (See https://github.com/MaterializeInc/materialize/issues/6668 for existing related tracking issue)
### Scalability of tables for push sources
- [ ] https://github.com/MaterializeInc/materialize/issues/25945 - Streaming join: when joining table(A) to table(B) we currently load both tables into memory, which also imposes an upper limit on the size of table we can support updates for
- [ ] https://github.com/MaterializeInc/materialize/issues/23942 - If persist were to keep data sorted by some key, then we would be able to more efficiently load just the portion of the table we care about from Persist, as opposed to the entire thing

Decision log

ParkMyCar commented 7 months ago

Brain Dump - ALTER TABLE ... ADD COLUMN ...

Thank you to @benesch and @danhhz who chatted with me a bunch about this. We're not at a point yet where we can write a design doc; we still need to flesh out a few ideas, hence the brain dump.

There are three main areas where we'd need to make changes:

  1. Adapter - persistence of table definitions
  2. Storage - schema of persist shards
  3. Compute - planning of dataflows on restart and persist shards showing up with new columns

Adapter. The tricky part about adding columns to tables is that we do not desugar, or fully resolve, column names in view definitions. For example, CREATE VIEW v1 AS SELECT * FROM t1 gets persisted as exactly that, with ... SELECT * .... If a column is added to t1, that view definition would now include the new column, which isn't what we want.

"De-sugaring SELECT *" is something we've looked into but is a multi-month effort that we don't want to block this project on. The current thinking is we'd update the table to include the new column, and then maintain some "artifact" that describes the previous version of the table. Two approaches to this "artifact" are:

  1. Version all tables. When creating a view that references a table, the view definition would internally get resolved to include the table version, e.g. foo_v0. When adding a column to the table, we’d bump the version, e.g. foo_v1, and retain the create_sql for the original foo_v0. Note, maybe it wouldn’t exactly be foo_vX but something along those lines.
     a. After talking with Dan, an issue with this approach is plumbing through to persist the concept of “old relation description” and “current relation description”. The persist shard for the table would contain the new column, but something has to understand that in this case we want to drop those old columns.

  2. When adding a column to a table, update the original table, but create an “internal view” that existing dependents would use. We’d either update all dependents at the time of the ALTER TABLE to use this new view, or lazily when selected from, we’d swap any references to the table with SELECT <original columns> FROM <table>.

Storage. There currently is no way to do schema migrations of persist shards. The ideal approach would be: when adding a column with a NULL or default value, we record that somewhere, and rows without the column would lazily get that value filled in. Similarly, for dropping a column, older rows with the column would need to know they should omit it. According to Dan this could probably get prototyped in a couple of weeks, but would take on the order of 1-2 quarters to productionize, since persist is largely not schema aware.

The alternative would be to create a new shard with the new column, then copy all of the data from the old shard to the new shard, with the default value. It’s slow, but it works and can (probably) get done quickly and entirely from the Adapter.
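
To make the lazy-fill idea a bit more concrete, here's a rough sketch of what backfilling a newly added column at read time could look like. None of these are the real persist/repr types; Datum, ColumnDef, and Schema below are simplified stand-ins, and this only handles column additions:

// Simplified stand-ins for illustration only; not the real persist/repr types.
#[derive(Clone, Debug, PartialEq)]
enum Datum {
    Null,
    Int64(i64),
    Text(String),
}

struct ColumnDef {
    name: String,
    // The value to backfill for rows written before the column existed,
    // e.g. Datum::Null or a user-supplied DEFAULT.
    default: Datum,
}

struct Schema {
    columns: Vec<ColumnDef>,
}

// Pad a row written under an older schema so it matches the newer schema,
// filling each newly added (trailing) column with its default value.
fn migrate_row(mut row: Vec<Datum>, old: &Schema, new: &Schema) -> Vec<Datum> {
    assert!(new.columns.len() >= old.columns.len(), "only column additions supported");
    assert_eq!(row.len(), old.columns.len());
    for col in &new.columns[old.columns.len()..] {
        row.push(col.default.clone());
    }
    row
}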

Compute. Ideally, adding a column doesn't affect existing dataflows. They don't use the new data, so they should just be able to ignore it. But if we take the approach from Storage of creating a new shard and copying data over, any dependent dataflows will definitely need to be restarted and re-hydrated. This is scary (👻) because if we hook this up to some external provider, then unknown to a user the provider could alter a table midday, which would cause the user's dataflows to get destroyed and then rehydrated, which might be viewed as "downtime".

I will do some research into how often columns get added; I would think it's not too often.

tl;dr I think an MVP approach we could take to support ALTER TABLE ... ADD COLUMN ... is:

  1. In Adapter get a lock (ideally table level lock)
  2. Create a new persist shard with the new column, copy all data from the old shard into the new
  3. Create a new view <table_name>_vX whose definition is SELECT <original columns> FROM <table_name> (this is approach 2 from above). Anything that currently depends on <table_name> will get re-planned to depend on the view <table_name>_vX
  4. Have Adapter destroy and re-create any dependent compute objects
benesch commented 7 months ago

Thanks for writing this up, @ParkMyCar! Super excited to get this work started.

> "De-sugaring SELECT *" is something we've looked into but is a multi-month effort that we don't want to block this project on. The current thinking is we'd update the table to include the new column, and then maintain some "artifact" that describes the previous version of the table. Two approaches to this "artifact" are:
>
>   1. Version all tables. When creating a view that references a table, the view definition would internally get resolved to include the table version, e.g. foo_v0. When adding a column to the table, we’d bump the version, e.g. foo_v1, and retain the create_sql for the original foo_v0. Note, maybe it wouldn’t exactly be foo_vX but something along those lines.
>      a. After talking with Dan, an issue with this approach is plumbing through to persist the concept of “old relation description” and “current relation description”. The persist shard for the table would contain the new column, but something has to understand that in this case we want to drop those old columns.
>   2. When adding a column to a table, update the original table, but create an “internal view” that existing dependents would use. We’d either update all dependents at the time of the ALTER TABLE to use this new view, or lazily when selected from, we’d swap any references to the table with SELECT <original columns> FROM <table>.

I think both of these approaches reduce to the same thing: you need some way of slapping a projection operator on the current version of the shard to transform it into each of the old versions. Fundamentally, I think each version is just the specification of that projection.

We have some flexibility in how we represent that in the catalog. Option 1, as described, is to store every CREATE TABLE statement, and conjure up the projection to go from v_new to v_old on the fly by diffing the RelationDescs. Option 2, as described, is to store the projection as a SQL view.

I think option 2 might be more of a headache than it's worth—at least while the allowed transformations from version to version are just column additions. It's possible you could instead store:

struct SerializedTable {
    /// E.g., `CREATE TABLE t ...`
    pub current_version_create_sql: String,
    /// For each version, the projection that transforms the current version's relation into the old version's relation.
    pub old_version_projections: BTreeMap<Version, Vec<i64>>,
}
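
For illustration, applying one of those stored projections would then just be an index mapping over a row in the current schema; something roughly like this (with strings standing in for actual datums, purely hypothetical):

// Illustrative only: project a row in the table's current version down to how it
// looked at an older version, using the stored column indices for that version.
fn project_to_old_version(current_row: &[String], projection: &[i64]) -> Vec<String> {
    projection
        .iter()
        .map(|&idx| current_row[idx as usize].clone())
        .collect()
}

// E.g. if the table originally had columns (a, b) and a column c was added later,
// old_version_projections would map v0 to [0, 1], and projecting a current row
// ["a-val", "b-val", "c-val"] with [0, 1] yields ["a-val", "b-val"].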
ParkMyCar commented 7 months ago

Instead of storing these projections separately, I was thinking of updating the create_sql for the dependent views directly. For example, pretend we have a table t1 with columns a, b, and we're adding a new column c:

CREATE VIEW v1 AS SELECT * FROM t1

would get transformed to

CREATE VIEW v1 AS SELECT * FROM (SELECT a, b FROM t1) t1

I've been experimenting and trying to think through some edge cases, and this should work. One wrinkle @petrosagg pointed out is we can't do this transformation if there is a Sink directly depending on the table, since Sinks don't support dataflow fragments. He mentioned it should be fixable in the long run, but for now returning an error if a user tries to do this seems reasonable, open to thoughts here though!

I chatted with some more folks this week, and I think an approximate shape for an MVP flow of ALTER TABLE ... ADD COLUMN ... could be:

  1. Take a lock in the Coordinator on the table to prevent inserts and other modifications
  2. Spawn a tokio::task to do all of the following work...
  3. Create a new persist shard with a schema for the now updated table.
  4. Flush all existing writes to the table
  5. Copy all of the data from the original persist shard to the new shard, with a NULL or default value for the new column
  6. Run a Catalog transaction to update the create_sql for the table, the Global ID for the table and thus the backing persist shard, and update the create_sql for any dependent views, as described above. At this point the column is added, and even if we crash everything would be okay because we'd re-plan with the new shard. We also release the write lock.
  7. Re-plan all transitively dependent dataflows, which now depend on the new shard.
  8. Wait for the dataflows to become hydrated.
  9. Swap the new dataflows with the old ones, and tear down the old ones.
  10. Return to the caller of ALTER TABLE ...

There's a bit of hand-waviness around the implementation of this process within the Adapter, but I think we'd model it after Peeks and their separate stages. I'm also being quite hand-wavy about starting and swapping dataflows; I need to see if that's actually possible. If not, we'd probably need to tear down the existing dataflows and re-plan, which would introduce "downtime".
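
To make the "model it after Peeks" idea slightly more concrete, the flow above could be driven as a sequence of stages owned by the Coordinator. This is purely illustrative; every name here is hypothetical:

// Hypothetical sketch only; not real Coordinator code.
enum AlterTableStage {
    // Table-level lock taken in the Coordinator; writes to the table are blocked.
    Locked,
    // New persist shard created with the updated schema; pending writes flushed.
    NewShardCreated,
    // Data copied from the original shard into the new one, with the new column
    // filled in with NULL or its default.
    DataCopied,
    // Catalog transaction committed: new create_sql, new Global ID / shard, and
    // rewritten create_sql for dependent views; the write lock is released.
    CatalogUpdated,
    // Transitively dependent dataflows re-planned against the new shard and hydrating.
    Rehydrating,
    // New dataflows swapped in for the old ones; respond to the ALTER TABLE caller.
    Done,
}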

Some open questions that I still need to think through are:

  1. Timestamps and read holds. How should the sinces and uppers of the new/second persist shard advance w.r.t. the original/first persist shard? Does there need to be any relation?
  2. Do we need to do any work to prevent duplicated events when creating new/second dataflows on top of the new persist shard?
benesch commented 7 months ago

> I was thinking of updating the create_sql for the dependent views directly.

Haven't thought about this for too long, but gut take is that this seems more fiddly/error prone than storing the BTreeMap<Version, Vec<i64>> alongside the table. What are the benefits you see, @ParkMyCar, to mutating the create_sql for downstream views?

ParkMyCar commented 7 months ago

> you need some way of slapping a projection operator on the current version of the shard to transform it into each of the old versions.

I think this part is easier to do by mutating the downstream views. Otherwise we'd need to add the concept of versioning for tables, and then apply this projection every time during planning(?), which seems harder? Although I could be totally wrong! I can prototype both approaches and go from there?

benesch commented 7 months ago

I just think the rewrite is likely to be a quagmire, for many of the same reasons that wildcard expansion and our current approach to renaming are such quagmires. Here's an example that I think demonstrates the complexity ahead of that rewrite approach:

benesch=# SELECT t1.a FROM t1;
 a 
---
(0 rows)

benesch=# SELECT benesch.public.t1.a FROM t1;
 a 
---
(0 rows)

benesch=# SELECT t1.a FROM (SELECT a FROM t1) t1;
 a 
---
(0 rows)

benesch=# SELECT benesch.public.t1.a FROM (SELECT a FROM t1) t1;
ERROR:  invalid reference to FROM-clause entry for table "t1"
LINE 1: SELECT benesch.public.t1.a FROM (SELECT a FROM t1) t1;
               ^
HINT:  There is an entry for table "t1", but it cannot be referenced from this part of the query.
ParkMyCar commented 7 months ago

> Here's an example that I think demonstrates the complexity ahead of that rewrite approach

I was waiting for an example like this to pop up; good to figure this out early on! I chatted with @danhhz today, who put up this prototype for schema migrations within Persist: https://github.com/MaterializeInc/materialize/pull/23782. I think we're starting to get the shape of a design. Here's a braindump of the latest:

Persist

Similar to what Dan did in the prototype, we make the bytes we write into S3 schema aware. There's a bit of hand-waviness still, but the general shape is that we add schemas to State<T>. This collection of schemas could be one of two things (a rough sketch follows the list):

  1. A map of SchemaId -> Schema
  2. A root schema, with a list of diffs, where each version is an index in that list. For example, V0 would be just the root, V1 would be root + diff[0], V2 would be root + diff[0] + diff[1]
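
Here's an illustrative sketch of those two shapes. None of these are persist's real types, and the diff type only handles column additions for now:

// Illustrative stand-ins only; not persist's real State<T> machinery.
use std::collections::BTreeMap;

type SchemaId = u64;

#[derive(Clone)]
struct Schema {
    column_names: Vec<String>,
}

#[derive(Clone)]
struct SchemaDiff {
    // For now a diff can only add columns, matching ALTER TABLE ... ADD COLUMN.
    added_columns: Vec<String>,
}

enum Schemas {
    // Option 1: a map of SchemaId -> Schema.
    Map(BTreeMap<SchemaId, Schema>),
    // Option 2: a root schema plus a list of diffs, where version N is
    // root + diffs[0] + ... + diffs[N-1].
    RootPlusDiffs { root: Schema, diffs: Vec<SchemaDiff> },
}

impl Schemas {
    // Reconstruct the schema at a given version.
    fn schema_at(&self, version: u64) -> Option<Schema> {
        match self {
            Schemas::Map(map) => map.get(&version).cloned(),
            Schemas::RootPlusDiffs { root, diffs } => {
                let mut schema = root.clone();
                for diff in diffs.get(..version as usize)? {
                    schema.column_names.extend(diff.added_columns.iter().cloned());
                }
                Some(schema)
            }
        }
    }
}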

Each Batch would then be persisted with the version of the schema it was written at, and when opening a ReadHandle you could specify the version you want to read. During read (probably at the level of FetchedPart?) we would then do the "migration" from the persisted bytes to the version of schema requested.

Hand-waviness: I'm not exactly sure how we'd do the migrations; we currently just get bytes back from S3 and defer the decoding to some type K. There's probably some way to make this work, I just haven't thought it through.

When Persist compacts two batches we'd also migrate at that point in time to the greater of the two schemas.

Adapter

Introducing the concept of schema versions, and being able to read out of Persist at an older version, works really well for the Adapter.

When creating an object that depends on a table, e.g. a VIEW, we can persist the current schema version of the tables it depends on. Haven't put too much thought into this, but next to the create_sql we currently store, we would also do something like dependency_versions: BTreeMap<GlobalId, SchemaVersion>. When planning, we push this schema version into the resulting object that we pass to compute (dataflow?) and it then reads out of Persist at whatever version it needs. This is really nice because then existing dataflows can continue to "just work" even as new columns are added.
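
Concretely, the catalog side of that could look roughly like the following. The types here are stand-ins for illustration, not the real GlobalId or catalog structs:

// Illustrative only; not the real catalog representation.
use std::collections::BTreeMap;

type GlobalId = u64;      // stand-in for the real GlobalId
type SchemaVersion = u64; // stand-in for whatever version type persist exposes

struct SerializedView {
    // E.g. `CREATE VIEW v1 AS SELECT * FROM t1`, stored verbatim as today.
    create_sql: String,
    // For each table the view references, the schema version that was current
    // when the view was created.
    dependency_versions: BTreeMap<GlobalId, SchemaVersion>,
}

// At planning time we'd look up the pinned version for a dependency and hand it
// to compute, so the dataflow reads that table's persist shard at that version.
fn version_to_read_at(view: &SerializedView, table: GlobalId) -> Option<SchemaVersion> {
    view.dependency_versions.get(&table).copied()
}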

ParkMyCar commented 7 months ago

For the next day or two I'm going to work on a smaller task, let this design bake for a bit, and then prototype/code spelunk. After that I'll write a design doc!

benesch commented 7 months ago

> Haven't put too much thought into this, but next to the create_sql we currently store, we would also do something like dependency_versions: BTreeMap<GlobalId, SchemaVersion>.

This sounds right to me!

> When planning, we push this schema version into the resulting object that we pass to compute (dataflow?) and it then reads out of Persist at whatever version it needs. This is really nice because then existing dataflows can continue to "just work" even as new columns are added.

This also sounds good to me, but worth checking with compute about how exactly this will work. Will MirRelation::Get grow an additional schema_version: SchemaVersion field?

danhhz commented 7 months ago

> When Persist compacts two batches we'd also migrate at that point in time to the greater of the two schemas.

super nitpick: Technically I think we'd have to write it at the schema of whatever WriteHandle is assigned the compaction work, which is not necessarily the greater of the two. If this ends up being an issue (it might be? It's certainly a little weird and I think it might lead to data loss), @bkirwi and I have tossed around some brainstorming for how to make compaction more independent schema-wise.