MaterializeInc / materialize

The data warehouse for operational workloads.
https://materialize.com
Other
5.67k stars 458 forks source link

"Parallel Workload (cancel)" is flaky due to OOM #22228

Closed nrainer-materialize closed 9 months ago

nrainer-materialize commented 9 months ago

Buildkite link

https://buildkite.com/materialize/nightlies/builds/4639#018b042e-2007-4dee-bed8-41dbd2247f56

Relevant log output

Exception in thread Exception in thread worker_29:
Exception in thread worker_11:
Exception in thread worker_13:
Exception in thread worker_17:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread worker_0:
Exception in thread worker_2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread worker_8Exception in thread worker_12:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread
cancelTraceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread :
Exception in thread worker_1:
Exception in thread worker_24:
:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
worker_31:
Traceback (most recent call last):
Exception in thread worker_23:
Exception in thread worker_15:
Traceback (most recent call last):
Exception in thread worker_5:
Exception in thread Traceback (most recent call last):
Exception in thread worker_3:
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread worker_27:
Exception in thread   File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Traceback (most recent call last):
worker_25:
Traceback (most recent call last):
Exception in thread Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
Exception in thread worker_20:
Traceback (most recent call last):
worker_26    Traceback (most recent call last):
Exception in thread worker_4:
Traceback (most recent call last):
:
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
    worker_28:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
code, data_len = ci_unpack(self._read(5))
  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
    code, data_len = ci_unpack(self._read(5))Exception in thread worker_30  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages
    worker_9Traceback (most recent call last):
    Exception in thread worker_10:
Traceback (most recent call last):
:
code, data_len = ci_unpack(self._read(5))  File "/usr/local/lib/python3.11/dist-packages/pg8000/core.py", line 805, in handle_messages

Additional thoughts

I have seen further occurrences. I will link them if I can find them or encounter new ones.

def- commented 9 months ago

Cancelled queries seem not to be cleaned up properly and are thus leaking memory. I'll try to create a smaller repo. Edit: Couldn't find a small repo that works. This way I can get my 64 GB system to go OoM: bin/mzcompose --find parallel-workload down && bin/mzcompose --find parallel-workload run default --seed=1696582954 --threads=32 --runtime=3000 --complexity=ddl --scenario=cancel Edit2: This is not as reproducible as I hoped either. Not sure what to do with this item. In my opinion we are missing ways to easily see what query is even consuming the memory.

def- commented 9 months ago

Found a reproducer:

import pg8000
import threading
import time

def run_cancel(pid: int) -> None:
    conn = pg8000.connect(host="127.0.0.1", port=6875, user="materialize", database="materialize")
    with conn.cursor() as cur:
        while True:
            time.sleep(10)
            print("cancelling query")
            cur.execute(f"SELECT pg_cancel_backend({pid})")

conn = pg8000.connect(host="127.0.0.1", port=6875, user="materialize", database="materialize")
with conn.cursor() as cur:
    conn.autocommit = True
    cur.execute("drop table if exists t1")
    cur.execute("create table t1 (c int)")
    cur.execute("insert into t1 select generate_series(1, 10000)")
    conn.autocommit = False
    cur.execute("SELECT pg_backend_pid()")
    pid = cur.fetchall()[0][0]
    threading.Thread(target=run_cancel, args=(pid, )).start()
    while True:
        try:
            print("running query")
            cur.execute("select * from t1, t1 as t2 limit 1")
        except pg8000.dbapi.ProgrammingError:
            cur.execute("rollback")

This keeps consuming memory for me until OoM. What I believe is happening is that clusterd already accepts the new query and starts working on it, before having cleaned up the remaining work from the cancelled queries before. Since there are always new queries coming in immediately the cleanup will never happen. Without the cancellations the memory consumption stays much longer since the cleanup in clusterd happens before the next query is accepted. (try without threading.Thread(target=run_cancel, args=(pid, )).start() line.

What should happen ideally is that we cancel queries immediately (https://github.com/MaterializeInc/materialize/issues/2392) and clean up after them. But until that is implemented I'm wondering if there is a problem with the currently used approach and could we improve it?

antiguru commented 9 months ago

Unfortunately, it's not easy to cancel queries at the moment. Some operators can be terminated before their input frontiers advance to the empty frontier, but most will run to completion, which causes compute to do the work to produce results even though the query was cancelled.

There's an experimental drop_dataflow API that would give us eager dropping, but we haven't had the change to validate it actually is correct in all situations.

teskje commented 9 months ago

FYI, this is the PR that makes compute use drop_dataflow (behind a feature flag) and would probably solve this issue: https://github.com/MaterializeInc/materialize/pull/18442. We mostly need to add tests to ensure both ways of dropping a dataflow are covered (since we might need the token-based way again down the road), and then let it bake in staging for a while. But dataflow dropping issues haven't come up in a while so we haven't prioritized this.