MaterializeInc / materialize


MVP freshness metric #28097

Open benesch opened 1 week ago

benesch commented 1 week ago

Summary

Design and implement a minimum-viable freshness metric that can be summarized across dataflows, clusters, regions, and customers.

Details

Background

There is good historical context in the Notion document "Document and Measure Sources of Unavailability in Materialize".

Conceptual framework

Summarization is hard! See the screed I left as a top-level comment in the above Notion document.

Here's a rough theory of summarization:

- Declare that a dataflow is FRESH iff its write frontier is within 1 (2? 3?) seconds of the current wall clock time.
- A cluster is FRESH iff all of its dataflows are FRESH, and likewise a region or customer is FRESH iff all of its monitored clusters are FRESH.

We have an easy means to summarize freshness across time, too: the percentage of seconds that were FRESH in a given time period.

Taken together, we can describe freshness across several dimensions. We can look back at the last week and see if customer $CUST had more than (say) 97% freshness. We can look back at the last month (or quarter!) and assess freshness across all customers. We can look at overall numbers month over month and see how the overall freshness trend is improving or declining.
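To make the time dimension concrete, the weekly rollup might look something like the sketch below, assuming a hypothetical per-second freshness log (freshness_by_second) that we don't have today. Customers under the (say) 97% line would be the ones to flag.

```sql
-- Sketch only: freshness_by_second is a hypothetical relation recording, per customer
-- and per second, whether all of that customer's monitored clusters were FRESH.
SELECT
    customer,
    100.0 * sum(CASE WHEN fresh THEN 1 ELSE 0 END) / count(*) AS pct_fresh_seconds
FROM freshness_by_second
WHERE measured_at >= now() - INTERVAL '7 days'
GROUP BY customer;
```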

Open question: how do we let users indicate which clusters are running production workloads? Ideally, we introduce some sort of MONITOR = TRUE option on clusters (with incentives for setting it on clusters that truly run production workloads, and disincentives for setting it on clusters that don't), and then restrict our attention to only the clusters that are marked for monitoring.

In the short term, we can maintain by hand a list of clusters that are known to be running production workloads, and manually join against those clusters in our monitoring.
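As a stopgap, that hand-maintained list could be as simple as a small table we join against, roughly as sketched below (the MONITOR option does not exist today, and monitored_clusters is hypothetical; only mz_catalog.mz_clusters is real).

```sql
-- Hypothetical stopgap: a hand-maintained list of production clusters that the
-- monitoring queries join against.
CREATE TABLE monitored_clusters (cluster_name text NOT NULL);
INSERT INTO monitored_clusters VALUES ('production'), ('serving');

SELECT c.id, c.name
FROM mz_catalog.mz_clusters c
JOIN monitored_clusters m ON m.cluster_name = c.name;
```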

Out of scope: sources and sinks. We're explicitly downscoping here to just objects whose write frontier indicates freshness, which today is just indexes and matviews. In the future we might get sources "for free", if https://github.com/MaterializeInc/materialize/issues/23345 turns out to be a good idea. If not, we might need to do more work to get a dedicated notion of source lag, as described in: https://www.notion.so/materialize/Metrics-for-lag-behind-upstream-source-d5c055fc47124777bf607c10be850d34?utm_content=d5c055fc-4712-4777-bf60-7c10be850d34&utm_campaign=TL68GSMGF&pvs=6

Implementation

Largely TODO. @teskje: I would love your thoughts here.

cc @frankmcsherry @teskje @bosconi

sthm commented 1 week ago

Alternatively, try to rig something up with introspection sources inside of Materialize. The data we want is ostensibly computable from mz_frontiers, but 1) we'd need to make that a retained metrics collection (scary!) and 2) we'd need CHANGES to actually write the query that says "count the number of seconds where frontiers were more than x seconds behind wall clock time".

This seems to be the most complex option and the one that takes the most work to implement. But at least from a customer perspective, I think it's the most appealing one: it surfaces these metrics to customers (instead of exposing them only in an internal tool), and the techniques involved (CHANGES) apply to other metrics as well (e.g., what was the average CPU utilization in the last hour?).
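For concreteness, the per-object building block of the query described in the quoted option might look roughly like this (a sketch only; the mz_timestamp casts may need adjusting):

```sql
-- Rough sketch: how far each object's write frontier currently trails the query
-- timestamp, in milliseconds. Counting "seconds that were more than x seconds behind"
-- over a window additionally requires history of mz_frontiers, which is the scary
-- retained-metrics part.
SELECT
    f.object_id,
    mz_now()::text::numeric - f.write_frontier::text::numeric AS approx_lag_ms
FROM mz_internal.mz_frontiers f
WHERE f.write_frontier IS NOT NULL;
```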

benesch commented 1 week ago

The final option could/would also make the data observable and queryable to users via a system table; it’d just be a minute stale. Wouldn’t require CHANGES to query, which would be good.

teskje commented 1 week ago

v0.2: declare that a dataflow is FRESH iff its write frontier is within 1 (2? 3?) seconds of the current wall clock time

It's possible to construct dataflows that will never be FRESH under this definition, even though we'd presumably treat them as healthy. Each persist roundtrip adds some CRDB/S3-induced delay, so if you build something that depends on a chain of sufficiently many MVs, the resulting persist delay will be higher than 1/2/3 seconds. Maybe that's fine and we don't expect users to build these MV chains in their production cluster, but if they do, that'd break our summarization scheme for the entire cluster/customer/region/stack.
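To illustrate with a toy example: a chain like the one below accumulates one persist roundtrip of delay per layer, so mv3's write frontier can trail wall clock time by more than 1-3 seconds even when everything is perfectly healthy.

```sql
-- Toy example of the MV-chain concern: each materialized view adds a persist
-- roundtrip, so the end of the chain lags wall clock time by the sum of those delays.
CREATE TABLE t (x int);
CREATE MATERIALIZED VIEW mv1 AS SELECT x + 1 AS x FROM t;
CREATE MATERIALIZED VIEW mv2 AS SELECT x + 1 AS x FROM mv1;
CREATE MATERIALIZED VIEW mv3 AS SELECT x + 1 AS x FROM mv2;
```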

Implementation

I assume having the metric end up in Prometheus is a hard requirement? Or are we fine having it in a system table and using some other means of pulling it from environments and summarizing it?

Agreed that making mz_frontiers a retained-metrics collection seems scary! Roughly all the frontiers are changing every second, so this collection is going to get very large quickly. If data ending up in Prometheus is a requirement, we'd need to change the promsql-exporter to be able to deal with retained-metrics relations, but that seems feasible.

The fourth option does indeed seem straightforward if we let Mz push to Prometheus directly. If we write the freshness measurements to an append-only table, we'd again need to worry about the size of the collection (which we may need to index), at least as long as we don't have some form of periodic cleanup for append-only tables. And we'd also need to teach the promsql-exporter how to work with log relations (i.e., remember up to which time the last query received results, so it doesn't fetch results twice).
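For reference, the append-only variant wouldn't need much schema, something like the sketch below (names are illustrative); the worry is purely about the unbounded growth and the exporter-side bookkeeping.

```sql
-- Hypothetical shape of the append-only freshness log. Without periodic cleanup it
-- grows by one row per object per measurement interval, forever.
CREATE TABLE freshness_measurements (
    object_id   text NOT NULL,
    measured_at timestamptz NOT NULL,
    lag_seconds double precision NOT NULL
);
```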

There are two other options I can think of:

benesch commented 1 week ago

I assume having the metric end up in Prometheus is a hard requirement? Or are we fine having it in a system table and using some other means of pulling it from environments and summarizing it?

Definitely not a hard requirement. The only hard requirement is that we have a dashboard tool that can slice and dice freshness across clusters/regions and across time.

Pushing the catalog exporter across the finish line would be a welcome alternative.

benesch commented 1 week ago

It's possible to construct dataflows that will never be FRESH under this definition, even though we'd presumably treat them as healthy. Each persist roundtrip adds some CRDB/S3-induced delay, so if you build something that depends on a chain of sufficiently many MVs, the resulting persist delay will be higher than 1/2/3 seconds. Maybe that's fine and we don't expect users to build these MV chains in their production cluster, but if they do, that'd break our summarization scheme for the entire cluster/customer/region/stack.

We discussed this problem a bit in the TL sync this week, and one proposal on the table would be to define not a binary FRESH/NOTFRESH designation but a "number of seconds of lag" STALENESS measure. Summarization works largely the same as it did with FRESH/NOTFRESH. A cluster's STALENESS is the max staleness of any of its dataflows. And so on.

This is strictly more powerful than FRESH. You can easily convert STALENESS into FRESH/NOTFRESH by applying a threshold at the time of analysis (rather than the time of measurement).
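Concretely, assuming a hypothetical per-second staleness log (staleness_by_second, with object_id, cluster, measured_at, and staleness_seconds columns), the conversion is just a threshold applied at query time:

```sql
-- Sketch only: staleness_by_second is hypothetical. The FRESH threshold (3 seconds
-- here) is chosen at analysis time rather than baked into the measurement.
SELECT
    cluster,
    100.0 * avg(CASE WHEN staleness_seconds <= 3 THEN 1.0 ELSE 0.0 END) AS pct_fresh_seconds
FROM staleness_by_second
GROUP BY cluster;
```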

Teach the promsql-exporter to subscribe to mz_frontiers, translate updates of this relation into freshness updates, and push them to Prometheus through the remote write API. This is nice because it doesn't require changes to Mz and might be useful for other metrics we want to export in the future.

I like the sound of this a lot. Seems much cleaner architecturally.

But it also doesn't work with the above definition of FRESH because we can't put mz_now()/now() in a subscribe to compare with the wallclock time.

That's okay though, right? We can go full special case. SUBSCRIBE to mz_frontiers, then do the comparison to wall clock time each second in Rust code.
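Something roughly like the sketch below, with the exporter holding the subscription open and comparing the latest write frontier it has seen for each object against its own clock once a second (mz_internal is unstable, so the exact relation we subscribe to may change).

```sql
-- The exporter consumes this stream and does the wall clock comparison in Rust.
SUBSCRIBE (SELECT object_id, write_frontier FROM mz_internal.mz_frontiers);
```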

And when the promsql-exporter goes down for whatever reason, we lose the freshness metric for that time.

That seems okay! It should present as "no data", and we can conservatively treat that as NOTFRESH or maximum STALENESS.

teskje commented 1 week ago

That's okay though, right? We can go full special case. SUBSCRIBE to mz_frontiers, then do the comparison to wall clock time each second in Rust code.

Ah, good point. I was thinking that "wallclock" time should really be some time produced by the coordinator (the oracle write time or something like this), given that the actual wallclock time has no relation to the times at which source data is produced. But the oracle time also has no relation (at the moment) to the times at which sources or the per-replica introspection relations commit their updates, so for those, using the oracle time would not obviously be better than using the system time. Given that system times between pods are hopefully pretty similar, letting the promsql-exporter use its own time should be fine.

The only remaining question would then be how we want to expose this data to Prometheus. We can either (a) expose the maximum lag per minute and let the scraper fetch it as usual, (b) expose a histogram of per-second lags for the scraper to fetch, or (c) actively push the per-second values. (a) is easiest but provides reduced granularity. (b) is maybe scary due to the number of time series but is also easy to implement. (c) requires a bunch of new code in the promsql-exporter and is maybe also scary from an amount-of-data perspective (?), but gives us the most flexibility for using this data.
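In SQL terms, option (a) boils down to a rollup like the sketch below over the hypothetical per-second measurements from above:

```sql
-- Sketch of option (a): one exposed value per object per minute, i.e. reduced
-- granularity but few time series. staleness_by_second is hypothetical (see above).
SELECT
    object_id,
    date_trunc('minute', measured_at) AS minute,
    max(staleness_seconds) AS max_lag_seconds
FROM staleness_by_second
GROUP BY object_id, date_trunc('minute', measured_at);
```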