Open nmeagan11 opened 1 year ago
@doy-materialize wanted this today to fix up data in a webhook source.
another use case for this is moving sources around - if i stand up a webhook source in one environment and want to move it to another environment, or other kinds of restructuring like that, that necessarily means losing the entire data history (you could theoretically script something up to read all of the raw data from the original source and send fake webhooks to the new endpoint with that data, but getting that right isn't trivial, and could potentially have issues with things like rate limits depending on how much data there is).
this is currently mostly an issue for internal stuff because most customers only have a single environment, but it does feel like it limits the possibilities for refactoring/reorganization in kind of annoying ways.
Some constraints that need to be relaxed for this feature to happen are the following:
Persist dictates that there is only one process that determines the contents of a shard for a particular timestamp. Currently storage dataflows hold a persist write handle and append new batches of data to the shard. In order to allow users to write to the shard of a source at an arbitrary time we'll have to either:

- Back each logical collection by two shards, one that the source cluster controls and one that is a normal table, such that `CREATE SOURCE foo` is actually `CREATE SOURCE foo_ingestion ..; CREATE TABLE foo_edits; CREATE MATERIALIZED VIEW foo AS (SELECT * FROM foo_ingestion) UNION ALL (SELECT * FROM foo_edits)`, with all writes routed to `foo_edits`. This is the simplest option but has the downside that there is no cross-shard compaction.
- Make storage dataflows write batches to S3 and send metadata back to the controller that does the final CaA to the shard.

Upsert dataflows read back their output shard in order to reconstruct their in-memory state. If we were to allow arbitrary modifications to their output by the user then it's unclear what happens on rehydration. Options here include things like reworking upsert so that it continually reads back its output (as opposed to one time at startup) and takes it into account for each batch produced.
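To make the "no cross-shard compaction" downside concrete, here is a toy Python sketch (names and representation are illustrative, not Materialize's actual persist API) that models each shard as a list of `(row, diff)` updates. Reads of the logical collection merge both shards, so a user's retraction in the edits shard logically cancels a row from the ingestion shard, but the matching `+1` and `-1` live in different shards and can never be physically compacted away together.

```python
from collections import defaultdict

def consolidate(shard):
    """Within-shard compaction: sum diffs per row, drop rows that net to zero."""
    acc = defaultdict(int)
    for row, diff in shard:
        acc[row] += diff
    return [(row, diff) for row, diff in acc.items() if diff != 0]

def read_logical(ingestion_shard, edits_shard):
    """A read of the logical collection is the union of both shards."""
    return consolidate(ingestion_shard + edits_shard)

# The ingestion widget appends to its own shard...
ingestion = [("bad row", +1), ("good row", +1)]
# ...while user DML (e.g. a DELETE of the bad row) lands in the edits shard.
edits = [("bad row", -1)]

# Logically, the bad row is gone:
assert read_logical(ingestion, edits) == [("good row", 1)]

# But neither shard can compact the pair away on its own: the +1 and the -1
# live in different shards forever ("no cross-shard compaction").
assert consolidate(ingestion) == [("bad row", 1), ("good row", 1)]
assert consolidate(edits) == [("bad row", -1)]
```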
I agree with @petrosagg here: actually merging data between an ingestion dataflow and the `INSERT` statements is quite complex, and I think all variations have tradeoffs.
He also makes a point about the weird semantics of this feature interacting with `UPSERT`, but I also want to point out that this feature has bizarre semantics for anything but append-only sources, such as the pg/mysql sources:

- What does a source "table" even represent? Currently it's an exact copy of the upstream table, but now it can contain arbitrary data whose origin is an arbitrary DML statement, lost to time?
- My understanding is retracting errors is one intended feature. How do you `DELETE` an error? Do we need new SQL syntax for this?
I think any use case for this that is just merging datasets is better captured by `UNION ALL`. For fixing errors, I think a more targeted way to retract errors from `ENVELOPE NONE` sources makes more sense. For ignoring errors from mysql/pg features we don't support yet, I think the `IGNORE ERRORS` work people have been looking into fits better.
Agree with Petros and Gus that this feature feels semantically fraught... and for fundamental reasons, not just in the details. Today, despite all the caveats etc., (sub)sources generally have a pretty straightforward denotation as a view over some external state. Correctness properties 2 through 4 are all about how hard we work to faithfully preserve those semantics. Once users can write to subsources this seems to go out the window — a subsource’s contents might have little relationship with the upstream database at all.
That said, I think a lot of the details are kind of messy too. Take blue/green deploys: if I deploy an updated version of my source with a new schema, do my manual edits disappear or carry over? (Either option seems like it could be surprising.) What if those updates don’t match the schema of the new import, or a retraction retracts something that no longer exists?
I think it’s kind of interesting that users generally don’t have the same issues with materialized views. If a materialized view doesn’t spit out the data you want, you have options like `coalesce`-ing away an unwanted null, adding a `... where id not in (select * from blacklisted_ids)` filter, or that sort of thing. Neither of these options is accessible to sources. Subsources can be pretty substantial views over the input data, involving complex transformations like Avro decoding, but users don’t have much access to or control over that logic. I feel like giving users control over that logic instead of baking it into the source feels like a less gnarly and more "Materialize"-y approach to the problems that motivate this feature…
Back each logical collection by two shards, one that the source cluster controls and one that is a normal table ... This is the simplest but has the downside that there is no cross-shard compaction
Yeah, I think the lack of cross-shard compaction is a nonstarter. I imagine that a major use case for this feature would be running manual garbage collection jobs like `DELETE FROM src_table WHERE ts < now() - interval '1 month'` to clean up old data.
Make storage dataflows write batches to S3 and send metadata back to the controller that does the final CaA to the shard. Open question of who does the final CaA in a pv2 world since there can be multiple controllers. One take is that you can only write to a source from the cluster that manages the ingestion.
Yeah, this is loosely what I was envisioning! The additional data is stitched into the source by the controller.
Upsert dataflows read back their output shard in order to reconstruct their in-memory state. If we were to allow arbitrary modifications to their output by the user then it's unclear what happens on rehydration ... This means a rework of upsert will be needed so that it continually reads back its output (as opposed to one time at startup) and takes it into account for each batch produced.
Yes, exactly what I was thinking conceptually!
He also makes a point about weird semantics of this feature interacting with UPSERT
I don't think the semantics are weird for `ENVELOPE UPSERT`? The implementation is complex, for sure, but the semantics seem straightforward.
I also want to point out that this feature has bizarre semantics for anything but append-only sources, such as pg/mysql source
Yep, @frankmcsherry called this out on Slack at some point, which is immortalized in the issue description. There's definitely an important distinction to draw between database CDC sources and Kafka/webhook sources.
My understanding is retracting errors is one intended feature. How do you DELETE an error? Do we need new SQL syntax for this?
I think the initial scope of this feature need not involve deleting errors. Although I do think https://github.com/MaterializeInc/materialize/issues/22430 is pointing in a cool direction: `DELETE FROM ERRORS(t)`.
What does a source "table" even represent? Currently it's an exact copy of the upstream table, but now it can contain arbitrary data whose origin is an arbitrary DML statement, lost to time?
Agree with Petros and Gus that this feature feels semantically fraught... and for fundamental reasons, not just in the details. Today, despite all the caveats etc., (sub)sources generally have a pretty straightforward denotation as a view over some external state. Correctness properties 2 through 4 are all about how hard we work to faithfully preserve those semantics. Once users can write to subsources this seems to go out the window — a subsource’s contents might have little relationship with the upstream database at all.
Hmm, it just doesn't seem all that semantically fraught to me? The implementation seems hard as hell in a few parts, for sure, but subsources being an exact copy of the upstream system unless you make some edits locally seems reasonable to me. The existing correctness properties all seem compatible with this take, too. None of them say that you can't sneak in additional events from a different source.
It's the experience you get with Fivetran, too. Fivetran loads into normal tables. You mess with those tables at your own peril, but if you discover that somehow the data's gotten out of sync, you can just fix it up manually in the downstream system, rather than forcing a full resync.
if I deploy an updated version of my source with a new schema, do my manual edits disappear or carry over? (Either option seems like it could be surprising.) What if those updates don’t match the schema of the new import, or a retraction retracts something that no longer exists?
I'd definitely expect them to disappear. To use the Fivetran analogy, adding a new copy of the source is forcing a full resync, and now it's on you to either have pushed your edits upstream, or to have some means of applying them manually again in Materialize.
I feel like giving users control over that logic instead of baking it into the source feels like a less gnarly and more "Materialize"-y approach to the problems that motivate this feature…
I'm definitely interested in making `FORMAT`s and `ENVELOPE`s more programmable so that the existing syntax can turn into syntax sugar. But doesn't that point back in the direction of "writing at" sources? At least loosely. Say we got rid of `ENVELOPE UPSERT`, for example. There's no way to get the equivalent experience on top of an `ENVELOPE NONE` source, because even if you build the upsert logic as a TopK in compute, you can't push the retractions back at the persist source. AFAICT, either you need `TRANSFORM USING`, triggers, or the ability to issue `DELETE`s on source tables.
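To illustrate that last point with a toy Python sketch (the event shape and names here are illustrative, not any real Materialize interface): upsert semantics can be computed downstream of an append-only collection as a last-write-wins reduction per key, which is roughly what a TopK over offsets gives you. But the reduction only changes what readers of the view see; nothing flows back to shrink or correct the append-only input.

```python
def upsert_view(append_only_log):
    """Last-write-wins per key, in the spirit of ENVELOPE UPSERT.

    Events are (offset, key, value); value None models a tombstone
    that retracts the key in the derived view."""
    latest = {}
    for _offset, key, value in sorted(append_only_log):
        if value is None:
            latest.pop(key, None)  # tombstone removes the key downstream
        else:
            latest[key] = value    # later offsets overwrite earlier ones
    return latest

log = [
    (0, "k1", "v1"),
    (1, "k2", "corrupt!"),
    (2, "k1", "v2"),
    (3, "k2", None),  # tombstone for k2
]

# Readers of the derived view see upserted state...
assert upsert_view(log) == {"k1": "v2"}

# ...but the underlying append-only collection still holds every event,
# including the corrupt one: the view cannot retract rows from its input.
assert len(log) == 4
```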
Thanks very much for the thoughts, y'all! It seems like a lot of the concerns are around 1) the implementation of writing at `ENVELOPE UPSERT` sources and 2) philosophical questions around "what is a subsource table anyway?"
Maybe it would help to scope the conversation down to Kafka and webhook sources with `ENVELOPE NONE` to start. Does anyone have major qualms about allowing users to use `INSERT`, `UPDATE`, or `DELETE` with those types of sources?
I think this neatly sidesteps the philosophical problems. Webhook sources don't have an upstream—the source is the source of truth. And Kafka sources using `ENVELOPE NONE` already aren't an exact replica of upstream state. If the Kafka topic has a retention policy, that retention policy will be reflected upstream but not in Materialize.
A timely Slack conversation from a user who would have been well served by the ability to `DELETE` from webhook sources: https://materializeinc.slack.com/archives/C05UL8K9B7G/p1716286327650999?thread_ts=1716283390.330859&cid=C05UL8K9B7G
Summary

Allow users to modify the data in source tables using `INSERT`, `UPDATE`, and `DELETE` statements.

Details

Motivation

Users want a way to correct "wrong" or corrupted data that's already been ingested into Materialize. This is particularly important in cases where 1) the user cannot fix the data in the upstream system, or 2) the source uses `ENVELOPE NONE` and so it is not possible today to edit the existing data.

Design
Once #20208 is complete, the conceptual model of sources will become clearer: they're just an "ingestion widget" that reads data from an external system and writes it to normal-looking SQL tables. The only difference between a source table and a normal table is that source tables are read-only to end users: end users can't edit the data in a source table, because source tables are wholly under the control of the source's ingestion widget.
So, post #20208, the proposal here is quite simple: lift the read-only restriction on source tables. Allow running `INSERT`, `UPDATE`, and `DELETE` commands against source tables, as if they were normal tables.

There are, however, many implementation details to sort out: e.g., how do we interleave end user DML with the ingestion widget's updates?
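One way to picture the interleaving question, as a toy Python sketch (the class and method names are illustrative, not Materialize's actual persist interface): if every writer appends through a compare-and-append protocol on the shard's upper frontier, ingestion batches and user DML serialize naturally, and a writer that loses a race simply observes the new upper and retries at a later timestamp.

```python
class Shard:
    """Toy single shard with a compare-and-append (CaA) write protocol."""
    def __init__(self):
        self.batches = []  # list of (timestamp, updates)
        self.upper = 0     # next timestamp that may be written

    def compare_and_append(self, expected_upper, new_upper, updates):
        """Append only if the caller's view of the frontier is current."""
        if expected_upper != self.upper:
            return False, self.upper  # lost the race; caller must retry
        self.batches.append((expected_upper, updates))
        self.upper = new_upper
        return True, self.upper

def write_with_retry(shard, updates):
    """Both the ingestion widget and user DML use the same loop, so their
    writes are totally ordered by the timestamps the shard hands out."""
    upper = shard.upper
    while True:
        ok, upper = shard.compare_and_append(upper, upper + 1, updates)
        if ok:
            return upper

shard = Shard()
write_with_retry(shard, [("ingested row", +1)])  # an ingestion batch
write_with_retry(shard, [("ingested row", -1)])  # a user DELETE
assert shard.upper == 2
assert [ts for ts, _ in shard.batches] == [0, 1]
```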
Discussion
@frankmcsherry points out on Slack (link lost to the sands of time, sorry) that it might be worth drawing a distinction between database CDC sources (MySQL and PostgreSQL) and streaming sources (Kafka). Database CDC sources blindly issue retractions when an upstream deletion or update occurs, because they trust that the data was already written to Materialize, whereas Kafka will only issue a retraction when used with `ENVELOPE UPSERT`.

21 May 2024. A Slack conversation about a user who would have been well served by the ability to `DELETE` from webhook sources.

How would this feature interact with `TRANSFORM USING`? https://github.com/MaterializeInc/materialize/issues/16547

See also

#20208