MaterializeInc / materialize

The data warehouse for operational workloads.
https://materialize.com

Unbounded memory usage with LIMIT (TopK) on non-Debezium Kafka sources #5013

Closed: pikulmar closed this issue 11 months ago

pikulmar commented 3 years ago

What version of Materialize are you using?

$ materialized -v
materialized v0.5.4-dev (b2b8a0d4a)

How did you install Materialize?

What was the issue?

Using TopK (SQL LIMIT) appears to be essential for defining bounded views on unbounded Kafka sources with append-only envelope (ENVELOPE NONE). In practice, the memory usage is unexpectedly high, which suggests that TopK might not be exploiting the append-only nature of the Kafka source. Thus, the issue can be rephrased as "TopK push-down to append-only Kafka source" and is related to https://github.com/MaterializeInc/materialize/issues/3196, https://github.com/MaterializeInc/materialize/issues/777, https://github.com/MaterializeInc/materialize/issues/652, and https://github.com/MaterializeInc/materialize/issues/4216.

Is the issue reproducible? If so, please provide reproduction instructions.

Yes. Given a Kafka topic mytopic with around 1M messages (~500 MiB total data volume), run

DROP SOURCE IF EXISTS data_stream CASCADE;
CREATE SOURCE IF NOT EXISTS data_stream
    FROM KAFKA BROKER 'mybroker:9092'
    TOPIC 'mytopic'
    WITH (
        ignore_source_keys=true
    )
FORMAT BYTES;

DROP VIEW IF EXISTS test_view CASCADE;
CREATE MATERIALIZED VIEW test_view AS
    SELECT
      s.data AS testcol
    FROM data_stream AS s
    LIMIT 1;

The increase in memory usage after submitting the above query and waiting a few minutes is roughly 2 GiB, indicating that all data read from Kafka are retained in memory. However, only a single (!) row of data should be kept in memory after accounting for the fact that Kafka messages cannot be deleted (cf. append-only source).

Please attach any applicable log files.

rjnn commented 3 years ago

Thanks @pikulmar for the report! cc @frankmcsherry: shouldn't #3987 have fixed this? It might also have to do with sources that fail to propagate monotonicity information, in which case also cc @elindsey.

frankmcsherry commented 3 years ago

In principle yes, though in practice there are some quirks. The memory reduction relies on compaction, so a few things need to be true:

  1. You need to run with DIFFERENTIAL_EAGER_MERGE set to something (e.g. 1000).
  2. You'll probably want to reduce --logical-compaction-window to something like 1s (the default is one minute).
  3. You'll get best results if you don't use a static topic (there is a chance that Materialize can fully physically compact your data before enough time has passed for logical compaction to trigger; one additional record after --logical-compaction-window should resolve this, but it's likely not part of your test).

Also, there are other quirks, e.g. memory spikes higher during the load than I'd like. A sketch of starting materialized with both of the settings above is included below.
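For reference, here is a sketch of how one might supply both settings when starting materialized, using the names and values suggested above (an illustrative, untested invocation, not an official recipe):

# Ask the compute runtime to spend idle cycles merging (compacting) trace batches.
export DIFFERENTIAL_EAGER_MERGE=1000
# Keep roughly 1s of history instead of the default one minute.
materialized --logical-compaction-window=1s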

ruchirK commented 3 years ago

I can confirm that I was able to reproduce this issue on my laptop with a fairly small topic, and that I was able to make it go away with a logical compaction window of 10ms.

It's still a bit mysterious to me why the compaction window needs to be small, given that the retraction delay is 10 seconds? When I left things on the default settings for a few minutes, everything stayed uncompacted.

Also, @pikulmar, if you have other memory-related issues, you should check out Materialize's dataflow memory utilization page at <mz-hostname>:6875/memory. It shows a graph of how much memory the various dataflow operators are using, and it has a share button that lets you share that graph with others, such as us. Hope that helps!

frankmcsherry commented 3 years ago

It's still a bit mysterious to me why the compaction window needs to be small, given that the retraction delay is 10 seconds?

DD has the "issue" that once your data reach a fully compacted physical representation (one batch for the entire collection), it doesn't get another chance to do physical compaction, and doesn't grok the circumstances under which a speculative compaction would improve anything.

So, if the compaction is 1min, and we read the data in 7s and within 20s total have received and retracted the feedback, we have 40s to do physical compaction before we even think of cancelling anything (the fed-back retractions are not at the same time; they are 10s later on). If within that 40s we arrive at a single physical batch, we are "stuck" until we receive another input. Just one more record would unstick it.
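For example, producing a single throwaway record with kafkacat should be enough to give the dataflow new input and let physical compaction resume (a sketch reusing the broker and topic names from the original report):

# Sketch: append one extra record so the arrangement gets another chance to
# physically compact after the logical compaction frontier has advanced.
echo 'one-more-record' | kafkacat -P -b mybroker:9092 -t mytopic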

pikulmar commented 3 years ago

Thank you all for the quick feedback. Meanwhile, I found the time to follow up on this story. Summary below:

Debugging Materialize issue #5013

Test 1: JSON parsing followed by Top-K

Initialize test environment:

docker-compose up --force-recreate

With

# docker-compose.yml

# Based on
#  https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml

version: '2'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka1:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper

  kafka2:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper

  kafka3:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=3
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper

  materialize:
    image: 'materialize/materialized:unstable-81f832b0818789c5e74183c208398431f14f079d'
    # environment:
    #   # Enabled only for selected tests, as explained in text.
    #   - DIFFERENTIAL_EAGER_MERGE=1000
    ports:
      - 6875:6875
    depends_on:
      - kafka1
      - kafka2
      - kafka3

Test data are generated as follows:

$ docker run --rm -it --network=materialize-monotonicity-test_default confluentinc/cp-kafkacat /bin/bash

$ openssl rand -base64 100000000 | kafkacat -P -b kafka1:9092 -t test -p 0 -k key
$ kafkacat -C -b kafka1:9092 -t test -e -o beginning | wc
% Reached end of topic test [0] at offset 2083334: exiting
2083334 2083334 135416670

$ kafkacat -C -b kafka1:9092 -t test -e -o beginning -J | kafkacat -P -b kafka1:9092 -t test-json -p 0 -k key
% Reached end of topic test [0] at offset 2083334: exiting
$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | wc
% Reached end of topic test-json [0] at offset 2083334: exiting
2083334 2083334 361388966

$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | tail -n 1
% Reached end of topic test-json [0] at offset 2083334: exiting
{"topic":"test","partition":0,"offset":2083333,"tstype":"create","ts":1608192273785,"key":"key","payload":"wcp7XWQ4DyDo0cJdEZctjQ=="}

Thus, the test-json Kafka topic contains about 2 million JSON-formatted messages with an uncompressed size of around 360 MB.

Next, run psql postgresql://localhost:6875/materialize and execute:

DROP SOURCE IF EXISTS data_stream CASCADE;
CREATE SOURCE IF NOT EXISTS data_stream
    FROM KAFKA BROKER 'kafka1:9092'
    TOPIC 'test-json'
    WITH (
        ignore_source_keys=true
    )
FORMAT BYTES;

DROP VIEW IF EXISTS test_view CASCADE;
CREATE MATERIALIZED VIEW test_view AS
    SELECT
      CONVERT_FROM(s.data, 'utf-8')::JSONB AS testcol
    FROM data_stream AS s
    ORDER BY s.data
    LIMIT 1;

Memory usage:

            | Virtual memory | Resident set
------------+----------------+--------------
 peak       | 17 GiB         | 14 GiB
 after load | 17 GiB         | 5 GiB

Following https://materialize.com/docs/ops/diagnosing-using-sql/#materialize-is-using-lots-of-memory-what-gives and running

select mdo.id, mdo.name, sum(mas.records) as records, sum(mas.batches) as batches
from mz_arrangement_sizes as mas,
     mz_dataflow_operators as mdo
where
    mas.operator = mdo.id and
    mas.worker = mdo.worker
group by mdo.id, mdo.name
order by sum(mas.records) desc;

results in:

 id  |      name      | records | batches
-----+----------------+---------+---------
 564 | Arrange: TopK  |  404939 |      10
 ...

So there is some compaction happening, although it might not be perfect in this particular case.

Following your suggestions, we tried adding one message to the input topic:

$ kafkacat -C -b kafka1:9092 -t test -e -o beginning -J | head -n 1 | kafkacat -P -b kafka1:9092 -t test-json -p 0 -k key
% ERROR: Output write error: Broken pipe
$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | wc
% Reached end of topic test-json [0] at offset 2083335: exiting
2083335 2083335 361389134

However, that did not change the overall memory usage.

Variant: use DIFFERENTIAL_EAGER_MERGE=1000

Memory usage:

            | Virtual memory | Resident set
------------+----------------+--------------
 peak       | 14 GiB         | 12 GiB
 after load | 14 GiB         | 0.7 GiB

Arrangement sizes:

 id  |      name      | records | batches
-----+----------------+---------+---------
 328 | Arrange        |     215 |       4
...

This is clearly much better.

Test 2: JSON parsing followed by jsonb_each and Top-K

Since a comment on the GitHub issue suggested that the propagation of monotonicity information might be missing, we revisited our original query, which was more complex than the one reported in the GitHub issue and in fact made use of set-returning functions from the jsonb_* family of functions. Let's therefore have a look at an example making use of jsonb_each:

DROP SOURCE IF EXISTS data_stream CASCADE;
CREATE SOURCE IF NOT EXISTS data_stream
    FROM KAFKA BROKER 'kafka1:9092'
    TOPIC 'test-json'
    WITH (
        ignore_source_keys=true
    )
FORMAT BYTES;

DROP VIEW IF EXISTS test_view CASCADE;
CREATE MATERIALIZED VIEW test_view AS
    SELECT
        each.key,
        each.value
    FROM
        data_stream s,
        jsonb_each(CONVERT_FROM(s.data, 'utf-8')::JSONB) each
    ORDER BY each.value
    LIMIT 1;

Memory usage:

            | Virtual memory | Resident set
------------+----------------+--------------
 peak       | 13 GiB         | 11 GiB
 after load | 13 GiB         | 10 GiB

Arrangements:

 id  |      name      | records | batches
-----+----------------+---------+---------
 556 | Arrange: TopK  | 4168514 |      12
 ...

Note that the number of records is suspiciously close to twice the number of messages in the data stream.

Setting DIFFERENTIAL_EAGER_MERGE=1000 does not reduce memory usage significantly:

            | Virtual memory | Resident set
------------+----------------+--------------
 peak       | 20 GiB         | 16 GiB
 after load | 20 GiB         | 10 GiB

Nor does it reduce the arrangement size:

 id  |      name      | records | batches
-----+----------------+---------+---------
 556 | Arrange: TopK  | 4168493 |       4
...

The graph returned by http://localhost:6875/memory can be found here.

Observations

ruchirK commented 3 years ago

Thanks @pikulmar for the very detailed and extremely helpful report. I'll let @frankmcsherry respond as to the exact semantics of differential eager merge, but I think I see the issue with the monotonicity propagation for table-valued functions and should have a fix for that very soon. I can verify from the linked graph that we are not rendering a monotonic TopK in the view with jsonb_each.

frankmcsherry commented 3 years ago

Thanks for the detailed comments! Quick thoughts:

  1. The DIFFERENTIAL_EAGER_MERGE=1000 is, I think, something we set up in our Docker images. If we don't, that is an error. There is some discussion about branding these things differently and making them easier to find (they feed into the underlying compute runtime, not Materialize itself). There is a trade-off: the larger the number, the more "dead time" the system will spend on compaction. From your point of view, a higher value will ensure that things go back down to zero faster, at the cost of more compute consumed as the task runs (so, if you want to minimize latency and don't care about footprint, a smaller value is better).

  2. Thanks for the observation. I'll forward that along. The philosophy of the underlying compute layer is that it should process all data presented to it, and that one should slow down ingestion if memory usage is getting too high. We'll probably need a better story than that, but it might be the sort of thing that makes sense for us only once we are in charge of managing the input as well.

    Setting --logical-compaction-window to something like 10ms should also reduce the standing memory footprint (part of what you are likely seeing is us maintaining a 1min history for you, for which it is correct to spike up because we've been instructed to retain enough data to go back 1min in the past; this drops as the 1min passes and the compacted data is sufficient). I'd give this a try if you haven't, before trying to throttle the input to keep the footprint lower.

    Edit: There is some information about memory here: https://materialize.com/docs/ops/deployment/. That seems like the sort of place where we should explain the environment variable, but I think there is some hesitation since it isn't a Materialize thing. Still, that's where you should be able to check how to run Materialize with better memory properties (e.g., --logical-compaction-window gets mentioned there).

  3. I'm afraid this will be a problem for a while.

  4. Yup. Pretty easy to fix. https://github.com/MaterializeInc/materialize/pull/5085 should improve the situation, once it lands!

pikulmar commented 3 years ago

Thanks a lot for the super-quick and effective response @frankmcsherry! Finally managed to have a rough read through https://people.csail.mit.edu/malte/pub/drafts/2019-kpg.pdf. Although I am not an expert in that domain, I enjoyed the read and was also happy to see that we have apparently both spent some time at ETH Zurich.

A few comments on your comments:

  1. Apparently the Docker images do not contain this setting; at least, materialized behaves differently when the environment variable is not specified explicitly. Thanks for explaining the trade-offs related to this setting.

  2. Thanks for the suggestion to use --logical-compaction-window as a means of controlling peak memory usage. We still have to test this. However, from a user's perspective, the issue mainly appears when catching up with Kafka topics after a cold start, and in such a scenario having back-pressure imposed by system memory usage might be the best solution.

    There is, however, still the issue of the non-decreasing virtual memory usage. Taking C++ experience as a reference, and without having checked the source code, one might speculate that there is a pooled allocator somewhere which perhaps zeroes freed memory (so that the system can unmap the physical pages and RSS decreases), but does not actually free the pool memory / return it to the system. This might of course happen if almost no chunks are empty, due to fragmentation...

  3. This might actually be relatively easy to implement. If using a pooled allocator, as a user-selectable alternative to allocating new chunks for the pool via the usual mechanism, the allocator would need to create a file in some user-specified directory and map that into its virtual address space.

  4. Awesome, I can confirm the memory footprint now behaves as we had expected, at least for the test example given above!

frankmcsherry commented 3 years ago

Thanks for the feedback. The allocator is probably jemalloc (depends a bit on the system; I think it varies from Linux to OSX), and I'll pass a pointer to these comments back to folks a bit more familiar. Traditionally the allocator and the system end up fighting a bit when we want to aggressively return memory to the system: we end up with disproportionate kernel CPU as memory is too often returned to and allocated from the kernel. Anyhow, it's something we can look into, and probably something that can be piloted with jemalloc env vars.
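For example (purely a sketch, not a tested configuration), jemalloc's decay settings can be adjusted through its configuration environment variable; the variable's name depends on how jemalloc was built (unprefixed builds read MALLOC_CONF, while prefixed builds, e.g. the Rust jemallocator default on some platforms, read _RJEM_MALLOC_CONF):

# Sketch: ask jemalloc to return dirty/muzzy pages to the OS more aggressively.
# The variable name depends on the jemalloc build in use.
export MALLOC_CONF="background_thread:true,dirty_decay_ms:1000,muzzy_decay_ms:1000"
materialized --logical-compaction-window=1s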

frankmcsherry commented 3 years ago

However, from a user's perspective, the issue mainly appears when catching up with Kafka topics after a cold start, and in such a scenario having back-pressure imposed by system memory usage might be the best solution.

We have an issue open about this, but a different answer is that on cold start we should possibly not assign distinct timestamps. The reason for the excessive memory is that the answers change so much, because on load we assign many distinct timestamps. Instead, we should be able to assign all data in the initial load the same timestamp, which should result in a peak memory utilization that is the same as at the conclusion of the load.

Lemme find that issue / WIP PR and link them.

pikulmar commented 3 years ago

Yes, assigning the whole initial load a single timestamp sounds like a promising approach indeed. While we are at it, I hope it is OK to ask two related questions:

frankmcsherry commented 3 years ago
  1. This is a not-unreasonable ask, but it brings some other policy questions with it: mostly, as you observe, that some important properties like monotonicity may not hold, and it is unclear what the behavior should then be. We used to have this behavior, and enforced that the timestamps increased (any timestamp regressions were just brought forward). The main pain point was that we couldn't know whether timestamps would strictly increase, and so after an update at some time t we couldn't produce the outputs for t until something else happened that advanced the time beyond t. The observed behavior was the system failing to load the last chunk of a batch load until changes started to flow through it (and only as long as changes flowed). Perhaps there is a fix, but Kafka didn't seem to record a durable "system timestamp has increased to t" message, which is what we would need.

  2. Do you mean by "subset of view versions" something like "set of prefixes by time"? If so, it's mechanically feasible (round all times "up" to the next version of interest), but currently only possible with the BYO or CDCv2 inputs, where you explicitly specify the timestamps for input data, and probably not pleasant to do in either case. I can file an issue about it, but if you have more detail about the feature it would be great if you would be willing to share it!

pikulmar commented 3 years ago
  1. Kafka supports transactions. The idea was to be able to tell Materialize that source/message timestamps are non-decreasing within transactions (any decrease could be treated as an error) and strictly increasing across transactions. Then, source message timestamps can be used as logical timestamps. A timestamp is closed whenever (i) the source message timestamp increases or (ii) a Kafka transaction commit/end marker is encountered. Such a scheme should work well both for (possibly back-filled) historical data and for transactionally published real-time data.

  2. If Materialize can use message timestamps as logical timestamps, one could ask things like "What did this view look like AS OF yesterday?".

    Some ideas for how such a feature could look:

    • Based on my current intuition for how things work, one possibility would be to support CREATE MATERIALIZED VIEW historical_view AS SELECT * FROM other_view AS OF historical_timestamp, which would translate into some "magic" trace handle to the arrangement holding the contents of other_view, which prevents the data for the corresponding view version from being compacted until a user drops historical_view. However, there would be a race between creating historical_view and the data-flow for other_view, so the UX part needs more thinking.

    • Alternatively, support CREATE MATERIALIZED VIEW historical_view AS <query> UNTIL historical_timestamp_frontier and provide a way to wait for the view to be "ready". The data flow for the view would then stop and the user could do whatever they need and use some kind of ALTER VIEW statement to manually advance historical_timestamp_frontier afterwards. This is probably not very good in terms of performance/efficiency.

    • Alternatively, ensure that every committed Kafka transaction results in a view update being output via TAIL or some CDC-style sink. The issue here is that often the entire view snapshot would be required, so the user would still need to write code for re-materializing the view versions from the CDC data. Optionally emitting entire snapshots to a specified sink would address this concern.

frankmcsherry commented 11 months ago

This issue is almost certainly fixed (over the course of a few units of work). There is a lean implementation of TopK on append-only sources, reflected in the plan variant TopKPlan::MonotonicTopK.
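As a rough way to check, one can inspect the plan chosen for such a view; assuming current EXPLAIN PHYSICAL PLAN syntax (the exact output format varies by version), something like the following should mention the monotonic Top-K variant rather than the general one:

-- Sketch: a Top-K over an append-only (ENVELOPE NONE) source, reusing the
-- data_stream source from earlier in this thread; the physical plan should
-- use the monotonic TopK implementation.
EXPLAIN PHYSICAL PLAN FOR
  SELECT data FROM data_stream ORDER BY data LIMIT 1;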