MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

Consider `CHANGES(AS OF)` table function #15754

Open frankmcsherry opened 1 year ago

frankmcsherry commented 1 year ago

Feature request

This idea comes from other folks, and has been independently requested more than once.

One can imagine a CHANGES(AS OF) table valued function, which acts on any collection, and provides the output SUBSCRIBE provides, but as a collection that can be transformed, indexed, materialized, etc.

Mechanically, this is not that hard: each input update (row, time, diff) is transformed to ((row, time, diff), max(time, AS OF), 1).
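The rewrite can be sketched in a few lines of Python. This is a toy model for illustration, not the actual implementation; the `changes` helper and the tuple encoding are hypothetical:

```python
# Toy model of the CHANGES(AS OF) rewrite: every input update
# (row, time, diff) becomes an append-only output row
# ((row, time, diff), max(time, as_of), +1).
def changes(updates, as_of):
    """updates: list of (row, time, diff); returns the change-log rows."""
    return [((row, t, diff), max(t, as_of), 1) for (row, t, diff) in updates]

history = [("a", 3, 1), ("b", 7, 1), ("a", 9, -1)]
log = changes(history, as_of=5)
# Updates at or before the AS OF all surface at time 5 (the snapshot);
# later updates appear at their original times.
```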

The existence of such a collection requires a read hold at AS OF, holding back compaction to AS OF, because unlike other SQL operators the CHANGES operator does not commute with compaction: applying it to a collection that has been compacted forward in time is not the same as compacting its output to that same time. In principle this compaction hold could be released once the data are sunk to durable storage (e.g. a materialized view), but not merely as time passes for an index, since the index would need to be recoverable at the initial time.
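The non-commutativity with compaction can be seen in a toy model (hypothetical helpers; `compact_fwd` stands in for advancing a collection's times to a frontier):

```python
# Toy model: compacting forward to `to` advances every update's time
# to at least `to`; CHANGES turns each update (row, time, diff) into
# ((row, time, diff), max(time, as_of), +1).
def compact_fwd(updates, to):
    return [(payload, max(t, to), d) for (payload, t, d) in updates]

def changes(updates, as_of):
    return [((row, t, d), max(t, as_of), 1) for (row, t, d) in updates]

raw = [("a", 1, 1), ("a", 2, -1)]  # "a" inserted at time 1, deleted at time 2
# Compact the input to time 3, then take CHANGES: both payloads now claim
# the events happened at time 3, so the history is lost.
lost = changes(compact_fwd(raw, 3), as_of=0)
# Take CHANGES first, then compact its output to time 3: the payloads
# still carry the original times 1 and 2.
kept = compact_fwd(changes(raw, as_of=0), 3)
assert lost != kept
```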

Anyhow, the request is to explore the uses for this, and to centralize interest and discussion. It may be that uses of this transform encourage antipatterns (queries that could have been maintained views that instead are written as SQL over timestamps, applying redundant logic).

benesch commented 1 year ago

Copying in some context from a duplicate I filed (#16476):

From @frankmcsherry:

This has come up in a few spots now, so raising a thought provoking discussion: imagine there was a function CHANGES(collection, as_of) whose columns were the same as those of SUBSCRIBE (the columns of collection plus a timestamp and diff), which is an append-only collection that shows you a snapshot of collection at as_of and then all of the diffs from that point forward, with each diff occurring at the timestamp it indicates. I'm trying to think through what that thing is (a source, a view, a mv, .. ) and what if any frustration it introduces.

For example, if it were to be a view, we would need to hold back compaction of collection to as_of, because anyone using it would need to be able to see the same results, which require playing out collection from that historical time. If we made it a MV then perhaps we don't need to do that because things are written down, but we would get stressed out when we recover and try and reproduce the contents. Perhaps it is more like a source which just writes data down and says "that's what I wrote; too bad if future me doesn't like it".

It's a bit scary because it is hard to revise the output, as it is append only and speaks about changes potentially in the distant past. That fights a fair bit with our approach to upgrades (we will reflow your dataflows and correct them, because maybe we had a bug and want to get you in the correct state), but the value also drops a bit if CHANGES doesn't produce a history from which you could return to the original collection. Maybe "reconciliation" is a large utterance we would write out at some change if we saw it, but we'd want a sidecar collection persisted so that we could read back what we had efficiently.

From @benesch:

Having CHANGES(src, as_of) hold back compaction on src to as_of is the thing that makes sense to me. Stressful to do that in an index or materialized view, for sure. Maybe we hard disallow doing that. Usable in one-off SELECTs only, say.

Or maybe we let you peg as_of to a sliding window. Would be cool to have a view that shows you the last 30m of changes, say.

CHANGES(src, mz_now() - '30m')

On the subject of persisting CHANGES: sounds like maybe we want as_of to be advisory, where we have permission to occasionally advance it to the source’s since frontier? If we need to basically start over due to a behavior change in Materialize, for example. If we’ve written down the change stream, we can retract the old one that we wrote down, then start a new one at the source’s current as of.

frankmcsherry commented 1 year ago

Some thoughts that occurred in Slack but are being retranscribed here (not edited for github comment style):

  1. CHANGES seems less scary in a data warehouse context. Kimball modeling has this sort of information in piles anyhow. We might still be bad at it, but expectations might align.
  2. First cut: CHANGES can only be applied to things that resolve to persist collections. Its action is to "maintain" the SUBSCRIBE output we would produce.
  3. Seems to require opening up "user controlled compaction". Otherwise, failures land out of the user's control. This invites the risk that uncompacted data are expensive for everyone. Mitigable with (a) a very smart persist, (b) double collections (select * from uncompacted; then compact this), (c) expectation management (don't directly query uncompacted data).
  4. May also call for CREATE MATVIEW / INDEX AS OF <time> to avoid watching CHANGES evolve. Otherwise, e.g., SELECT COUNT(*) FROM CHANGES(shard) might be obliged to watch the count evolve from zero to its current value over time. Depends a bit on how we pick timestamps for dataflows (e.g. as a function of since or upper).
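Point 4 can be illustrated in the same toy-model style (hypothetical helpers, not real APIs): a count over the CHANGES output replays from zero, because each historical update is a row the aggregate has to observe.

```python
# Toy model: CHANGES rows are ((row, time, diff), out_time, +1); a
# COUNT(*) over them, evaluated as of time t, sees every row whose
# out_time is <= t, so it climbs from zero through the full history.
def changes(updates, as_of):
    return [((row, t, d), max(t, as_of), 1) for (row, t, d) in updates]

def count_as_of(log, t):
    return sum(1 for (_payload, out_time, _d) in log if out_time <= t)

log = changes([("a", 1, 1), ("b", 2, 1), ("a", 4, -1)], as_of=0)
counts = [count_as_of(log, t) for t in range(5)]
# The count climbs through the history; a dataflow created AS OF a recent
# time could instead start at the current value and skip the replay.
```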

The intent was to think a bit harder about the moments where we could allow CHANGES without locking ourselves into supporting something that we don't know how to do well. I think we probably do know how to convert uncompacted data into CHANGES output; I think we do not yet know how to, e.g., optimize queries involving CHANGES in surprising locations, or how to meet user expectations when we do.

It might be easiest to test this as a persist_source argument that re-interprets the data it reads, rather than a table function that .. violates the time-invariance property that the rest of our IR has (that the output at a time can be determined from logic applied to the accumulated input at that time). We have another case of wanting source-reinterpretation elsewhere (pulling out the progress source using the same id, I think, @aljoscha?). Perhaps source reinterpretation is more broadly valuable, for example getting data-defined CDC via https://github.com/MaterializeInc/materialize/issues/6318.

aljoscha commented 1 year ago

Yes, we did think about something like PROGRESS(the_source) in the past but settled on exposing the progress collection under its own ID.

@danhhz and I did have a skunkworks project where we want to report the (continually changing) metadata of a persist shard. The syntax we originally came up with for that was SELECT ... FROM PERSIST METADATA FROM (the_collection) (yes, there's two FROMs in there). This feels very similar to CHANGES(the_collection), and could even be made more similar: PERSIST_METADATA(the_collection).

Internal link to the demo, for above: https://materializeinc.slack.com/archives/CNVRXGFDJ/p1676675501147309

RobinClowers commented 1 year ago

This would be really useful in the web console for building observability features. Here is a simplified example of how we might query memory usage over time:

SELECT
  MAX(memory_percent),
  date_bin(
    interval '60 seconds', occurred_at, $1
  ) as bin_start
FROM CHANGES(mz_internal.mz_cluster_replica_utilization, mz_now() - '30m')
GROUP BY bin_start
ORDER BY bin_start DESC;

chaas commented 1 year ago

CHANGES is being discussed in the streaming SQL committee too!

Sharing some notes from that group:

sjwiesman commented 1 year ago

It would only be for SELECT queries, not continuous queries (like our materialized views and indexes)

@chaas what’s the argument against allowing changes in continuous queries?

chaas commented 1 year ago

@sjwiesman we ended up having later proposals that would support CHANGES in continuous queries. In the proposed design, we would effectively support creating a materialized view that is a change log over an existing object. The risk there is the size of the materialized view, since the incremental changes would not get compacted away unless there's some aggregation on top.

sjwiesman commented 1 year ago

I suppose I don't see that as inherently problematic as long as you understand the consequences.

Like I would love to put @RobinClowers's query above in a view and track rolling memory usage over some time window.

benesch commented 1 year ago

Yeah, I think the fact that the output of CHANGES is append only is fundamentally fine. The stress is on the implementation side: what do you do on a version update? Do you also have to permanently hold back the since frontiers of all inputs to CHANGES so that you can always recompute the changes in the new version (in case the new version computes different changes)? That will only work for collections that do not change often—or for CHANGES where you only care about some small historical window (e.g., last 30 days), and not all time.


benesch commented 1 year ago

Linking in the related idea of durable subscribes: #17541.

benesch commented 10 months ago

Recording a design thought that @chaas and I have been workshopping: CHANGES is just a SUBSCRIBE that you can use in nested contexts, like:

In a view:

CREATE MATERIALIZED VIEW v AS SUBSCRIBE AS OF '2023-01-01T00:00:00';

In a one-off SELECT:

SELECT some + transform FROM (SUBSCRIBE AS OF '2023-01-01T00:00:00');