MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com
Other
5.72k stars 466 forks source link

storage: Kafka sinks with a large snapshot can exceed transaction timeout #23955

Open benesch opened 9 months ago

benesch commented 9 months ago

Summary

A Kafka sink over a sufficiently large volume of data can fail loop forever if producing the snapshot to Kafka takes longer than the fixed transaction timeout (currently 60s [^1]). We observed this in the feature benchmark when we attempted to turn the transaction timeout down to 10s: https://github.com/MaterializeInc/materialize/pull/23873.

A similar hazard theoretically exists in the steady state, if producing the data at a single timestamp takes longer than the transaction timeout. I don't believe we've ever observed this in practice, though.

[^1]: we've increased the snapshot timeout across the board to 10 minutes (Slack).

Details

Kafka transactions have a fixed timeout from when they are initiated. This timeout is configurable on the producer via the transaction.timeout.ms parameter. It defaults to 60s. The maximum allowable timeout is configured by the transaction.max.timeout.ms parameter, and defaults to 15m.

For most applications, most transactions are presumably quite quick, and a timeout of 60s is not an issue. But for Materialize, our EOS semantics presently require us to produce the initial snapshot at a single timestamp, and therefore in a single Kafka transaction. These snapshots can be 100GB+ in the worst case, which can easily take longer than 60s to produce to Kafka.

We can't just dial the transaction timeout up to 15m, though. If a Kafka sink fails at exactly the wrong moment, it can leave open a transaction with a "lock" on the progress topic, which will prevent any other Kafka sinks from resuming until that lock expires. (Because progress topics are shared between sinks, we can't rely on fencing to abort in-progress transactions.)

For the gory details on the liveness issue, see: https://github.com/MaterializeInc/materialize/issues/18628#issuecomment-1819384708

There are a few paths forward:

  1. ~Adjust the transaction timeout on a per customer basis. https://github.com/MaterializeInc/materialize/pull/23872 set us up for this. We can dial up the transaction timeout for customers with large Kafka sinks as we like—though we'll have to warn them about the liveness implications.~ We have increased the snapshot timeout across the board to 10 minutes (Slack).
  2. As an extension of the above, we could allow users to configure the transaction timeout on a per-sink basis, and then they could choose the point on the liveness—snapshot size spectrum that makes sense for each sink. It's a bummer to push this complexity onto the user, but it has the upside of being very easy for us to implement. It might also be required to support customers who have turned transaction.max.timeout.ms down to something less than 60s in their Kafka clusters.
  3. Create progress topics 1:1 with data topics. (Past proposal for this.) Then we could dial the transaction timeout up to 15m and rely on fencing to abort in progress transactions on resumption. The downside is that there is operational overhead for the user with all the additional topics.
  4. Learn to emit large timestamps incrementally. The progress records we write to Kafka could encode partial progress—e.g., "we've written the first 10k records in this timestamp". Lots of fiddly implementation details to sort out with this approach, but it has no UX downsides that I can see, and is likely the right long term approach. We'd need to do something similar to support parallelizing sinks across multiple workers, and so it might make sense to do partial progress and parallelization in the same bundle of work.

tl;dr I would suggest seeing how long we can get by with (1). We can implement (2) in a pinch if necessary. And (4) is the long term solution.

See also

https://github.com/MaterializeInc/materialize/issues/27958

cc @guswynn @sploiselle @petrosagg @bkirwi

chuck-alt-delete commented 8 months ago

FYI we saw this with users twice this week

chuck-alt-delete commented 8 months ago

3: separate progress topic per sink:

With kRAFT becoming the norm, even modest Kafka clusters can support 1M+ partitions. Having one compact progress topic per sink may not be an issue in terms of load on modern kafka clusters. I don't see a problem in terms of ACLs / RBAC rolebindings as long as the materialize user in Kafka has permissions scoped on a common prefix.

2: user configurable timeout:

It might also be required to support customers who have turned transaction.max.timeout.ms down to something less than 60s in their Kafka clusters.

I don't understand how user configurable timeout would change the above ^. If a user configures a timeout of 15 minutes and the broker has the max timeout set to 1 minute, the broker's config wins and the snapshot will fail.

Snapshot too big

If the time it takes to snapshot exceeds the broker's timeout, then in all the scenarios except 4, the user would have to work with the Kafka admin to allow a large transaction. Is that correct?

benesch commented 8 months ago

If a user configures a timeout of 15 minutes and the broker has the max timeout set to 1 minute, the broker's config wins and the snapshot will fail.

Yes, agreed with this statement!

It might also be required to support customers who have turned transaction.max.timeout.ms down to something less than 60s in their Kafka clusters.

I don't understand how user configurable timeout would change the above ^.

