dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Managed memory may be double counted #5868

Open crusaderky opened 2 years ago

crusaderky commented 2 years ago

If a task returns the exact same object or a shallow copy, its managed memory is double counted by the scheduler.

@gen_cluster(client=True, nthreads=[("", 1)])
async def test1(c, s, a):
    x = c.submit(lambda: "x" * 1000, key="x")
    y = c.submit(lambda x: x, x, key="y")
    await wait(y)
    assert a.data["x"] is a.data["y"]
    await asyncio.sleep(2)  # Wait for heartbeat
    assert memory.managed < 2000

AssertionError: assert 2098 < 2000

Impact

distributed.scheduler.MemoryState caps the managed memory to the process memory. So, this issue causes the managed memory to be over-reported and the unmanaged memory to be under-reported, but the total of the two is not affected.

Proposed design

Avoiding double-counting of identical objects is definitely feasible, as long as it is done Worker side.

Detecting shallow copies that partially share memory, in general, is near impossible; however it's possible to implement a special case handling for numpy arrays sharing the same buffer. Unsure if it's worth it though; dask.array slicing always makes sure to deep-copy everything to avoid holding a very large base buffer instead of a tiny slice.

fjetter commented 2 years ago

I believe this is a duplicate to https://github.com/dask/distributed/issues/4186 The other issues focuses primarily on views, not identical objects (which I suspect is a rarer case in the wild)

crusaderky commented 2 years ago

identical objects (which I suspect is a rarer case in the wild)

I am not so sure about this. For dask.array and dask.dataframe you are probably right. For dask.delayed and dask.bag, which heavily rely on user-defined map-reduce functions, I suspect this is a much bigger issue. I do agree that it's a rabbit hole without an exit - e.g. slicing a delayed list is affected by this, and there's no simple solution to it. I still wonder however about the cost/benefit of de-duplicating identical values.

fjetter commented 2 years ago

I still wonder however about the cost/benefit of de-duplicating identical values.

I'm a bit worried about hidden complexity involved. For instance, how to deduplicate mutable objects like lists or dicts? (Obviously using mutable objects as future in-/outputs is not a great idea to start with). Would we require things to be hashable? If so, does this impact performance (does a pandas dataframe implement hash?? 🤔 )?

A hash based deduplication algo would also be interesting for spilling. However, I'm not convinced that this is a very frequent problem to justify this complexity.

crusaderky commented 2 years ago

No, I was simply thinking about id based deduplication.

We could also enhance the SpillBuffer LRU algorithm so that multiple keys with the same value are spilled/unspilled together.

Finally, we could also implement an id-based fingerprint that survives network transfers:

  1. worker B asks to worker A for key x
  2. worker A realises that self.data["x"] is self.data["y"] and sends them both
  3. worker B receives x and y and maps them to the same value

This is a lot more complicated than the previous two, because it necessarily involves informing the scheduler of the alias, so that the AMM can avoid dropping what would otherwise be redundant replicas.

crusaderky commented 2 years ago

No, I was simply thinking about id based deduplication. We could also enhance the SpillBuffer LRU algorithm so that multiple keys with the same value are spilled/unspilled together.

The biggest benefit of these two is that they can be encapsulated in the SpillBuffer or even in zict and they would be completely opaque to the worker and the scheduler.

fjetter commented 2 years ago

No, I was simply thinking about id based deduplication.

maybe paranoid on my end but I don't trust ID based deduplication enough to bake it in.

The biggest benefit of these two is that they can be encapsulated in the SpillBuffer or even in zict and they would be completely opaque to the worker and the scheduler.

I absolutely agree on this. I don't think we should do any cluster-wide deduplication. That sounds awfully complex and prone to errors considering the (likely) little benefit it would give. Iff we go for deduplication on worker side, I agree, it should be implemented in the buffer

crusaderky commented 2 years ago

maybe paranoid on my end but I don't trust ID based deduplication enough to bake it in.

Could you articulate? What could go wrong, other than not being able to deduplicate deep copies and copies that appear after a network round-trip?

fjetter commented 2 years ago

maybe paranoid on my end but I don't trust ID based deduplication enough to bake it in.

Could you articulate? What could go wrong, other than not being able to deduplicate deep copies and copies that appear after a network round-trip?

Never mind. All scenarios I can come up with involving mutable data structures would mess everything up even without deduplication

crusaderky commented 1 year ago

I think this issue needs to be resurrected in light of Pandas 2's option for copy-on-write.

A potential design is to write a variant of sizeof that returns a dict of {id: nbytes} with one element per buffer of the object.

Just like sizeof(), it would be a singledispatch. It would default to {id(obj): sizeof(obj)}. We would need to implement for it support for:

distributed.spill.SpillBuffer would then invoke this new function instead of sizeof and do some straightforward reference counting for the returned id's.

CC @phofl

fjetter commented 1 year ago

I believe I'm still struggling to fully understand the impact. From a pure instrumentation / observability / visualization POV I wouldn't want to increase complexity a lot. If managed memory is larger than RSS, that's OK for me from an instrumentation POV.

crusaderky commented 1 year ago

Without pandas CoW, this issue is quite unimportant, as it would only impact poorly implemented custom functions applied with map_blocks etc, dask.delayed, and handmade graphs. With pandas CoW, it would impact all dask.dataframe workflows.

From an observability perspective, the addition of the managed/unmanaged split in the GUI was a big improvement and I received strongly positive feedback from several users. If we don't correct it, it will be actually worse than before we had it as wrong information will be relayed to the users, causing a great deal of confusion to anybody who's trying to debug high memory usage.

From a decision-making perspective, this is important for AMM ReduceReplicas. If you have short memory spikes caused by task heap, serialization/deserialization, etc., they are normally classified as "unmanaged recent" memory and are ignored by AMM. With pandas CoW, they would be immediately taken into consideration, potentially causing unbalances in memory as the AMM could incorrectly drop replicas from the less saturated workers.

fjetter commented 1 year ago

If we don't correct it, it will be actually worse than before we had it as wrong information will be relayed to the users, causing a great deal of confusion to anybody who's trying to debug high memory usage.

Yes, the definition of unmanaged will be less useful but I'm not convinced that this is worth the additional complexity.

With pandas CoW, they would be immediately taken into consideration, potentially causing unbalances in memory as the AMM could incorrectly drop replicas from the less saturated workers.

Why would this be immediate?

crusaderky commented 1 year ago

With pandas CoW, they would be immediately taken into consideration, potentially causing unbalances in memory as the AMM could incorrectly drop replicas from the less saturated workers.

Why would this be immediate?

Because AMM uses optimistic memory, which is defined as managed memory (latest spot sample) + unmanaged (minimum over the latest 30s worth of samples) - e.g. you optimistically hope anything above it is all heap and will deflate itself as soon as the current tasks are finished and the gc has run.