dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Unmanaged memory because of block splitting in pandas #7800

Open phofl opened 1 year ago

phofl commented 1 year ago

Describe the issue:

pandas started splitting blocks in 2.0 to improve the performance of setitem when a full column is replaced. The blocks produced by the split are views into the original block, so the replaced column's old data is never released. This keeps unused data in memory.
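For illustration, the same mechanism can be seen in pandas alone, without Dask. This is a minimal sketch; the shares_memory check assumes pandas >= 2.0 block splitting and a non-copying DataFrame construction:

import numpy as np
import pandas as pd

arr = np.random.random((1_000_000, 10))
df = pd.DataFrame(arr, columns=list("abcdefghij"), copy=False)

df["b"] = 1  # pandas >= 2.0 splits the block instead of writing in place

# The untouched columns are still views into the original 10-column buffer,
# so the whole buffer (including the old "b" data) stays alive:
print(np.shares_memory(df["a"].to_numpy(), arr))  # expected: True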

Minimal Complete Verifiable Example:

import dask.array as da
import dask.dataframe as dd

# Create 10 float64 columns of ~400 MB each (50M rows x 8 bytes)
ddf = dd.from_array(da.random.random((50_000_000, 10)), columns=list("abcdefghij"))

ddf["b"] = 1
# ddf = ddf.rename(columns={"a": "x"})
ddf = ddf.persist()  # keep a reference so the persisted data stays on the cluster

cc @crusaderky we chatted offline about this last week. Anything we can do here? Should this be counted as managed memory? Rename triggers a deep copy before we persist, which brings the unmanaged memory down.

Anything else we need to know?:

Environment:

crusaderky commented 1 year ago

This keeps unused data in memory.

Reproduced. Indeed we need to fix this (through intelligent deep-copy?)

import gc
import time
import pandas
from distributed import wait, Client
import dask.array as da
import dask.dataframe as dd

client = Client(n_workers=1)

def dump_mem(label):
    client.run(gc.collect)
    time.sleep(3)  # Wait for memory to settle and for heartbeat
    print("=" * 80)
    print(label)
    print(client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.memory))

print(pandas.__version__)
dump_mem("Empty cluster")

ddf = dd.from_array(da.random.random((500_000_000, 3)), columns=list("abc"))

ddf = ddf.persist()
wait(ddf)
dump_mem("Original dataframe")

ddf["b"] = 1
ddf = ddf.persist()
wait(ddf)
dump_mem("After setitem")
Output:

2.0.1
================================================================================
Empty cluster
Process memory (RSS)  : 129.17 MiB
  - managed by Dask   : 0 B
  - unmanaged (old)   : 74.61 MiB
  - unmanaged (recent): 54.56 MiB
Spilled to disk       : 0 B

================================================================================
Original dataframe
Process memory (RSS)  : 11.33 GiB
  - managed by Dask   : 11.18 GiB
  - unmanaged (old)   : 68.00 MiB
  - unmanaged (recent): 86.85 MiB
Spilled to disk       : 0 B

================================================================================
After setitem
Process memory (RSS)  : 15.05 GiB
  - managed by Dask   : 11.18 GiB
  - unmanaged (old)   : 68.00 MiB
  - unmanaged (recent): 3.81 GiB
Spilled to disk       : 0 B
crusaderky commented 1 year ago

Indeed we need to fix this (through intelligent deep-copy?)

Let me expand. Historically, dask has had this exact problem with numpy views, and it was solved in a draconian way by ensuring that everything is always deep-copied. Now we face the same issue with pandas.

I can see three options:

Solution 1: Do nothing

dask DataFrames can consume substantially more memory than what sizeof() reports for them. The extra memory is accounted for as unmanaged memory, and it disappears as soon as the task is either transferred over the network or spilled and then unspilled.
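As a sketch of why the extra memory disappears on its own (pickle stands in here for the network transfer or spill/unspill roundtrip; this continues the pandas-only snippet above):

import pickle
import numpy as np
import pandas as pd

arr = np.random.random((1_000_000, 10))
df = pd.DataFrame(arr, columns=list("abcdefghij"), copy=False)
df["b"] = 1  # splits the block; the other columns remain views into arr

# A serialize/deserialize roundtrip materialises only the data the frame
# actually references, so the original 10-column buffer can be freed:
df = pickle.loads(pickle.dumps(df))
print(np.shares_memory(df["a"].to_numpy(), arr))  # expected: False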

User cost

User benefit

Dev cost

Dev benefit

Solution 2: Deep-copy as needed

Ensure dask.dataframe forces a deep copy every time this kind of situation arises, i.e. there is never an invisible piece of memory being kept alive. This behaviour would be consistent with dask.array.

A pandas API for this (with zero cost when there's no invisible memory), e.g. pandas.DataFrame.trim_unused_buffers, would greatly simplify the implementation.
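A user-level sketch of what such a helper could do (hypothetical: trim_unused_buffers does not exist in pandas, and the check relies on BlockManager internals and only covers numpy-backed blocks):

import numpy as np
import pandas as pd

def trim_unused_buffers(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical sketch: deep-copy only when a numpy-backed block is a
    # view that keeps a larger buffer alive; otherwise return df unchanged,
    # so the call is essentially free in the common case.
    for blk in df._mgr.blocks:
        values = blk.values
        if isinstance(values, np.ndarray) and values.base is not None:
            if values.base.nbytes > values.nbytes:
                return df.copy(deep=True)
    return df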

User cost

User benefit

Dev cost

Dev benefit

Solution 3: :sparkles: The Fancy One :sparkles:

Write a variant of sizeof() that returns two measures: the memory the object actually references (what sizeof() reports today) and the extra, invisible memory that its buffers keep alive.

Track this invisible memory in the SpillBuffer, in the Bokeh GUI, and in Prometheus. Encapsulate heuristics in the SpillBuffer so that, before actually spilling to disk, it first does a serialization/deserialization roundtrip to wipe away this extra memory (caveat: this is extremely expensive with object strings; the same pandas API as in point 2 would be very useful here).
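A rough sketch of what the two-measure sizeof could look like (hypothetical; it relies on BlockManager internals and only handles numpy-backed blocks):

import numpy as np
import pandas as pd
from dask.sizeof import sizeof

def sizeof_two_measures(df: pd.DataFrame) -> tuple[int, int]:
    # Returns (managed, invisible): `managed` is what sizeof() reports today,
    # `invisible` is the extra memory kept alive by buffers the frame only
    # partially references. Hypothetical sketch, numpy-backed blocks only.
    managed = sizeof(df)
    referenced = 0
    held = 0
    seen_bases = set()
    for blk in df._mgr.blocks:
        values = blk.values
        if isinstance(values, np.ndarray):
            base = values.base if values.base is not None else values
            referenced += values.nbytes
            if id(base) not in seen_bases:
                seen_bases.add(id(base))
                held += base.nbytes
    return managed, max(held - referenced, 0)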

User cost

None

User benefit

Dev cost

Dev benefit

phofl commented 1 year ago

Another example that produces a lot of unmanaged memory (not sure if this is already known, but this behaviour has been around forever):

length = 50_000_000  # for example; any large row count shows the effect
ddf = dd.from_array(da.random.randint(1, 100, (length, 16)), columns=list("abcdefghijklmnop"))
ddf1 = ddf["a"].persist()

This might be a bigger problem than the initial example.

The initial example creates unmanaged memory only temporarily, because in most cases a copy is triggered as soon as another operation is executed on the modified DataFrame. Here, if the user keeps ddf1 alive for some reason, the memory of the whole 16-column DataFrame is never freed, even though only one column is used.
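The pandas-level mechanism here is different from block splitting: selecting a single column returns a Series that is a view into the multi-column block, so each persisted partition of ddf1 keeps the whole 16-column buffer alive. A pandas-only sketch (assuming non-copying construction and column access):

import numpy as np
import pandas as pd

arr = np.random.randint(1, 100, (1_000_000, 16))
df = pd.DataFrame(arr, columns=list("abcdefghijklmnop"), copy=False)

s = df["a"]  # a view into the 16-column block, not a copy
# Keeping only `s` alive keeps the whole 16-column buffer alive:
print(np.shares_memory(s.to_numpy(), arr))  # expected: True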

crusaderky commented 1 year ago

Another example that produces a lot of unmanaged memory (not sure if this is already known, but this behaviour has been around forever):

ddf = dd.from_array(da.random.randint(1, 100, (length, 16)), columns=list("abcdefghijklmnop"))
ddf1 = ddf["a"].persist()

O_o I'm very surprised that dask.dataframe doesn't deep-copy like dask.array does

fjetter commented 1 year ago

IIUC this all boils down to dask.dataframe.methods.assign, which already has some logic around deep-copying. This likely needs refinement:

https://github.com/dask/dask/blob/053c5425e415a84de9fef31d2bf94b8fb9ef477e/dask/dataframe/methods.py#L354