Ah, that was addressing a failure mode other than "snapshot so large it exceeds the transaction timeout". I may have misunderstood how this works on the Kafka side. Imagine that the broker has a transaction timeout of 15s, but Materialize configures a transaction timeout of 60s. I was worried that the broker would hard reject Materialize's Kafka connection attempt for having a transaction timeout of 60s. And so we'd need to allow the user to configure the transaction timeout in Materialize to something <=15s in order for Kafka to accept the connection.

If Kafka brokers just set the transaction timeout to min(broker timeout, client timeout), then great, there's no problem here!

benesch commented 8 months ago

With kRAFT becoming the norm, even modest Kafka clusters can support 1M+ partitions. Having one compact progress topic per sink may not be an issue in terms of load on modern kafka clusters. I don't see a problem in terms of ACLs / RBAC rolebindings as long as the materialize user in Kafka has permissions scoped on a common prefix.

I think the concern was not load on the Kafka cluster but the human operational overhead of having to manage two topics for every Kafka sink, rather than just one. cc @bkirwi

If the time it takes to snapshot exceeds the broker's timeout, then in all the scenarios except 4, the user would have to work with the Kafka admin to allow a large transaction. Is that correct?

Yes, sort of! AIUI the default broker-side transaction max timeout is 15m, and the default client-side transaction timeout is 1m. So we have 14m of slop to play with by default. But in cases where the snapshot takes longer than 15m to produce, or where the Kafka admin has turned down the broker-side max timeout, yeah, users would need to ask their admin to raise the limit.

bkirwi commented 8 months ago

I think the concern was not load on the Kafka cluster but the human operational overhead of having to manage two topics for every Kafka sink, rather than just one.

Yeah, this is correct. In particular, if you don't want to give Materialize create-topic privileges for whatever reason, your life now becomes very complicated.

Two things have changed since we made the original call:

bkirwi commented 8 months ago

The other thing I'd say: changing this buys us a factor of 15 in terms of the size of snapshots we can emit, at least for normally-configured clusters. 15 is not a small number, so that may be worthwhile! But we'll need to address option (4) eventually i think.

chuck-alt-delete commented 7 months ago

User keeps seeing this timeout on relatively small data (0.1-0.6GB). Kafka cluster is Confluent cloud in GCP. That’s 1-10 MB/s, which is very low, so there is likely some other issue going on. However, there is more we can do to increase throughput.

The default Kafka producer settings are tuned for lowest latency possible, but that doesn’t fit our use case with Kafka sinks.

benesch commented 7 months ago
  • Compression type (currently defaults to none)

I've filed https://github.com/MaterializeInc/materialize/issues/25242 with the request to change the default compression type.

  • batch size
  • Linger ms

I think our batch size is probably okay for throughput. But maybe we want to bump linger.ms to 50ms or 100ms, up from 10ms.

Past Slack discussion on this point: https://materializeinc.slack.com/archives/C01CFKM1QRF/p1706201775387949?thread_ts=1706156816.285059&cid=C01CFKM1QRF

The suggestion I had there was to make the batch size and linger.ms values configurable in LD. I think that'd be a great first step, as it would let us run throughput tuning experiments in production without needing to wait a whole release cycle.

chuck-alt-delete commented 3 months ago
frankmcsherry commented 3 months ago

Drive-by comment that 4 is for example covered by CDCv2, which .. idk we've supported in the past. Supports out-of-order, potentially duplicated output. Just needs definite input, which we have.

chuck-alt-delete commented 3 months ago

User requested today to be able to configure batch size and linger. Different payloads will have different throughput characteristics, so it makes sense not to have it one size fits all.

benesch commented 3 months ago

Drive-by comment that 4 is for example covered by CDCv2, which .. idk we've supported in the past. Supports out-of-order, potentially duplicated output. Just needs definite input, which we have.

As a consumer of the sink, CDCv2 provides a pretty meaningfully different experience than the experience we provide today (EOS semantics via Kafka transactions). With CDCv2, the consumer of the data needs to implement a nontrivial deduplication protocol. I don't think we ever saw anyone get up and running with CDCv2 back in the day, and I'd be worried about the field engineering resources required to do so.

benesch commented 3 months ago

User requested today to be able to configure batch size and linger. Different payloads will have different throughput characteristics, so it makes sense not to have it one size fits all.

I'm not yet convinced we should expose these to end users. They're not a knob that anyone wants to turn; they're a knob you turn because you're unhappy with performance. I'm hopeful we can find better defaults that satisfy the vast majority of use cases with Materialize. (Our source tick interval is presently fixed to 1s, so we should probably be tuning linger.ms to 100ms or 500ms or so; no benefit to emitting data every 100ms because it can't change more than once every second.)

If we try adjusting the default and we realize that now low latency use cases have a worse time, we should definitely consider exposing the knob to users, but so far all of the complaints have been about the linger being too low, and I think the fix might be as simple as increasing the default.

Even if we do expose a knob here, we may want to consider something like TUNING = {LOW LATENCY | HIGH THROUGHPUT}, rather than exposing the knobs directly, so that we have more flexibility to adjust the tuning in the future.