coiled / feedback

A place to provide Coiled feedback

Memory limit not properly set / VM memory not fully available to worker processes #185

Closed: fjetter closed this issue 2 years ago

fjetter commented 2 years ago

Starting a default Coiled cluster launches VMs on t3.medium EC2 instances, which come equipped with 4.0 GiB of memory.

Inspecting Worker.memory_limit, I can see that it is set to about 3.78GiB, i.e. roughly 95% of the available memory, which is a reasonable setting.

Executing tasks that take a lot of memory on the platform exhibits very weird behaviour. For this example, I am submitting a simple task called allocate_some_memory which creates a large byte string of ~3GiB. Afterwards I measure the RSS memory of the process, sleep for a couple of seconds and return the RSS value. The script below prints the RSS utilization relative to the memory limit (i.e. the ~3.8GiB, not the full 4GiB) for a couple of values and measures its runtime.

import math
import time

from coiled import Cluster
from distributed import Client

cluster = Cluster()
client = Client(cluster)

def allocate_some_memory(num_gib, collect_rss=True):
    # Allocate num_gib GiB of memory in the local scope, optionally measure the
    # RSS of the worker process, hold the memory for a few seconds and release
    # it again on return.
    num_bytes = int(math.floor(1024**3 * num_gib))
    mem = b" " * num_bytes  # held only to occupy memory
    # Imported inside the function so the names resolve on the worker process.
    import time
    import psutil
    if collect_rss:
        rss = psutil.Process().memory_info().rss
    else:
        rss = -1
    time.sleep(5)
    return rss

client.restart()
# Ask each worker for its configured memory limit and use the first one.
mem_limits = client.run(lambda dask_worker: dask_worker.memory_limit)
limit = next(iter(mem_limits.values()))

for mult in [3.0, 3.01, 3.02, 3.03, 3.04, 3.0, 3.05]:
    start = time.time()
    measured_rss = client.submit(allocate_some_memory, mult, True).result()
    end = time.time()
    print(measured_rss)
    # Note: the ratio below is printed as a fraction, so "0.82%" in the output
    # actually means 82% of the worker memory limit.
    print(f"Utilizing {measured_rss / limit:.2f}% of RSS with {mult}GiB and took {end - start:.2f}s to run")

Expected behavior

I would expect all the futures to finish in <10s without an error. No swapping, no oom killer.

What actually happens

This is the output I receive when using a default Coiled cluster and executing the above script. We can see that the first couple of attempts return within 7-8s. Accounting for a network roundtrip (I'm in the EU using a us-east cloud region), the memory allocation, the psutil call, etc., that's perfectly fine. We're utilizing about 82% of memory all things considered.

We iterate in 10MiB steps, and once we hit 3.03GiB the runtime increases significantly. It has been theorized on other issues that Linux kernel swap may be enabled on our VMs, which would explain a slowdown when getting close to the memory limit. However, at 82% this is way sooner than anticipated; ideally there should be no OS swapping at all, and it is not confirmed whether this is actually due to swapping. Stepping the threshold up by another 10MiB makes things even worse, and that call already takes about 5min. This time is apparently spent entirely in the psutil call, since the artifact vanishes when not collecting the RSS memory, if that means anything to anybody.

What's even more interesting is that once I go to 3.05GiB, the entire thing crashes with a KilledWorker exception. We're nowhere close to the actual VM memory threshold, nor are we close to the configured worker memory limit.

Looking into the logs of the worker, there is a kernel-level log entry, python invoked oom-killer, with some memory stats.

3319492608
Utilizing 0.82% of RSS with 3.0GiB and took 7.56s to run
3318157312
Utilizing 0.82% of RSS with 3.01GiB and took 7.78s to run
3323457536
Utilizing 0.82% of RSS with 3.02GiB and took 7.83s to run
3330027520
Utilizing 0.82% of RSS with 3.03GiB and took 127.52s to run
3342434304
Utilizing 0.82% of RSS with 3.04GiB and took 497.69s to run

# Again a "low memory" call with 3.0 to make sure this is not a weird buildup.
# This returns in about the same time as the first one

3304460288
Utilizing 0.82% of RSS with 3.0GiB and took 7.40s to run

# ... afterwards :boom:

Example cluster address https://cloud.coiled.io/dask-engineering/clusters/48755/details

Kudos to @hendrikmakait for finding this one.

cc @shughes-uk @ntabris @dchudz @gjoseph92

shughes-uk commented 2 years ago

System services occupy a bunch of memory; I'm not sure if it's that much, but I could see it being a gigabyte.

fjetter commented 2 years ago

System services occupy a bunch of memory; I'm not sure if it's that much, but I could see it being a gigabyte.

A gigabyte seems to be a lot but I'm not an expert here.

Either way, the dask worker memory limit needs to be set to a realistic value that describes the amount of memory we can actually utilize. If it is significantly off, the memory management systems within dask will not work properly, which can cause catastrophic failures or deadlocks.

fjetter commented 2 years ago

If whatever system services are running are indeed already consuming 20% of our default VM's memory, we should probably consider changing the default and/or look into what is actually eating up that much. I would expect such VMs to run a relatively slim system.

fjetter commented 2 years ago

See also https://github.com/coiled/feedback/issues/186 for some additional logs covering the period where everything appears to be slow.

ntabris commented 2 years ago

I have slightly less overhead (gdm is no longer running), but here's what I'm seeing when the allocate code hits 3.2GiB. At this point htop mostly stops updating or responding, and it has been like this for a couple of minutes.

(htop screenshot omitted)

Update:

After maybe six minutes or so, it finished the allocate, dask client returned, memory dropped, VM is responsive again.

fjetter commented 2 years ago

After maybe six minutes or so, it finished the allocate, dask client returned, memory dropped, VM is responsive again.

FWIW I opened another issue for the responsiveness, https://github.com/coiled/feedback/issues/186, with some additional logs. The issues are obviously strongly entangled, but I figured there may be different root causes.

fjetter commented 2 years ago

FWIW, with "memory limit" in the title I am referring to the memory_limit argument of the Worker class in dask/distributed.

ntabris commented 2 years ago

I chatted with @gjoseph92 and he pushed me toward using cgroup to limit container memory. This would be more robust than relying on dask/distributed memory_limit. Any objections?

In particular, I'm grabbing the available memory from /proc/meminfo before we start Docker and setting that as the mem_limit for the container. Dask will then use this value as the memory_limit for the worker, e.g.:

In [62]: client.run(lambda dask_worker: dask_worker.memory_limit)
Out[62]: {'tls://10.6.56.128:36755': 3534893056}
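
For reference, a minimal sketch of the kind of startup logic described above, assuming a plain docker run invocation and a hypothetical image name (the actual Coiled startup code is internal and may differ):

import subprocess

def read_available_memory_bytes() -> int:
    # /proc/meminfo reports values in kiB, e.g. "MemAvailable:    3451234 kB"
    with open("/proc/meminfo") as f:
        for line in f:
            if line.startswith("MemAvailable:"):
                return int(line.split()[1]) * 1024
    raise RuntimeError("MemAvailable not found in /proc/meminfo")

# Use the memory that is actually available before Docker starts as the
# container's hard memory (cgroup) limit.
mem_limit = read_available_memory_bytes()
subprocess.run(
    ["docker", "run", f"--memory={mem_limit}b", "dask-worker-image"],  # image name is hypothetical
    check=True,
)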

Running the example allocate code gives this result:

3342041088
Utilizing 94.54% of RSS with 3.0GiB and took 7.22s to run
3354202112
Utilizing 94.89% of RSS with 3.01GiB and took 7.21s to run
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
Input In [64], in <cell line: 19>()
     19 for mult in [3.0, 3.01, 3.02, 3.03, 3.04, 3.0, 3.05]:
     20     start = time.time()
---> 21     measured_rss = client.submit(allocate_some_memory, mult, True).result()
     22     end = time.time()
     23     print(measured_rss)

File ~/mambaforge/envs/coiled-runtime-004/lib/python3.9/site-packages/distributed/client.py:280, in Future.result(self, timeout)
    278 if self.status == "error":
    279     typ, exc, tb = result
--> 280     raise exc.with_traceback(tb)
    281 elif self.status == "cancelled":
    282     raise result

KilledWorker: ('allocate_some_memory-3bc7c88da288fb5fa9c47e69ddfcd1a0', <WorkerState 'tls://10.6.56.128:46469', name: bob-2ba043d8-d-worker-d40e7c5c11, status: closed, memory: 0, processing: 1>)

The worker gets killed and restarted, as you can see in the worker logs:

2022-08-15 22:51:30,321 - distributed.core - INFO - Starting established connection
2022-08-15 22:51:32,260 - distributed.worker_memory - WARNING - Worker is at 95% memory usage. Pausing worker.  Process memory: 3.13 GiB -- Worker memory limit: 3.29 GiB
2022-08-15 22:51:32,261 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.13 GiB -- Worker memory limit: 3.29 GiB
2022-08-15 22:51:32,264 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 3.13 GiB -- Worker memory limit: 3.29 GiB
2022-08-15 22:51:32,305 - distributed.worker_memory - WARNING - Worker tls://10.6.56.128:46289 (pid=245) exceeded 95% memory budget. Restarting...
2022-08-15 22:51:32,435 - distributed.nanny - INFO - Worker process 245 was killed by signal 15
2022-08-15 22:51:32,441 - distributed.nanny - WARNING - Restarting worker

Presumably dask won't always catch this and sometimes the container will die. We don't currently restart the container, so the worker instance will then die.

We don't always want containers to restart when they exit with a non-zero code, since sometimes that means we failed to install or start distributed. But it is probably fine to restart a worker container, at least a couple of times. If we have an install/start failure, it will also hit the scheduler, and when the scheduler dies we stop the rest of the cluster.
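
A hedged sketch of one way to get that behaviour with Docker's built-in restart policy; the retry count and image name are only illustrative, not necessarily what the platform uses:

import subprocess

# Bounded container restarts: an OOM-killed worker container comes back, while
# a persistent install/start failure stops being retried after two attempts.
subprocess.run(
    ["docker", "run", "--restart=on-failure:2", "dask-worker-image"],  # image name is hypothetical
    check=True,
)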

shughes-uk commented 2 years ago

Very nice! Thanks @ntabris

ntabris commented 2 years ago

Here's the internal MR for cgroup change: https://gitlab.com/coiled/cloud/-/merge_requests/4926

There may also be work that could/should be done to tweak things so there's less memory overhead from the non-dask stuff running on the system, but that's less urgent / more incremental (and there will also be proportionately less overhead on the larger instances we're now using as the default).

fjetter commented 2 years ago

he pushed me toward using cgroup to limit container memory. This would be more robust than relying on dask/distributed memory_limit. Any objections?

That would be what is discussed in https://github.com/coiled/product/issues/5. I'm OK with this if the Worker.memory_limit is also properly set while doing so. Just to reiterate, this measure is not merely there to terminate the worker process but also to control a couple of internal systems that would not work properly if that number is off (spill to disk, pausing, scaling, dashboard, ...), i.e. there are a couple of issues interacting at the same time. Setting the cgroup limit and reading it out in dask likely fixes both problems at once.

Judging by the logs in https://github.com/coiled/feedback/issues/185#issuecomment-1215959972 this works as expected.

Is there a way to test this? Is this already deployed/live on staging?

ntabris commented 2 years ago

the Worker.memory_limit is also properly set while doing so

If Worker.memory_limit isn't explicitly set, it will now be set (by the distributed code) to the cgroup memory limit rather than the total system memory. Good?
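
For reference, here's a minimal sketch (not the actual distributed implementation) of how a process can read its own cgroup memory limit; distributed does a check along these lines when deriving the default memory_limit:

from typing import Optional

def cgroup_memory_limit() -> Optional[int]:
    # Return the cgroup memory limit visible to this process, or None if unset.
    candidates = [
        "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        "/sys/fs/cgroup/memory.max",                    # cgroup v2
    ]
    for path in candidates:
        try:
            with open(path) as f:
                raw = f.read().strip()
        except OSError:
            continue
        if raw == "max":  # cgroup v2 reports "max" when no limit is set
            return None
        limit = int(raw)
        if limit >= 2**62:  # cgroup v1 uses a huge sentinel for "no limit"
            return None
        return limit
    return None

print(cgroup_memory_limit())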

Is there a way to test this? Is this already deployed/live on staging?

Not yet; I'm currently building/pushing a new AMI to staging, and once that's done it should be testable. I'll comment when it's ready to test in staging.

fjetter commented 2 years ago

to the cgroup memory limit rather than system memory. Good?

Probably, yes. I'd love to take this for a spin once it is deployed somewhere. No rush

ntabris commented 2 years ago

I'd love to take this for a spin once it is deployed somewhere. No rush

Deployed and just tested on staging; let me know whether this seems good / you have any concerns.

fjetter commented 2 years ago

Thanks this is already a much better experience!

What exactly did we configure for memory_limit now? Is this connected to any dask configuration? I noticed that the worker process is killed whenever it reaches ~95%. Is this connected to the worker.memory.terminate threshold configured in distributed.yaml?

ntabris commented 2 years ago

Currently the Coiled platform isn't doing anything with memory_limit. Given the relevant distributed code I think this means it will now use the cgroup value for memory_limit rather than the total memory.

The user should be able to set a lower value if desired, and that should work, though I haven't tested this.

whenever it reaches ~95%. Is this connected to the worker.memory.terminate threshold configured in distributed.yaml?

That's what I'm assuming but haven't specifically tested this.
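
For what it's worth, the thresholds driving this behaviour live under distributed.worker.memory in the dask config (distributed.yaml). Here is a quick way to inspect them from the client environment; the exact defaults may vary by distributed version:

import dask
import distributed  # importing distributed loads its config defaults

# Fractions of Worker.memory_limit at which the worker spills, pauses, or is
# terminated by the nanny.
for key in ("target", "spill", "pause", "terminate"):
    print(key, dask.config.get(f"distributed.worker.memory.{key}"))
# Typically prints something like: target 0.6, spill 0.7, pause 0.8, terminate 0.95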

hendrikmakait commented 2 years ago

This looks pretty good; the tensordot_stress test in coiled/coiled-runtime#229 now produces these logs:

...
Aug 17 11:15:27 ip-10-0-10-241 cloud-init[1124]: 2022-08-17 11:15:27,800 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.83s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Aug 17 11:15:37 ip-10-0-10-241 cloud-init[1124]: 2022-08-17 11:15:37,158 - distributed.worker_memory - WARNING - Worker is at 90% memory usage. Pausing worker.  Process memory: 2.92 GiB -- Worker memory limit: 3.22 GiB
Aug 17 11:15:38 ip-10-0-10-241 cloud-init[1124]: 2022-08-17 11:15:38,664 - distributed.worker_memory - WARNING - Worker tls://10.0.10.241:40379 (pid=26) exceeded 95% memory budget. Restarting...
Aug 17 11:15:38 ip-10-0-10-241 cloud-init[1124]: 2022-08-17 11:15:38,780 - distributed.nanny - INFO - Worker process 26 was killed by signal 15

It looks like we're finally triggering the desired Dask behavior. Running out of memory in this test is likely due to dask/distributed#6208, so it is nothing that the changes discussed here could resolve.

fjetter commented 2 years ago

I've been told this is now on prod. Closing. Thanks @ntabris and @shughes-uk!