MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com
Other
5.72k stars 466 forks source link

sinks,kafka: sinking a materialized view will queue all messages, causing memory usage to spike #7043

Closed aljoscha closed 2 years ago

aljoscha commented 3 years ago

The Kafka sink stashes records for "open" timestamps until they are finalized/consistent/durable. The (avro) encoded records are essentially stashed in a Vec in a HashMap (see [1] and [2]).

When sinking a materialized view, all sinked records have the same timestamp (more or less, possibly depending on timestamp compaction). Say the view has 100M individual entries. The (Kafka) sink will now stash 100M encoded records in memory before starting to produce them to Kafka. This means that a) we need to wait a long time for sinked data to arrive at Kafka, and b) the Materialize process will likely run out of memory before it can finish.

The reason for our problematic behavior is the interplay of two things:

  1. Arrangements that are imported into a dataflow replay their contents at the current-ish timestamp. It seems all records are emitted at the same timestamp (see [3]). A materialized view is backed by an arrangement.
  2. The Kafka sink waits for timestamps to be closed. In our case there is only one timestamp, meaning the sink waits for the complete batch of all the changes so far.

Solution

There are two obvious solutions based on the two root causes:

  1. Don't emit all updates at the same timestamps when importing an arrangement but instead let them "trickle out" over time. This would possibly require changes in differential-dataflow
  2. Don't wait for timestamps to close in the sink. This one seems harder and would require a re-imagining of the sink architecture.

There might be other, non-obvious solutions.

References

[1] https://github.com/MaterializeInc/materialize/blob/0f7a9b27330ba35b53005533ef4c2812fa242b2e/src/dataflow/src/sink/kafka.rs#L433 [2] https://github.com/MaterializeInc/materialize/blob/0f7a9b27330ba35b53005533ef4c2812fa242b2e/src/dataflow/src/sink/kafka.rs#L450 [3] https://github.com/timelydataflow/differential-dataflow/blob/b168506cd74a01658e312e624e8a4cb014400a28/src/operators/arrange/agent.rs#L424

aljoscha commented 3 years ago

cc @nmeagan11 and @frankmcsherry (because you mentioned this to Cuong, apparently)

aljoscha commented 3 years ago

Also, thanks @krishmanoh2 for reporting this!

frankmcsherry commented 3 years ago

Related to: https://github.com/MaterializeInc/materialize/issues/2808

aljoscha commented 3 years ago

Thanks! One of the approaches I have in my head for CDCv2 sinks would work like #2808. I have half a mind to do that even though it seems a smidge more complicated than our current sinks in the short run.

The scheme I have in mind allows writing batches in parallel (multiple workers) and still get exactly-once semantics with RT-timestamping. However, even with that, we would have to wait for RT timestamp bindings to be durable. At least I'm naively thinking about it like this right now:

And I still don't see how we can get around waiting for durable timestamps with the existing sink.

Some spitballing: it could work if the arrangement (either the one that we sink or the one that the sink dataflow uses to consolidate) could send this downstream:

  1. I know time t is done
  2. Send all the data that is not beyond time t
  3. Signal that there won't be more data that is not beyond t

This way the sink wouldn't have to also buffer the data for time t.

Maybe we can have a call about this when you're back from vacation, in case you already have more ideas?

aljoscha commented 3 years ago

Also seems related to this: #7031

aljoscha commented 3 years ago

Couple thoughts on my earlier, ahem, thoughts

Some spitballing: it could work if the arrangement (either the one that we sink or the one that the sink dataflow uses to consolidate) could send this downstream:

  1. I know time t is done
  2. Send all the data that is not beyond time t
  3. Signal that there won't be more data that is not beyond t

This is already what an arrangement gives us. Change batches are only sent downstream when the frontier has passed them. Someone more knowledgeable should confirm, though.

I'm thinking more and more that we could do a very nice sink with CDCv2. Writing in parallel, good performance ...

Coming back to the immediate issue. I believe the only reason we need to stash records in the sink is so that we can sort them by time. This is relevant for the DEBEZIUM and UPSERT envelopes, where the order in which records arrive in Kafka do have meaning. The reason we need to sort is that we're only writing to Kafka from one worker. In the sink dataflow of ... -> arrange -> put_in_envelope -> encode -> produce_to_kafka the last step exchanges all updates to one worker, which messes up the order of elements. It's quite the pickle, we write from one worker to be able to work with Kafka transactions to provide consistency, but that introduces this other problem.

umanwizard commented 3 years ago

This is already what an arrangement gives us. Change batches are only sent downstream when the frontier has passed them.

This also matches my understanding.

aljoscha commented 3 years ago

Thinking a bit outside the box. We emit records in timestamp order from the sink for two reasons:

  1. To satisfy the happens-before nature of UPSERT sinks. Updates that "happen" later in time need to be later in the topic, so that the sinked view can be faithfully reconstructed
  2. To write a DEBEZIUM consistency topic that is compatible with DEBEZIUM consumers, including our own DEBEZIUM source. And I believe also the UPSERT source with BYO timestamping, which reads our debezium-style consistency topic.

UPSERT

To satisfy 1. it is enough to ensure that records with the same key are emitted in timestamp order. Records of different keys might appear in the topic with interleaving timestamps. This would make it harder for a consumer of that topic to get "consistency" based on timestamps but I'm not sure this even makes sense for UPSERT topics.

We can already naturally satisfy 1. because the arrange -> combine_at_timestamp pipeline should process things in timestamp order. Exchanging those records to one worker (as the sink does) does not change the order for records from the same worker, so timestamp order within a key is preserved. This does potentially introduce the aforementioned interleaving of timestamps for records of different workers.

The consistency topic we write for this could not be used as a BYO input topic anymore, because of the timestamp interleaving. It would still work for exactly-once purposes in the sink, where we use it to filter out records that we have already written based on the timestamp.

DEBEZIUM

To satisfy 2. we really do need to write all records that have the same timestamp as one batch and then write the consistency record for that timestamp.

Radical proposal

One radical solution could be to not support DEBEZIUM sinks and instead heavily lean on CDCv2 sinks. We would not write a consistency topic anymore that is usable as a BYO consistency input topic.

The question we would need to answer is whether anyone wants to consume our debezium-style consistency topic. The answer could well be yes because Debezium ha an existing ecosystem while nobody uses CDCv2.

aljoscha commented 3 years ago

I moved this to the Icebox for now because it's not easily fixable with the current infrastructure around sinks/dataflows. I outlined the problem and a potential path to a solution here: https://gist.github.com/aljoscha/9d134bbec1b5fceacf8a2da791a44aef