dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Unmanaged Memory Leak with Large Parquet Files (Dask + Distributed) #8377

Open seydar opened 7 months ago

seydar commented 7 months ago

Describe the issue:

Not sure if this requires a separate bug report from #8375, but just in case: I can reproduce the same error with plain dask + distributed when the parquet data doesn't fit in memory.

When I try to read 30 parquet files (~150 MB each) onto 2 worker nodes (m5.large: 8 GB RAM, 500 GB disk each), a runaway unmanaged-memory process consumes all of the memory and causes the dask worker to be restarted. Eventually, after crashing on each of the 4 workers (2 nodes, 2 vCPUs each), dask gives up.

The process that is consuming all of the memory: /usr/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=11, pipe_handle=17) --multiprocessing-fork

I now know that this is the signature of a process forked via multiprocessing in Python, but I'm unable to figure out where in the code it is spawned.
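
A rough way to inspect this (a sketch, not part of the reproducer below; it assumes psutil is installed on the worker node) is to run something like the following on a node while the memory is climbing:

import psutil

# List dask- and multiprocessing-related processes with their resident memory
# and parent pid, to see which process is growing and what spawned it.
for proc in psutil.process_iter(["pid", "memory_info", "cmdline"]):
    cmd = " ".join(proc.info["cmdline"] or [])
    if "dask" in cmd or "multiprocessing" in cmd:
        mem = proc.info["memory_info"]
        rss_mb = mem.rss / 1e6 if mem else 0.0
        print(f"pid={proc.pid} rss={rss_mb:.0f}MB ppid={proc.ppid()} cmd={cmd[:100]}")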

Minimal Complete Verifiable Example:

import sys
import dask
import dask.distributed
from   dask.distributed import Client
import dask.dataframe as dd

print(f"connecting to {sys.argv[1]}")
client = Client(sys.argv[1])

manifest_files = ['s3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_7011b49a-bf6c-42d0-9852-85aa10a3ad37',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_8ca98d70-74c2-4d17-b059-83cd25e398c0',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_61265893-e992-4026-8b69-e60a4641dd10',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_d4318a64-7fc0-4b59-8f85-8b1790e72a70',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_4e2e19a8-b360-49fe-9b82-f66a1e23ad3e',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_c5a77760-f078-432c-aaf5-6d99fb0cee0c',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_db5f90f3-9fc0-4b86-98da-d840bb9b5423',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_2472c560-1113-448b-ae18-e23032d3f3d8',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_551944ef-47f4-475d-a575-e209ca1ee7b4',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_67569166-18f9-40bd-9845-20f069f8dc8a',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_8a81dca0-767d-43bf-b0aa-a087f98d41c5',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_c9dd6a02-0f92-4202-ba70-77fda02d4acf',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_9f094026-03f4-4ec2-a348-0498eeb6b04d',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_2c20c942-1e2c-4f37-86ec-4adce83edbea',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_237d6c26-8372-4584-ae15-9f693d2295a6',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_cf41cbf5-eb46-4bb9-b904-f1518affbefa',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_420f8434-5a71-4af8-91b5-d5364c941ded',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_f155616d-39bc-483e-b19a-56da9fbae685',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_ef8a7770-43b7-44bb-9dde-f92ed8faae9b',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_8aa7de20-91ee-4ada-8f48-c7a5b313572f',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_7c265580-ada5-4b94-b818-5cebdb4bb6c6',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_13562caf-4578-4e36-83fe-8b7a5eabc7e8',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_b0f80cd3-4aa5-40ac-a0c8-632c763f8969',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_2f1e190b-621c-4638-91f1-e961ba674714',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_5dcd43e4-67bd-41ba-b3cd-7e8f8da9b1f9',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_293c6916-4f22-4ca0-a240-078ebe48368b',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_80e8c47e-f254-47c4-80af-d744885e8174',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_6b1ba112-0545-4c3f-9321-dbf597d98bc1',
                  's3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_0b3dd1a8-72ef-4852-bf1d-f7091629d3b6']

try:
    df    = dd.read_parquet(manifest_files)

    divisions   = list(range(0, 10001))
    df          = df.set_index('agent', divisions=divisions)

    dp = df.persist()

    print(dp.count().compute())

finally:
    client.close()

Anything else we need to know?:

I'm running this on AWS. The dask cluster is hand-built (via boto3): my provisioning script runs dask worker and dask scheduler across the instances. The cluster setup seems to be correct because all other operations work, and the cluster handles a 52 GB CSV file just fine; it only fails with parquet.

Environment:

Dask version: 2023.11.0
Python version: 3.10.12
Operating System: Ubuntu 22.04.3 LTS
Install method (conda, pip, source): pip

Logs: worker1.log, worker2.log, scheduler.log

fjetter commented 7 months ago

If your parquet files are 150 MB on disk, chances are they are easily 1 GB in memory, if not more, and there are two threads per worker loading this data concurrently. During deserialization, it is very possible that peak memory usage is even higher than that.
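
This is easy to check for one file directly. A minimal sketch (assuming s3fs and pandas are installed, and using one of the paths from the reproducer above):

import pandas as pd
import s3fs

# One of the files from the reproducer above; any single file works.
path = "s3://my-bucket/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc"

fs = s3fs.S3FileSystem()
on_disk = fs.size(path)                       # compressed bytes on S3

df = pd.read_parquet(path)                    # decompressed and decoded into pandas
in_memory = df.memory_usage(deep=True).sum()  # bytes once loaded

print(f"on disk:   {on_disk / 1e6:.0f} MB")
print(f"in memory: {in_memory / 1e6:.0f} MB ({in_memory / on_disk:.1f}x)")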

Eventually, after crashing on each of the 4 workers (2 nodes, 2 vCPUs each), dask gives up.

If you start dask worker twice on the nodes, you'll have two workers with two CPUs each, i.e. four threads loading data. You should only call this command once per node.

Instead of doing this yourself, I recommend using a ready-made solution. See https://docs.dask.org/en/stable/deploying.html#cloud for some suggestions for cloud deployments. There are open-source solutions like Dask Cloud Provider and hosted solutions like Coiled (which has a generous free tier; disclaimer: I work for Coiled).

After the data loading comes a shuffle that will also require some memory.

Given all this, it is not unexpected that your workers are running out of memory. I strongly recommend using larger and/or more workers to process your data, and launching only one worker per node. There is always some overhead involved (e.g. the Python process alone often requires a couple hundred MB), and cutting it too close is usually neither a good idea nor cost efficient.

seydar commented 7 months ago

Cluster Creation

In another situation, I would love to work with Coiled. In particular, I appreciate the blog posts you put out and that you contribute back to dask, which makes me very much want to use it as a paying user; unfortunately, that's not an option right now. I should nonetheless try this out with Coiled's free tier, if only to verify that the bug still exists.

EC2Cluster was giving me a lot of trouble when I tried to specify the profile, despite the example on the page, so I gave up and went with my own custom solution since I already had a library written for it. I'll give it another go to try to reproduce this bug.

Workers per Instance

Sorry for the confusion: I'm only calling dask worker once per EC2 instance, but since each instance has 2 vCPUs, that results in 2 workers per node, i.e. a total of 4 workers loading data across the cluster of 2 instances.
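
To double-check this, a small snippet (assuming the same client object as in the reproducer) that prints what the scheduler actually sees per worker:

# Hypothetical check using the client from the reproducer: list every worker
# the scheduler knows about, with its thread count and memory limit.
info = client.scheduler_info()
for addr, worker in info["workers"].items():
    print(addr, "nthreads:", worker["nthreads"], "memory_limit:", worker["memory_limit"])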

pyarrow vs csv

The other thing that makes me think it's a pyarrow issue is that this cluster setup works perfectly with CSV data.

Questions

  1. What's the best way to calculate how much memory is required for a node in the cluster? Shouldn't it spill over onto disk? From watching the workers page on the dask dashboard, it looks like it's moving ~300 MB chunks, which makes me think it shouldn't be a problem. (See the sketch after this list.)
  2. What kind of proof would I need to provide to show that dask is handling parquet files differently from CSV?
  3. The leak appears to be coming from the spawned process mentioned above. How can I figure out what that spawned process is doing? It grows until it consumes all of the memory, then restarts and does it again.
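
For question 1, the closest I can get to a measurement is a sketch like the one below. It assumes df is the dask dataframe from the reproducer and a dask version recent enough to provide memory_usage_per_partition; the config keys are the standard worker-memory thresholds.

import dask
import dask.distributed  # ensures the distributed config keys are registered

# Estimated in-memory bytes per partition, computed on the workers.
sizes = df.memory_usage_per_partition(deep=True).compute()
print(sizes.describe())

# Fractions of the worker memory limit at which dask starts spilling to disk,
# pauses the worker, and restarts it.
for key in ("target", "spill", "pause", "terminate"):
    print(key, dask.config.get(f"distributed.worker.memory.{key}"))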

My Actions

  1. Provide an MRE using the EC2Cluster class

Related Links

Here are some links where people are whispering about similar issues — none of these are proof, but they at least show me that I'm not crazy.

seydar commented 7 months ago

Reproducing the Error with EC2Cluster

I am able to reproduce the error with EC2Cluster as well. Source code and logs are attached. I'm not as familiar with using EC2Cluster, so I'm not sure how to get better, more detailed logs.
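
One thing that might help with the logs, separate from the script below: recent distributed versions can return log lines programmatically through the cluster and client objects (a hedged suggestion; the amount of detail still depends on the workers' logging configuration):

# Run after the cluster and client in the script below have been created.
print(cluster.get_logs())        # logs gathered by the cluster manager (scheduler and workers)
print(client.get_worker_logs())  # recent log lines per worker, keyed by worker address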

import sys
import dask
from   dask_cloudprovider.aws import EC2Cluster
import dask.dataframe as dd
import dask.distributed
from   dask.distributed import Client
import configparser
import os
import contextlib

def get_aws_credentials():
    parser = configparser.RawConfigParser()
    parser.read(os.path.expanduser('~/.aws/credentials'))
    credentials = parser.items('default')
    all_credentials = {key.upper(): value for key, value in credentials}
    with contextlib.suppress(KeyError):
        all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
    return all_credentials

env = get_aws_credentials()
env['EXTRA_PIP_PACKAGES'] = 's3fs'

cluster = EC2Cluster(env_vars  = env,
                     n_workers = 2,
                     instance_type = 'm5.large',
                     subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
                     security_groups = ["sg-0f9e555954e863954",   # ssh
                                        "sg-0b34a3f7398076545",   # default
                                        "sg-04cd2626d91ac093c"],  # dask (8786, 8787)
                     ami = "ami-0399a4f70ca684620",
                     key_name  = "my-key",
                     security = False,
                     iam_instance_profile = {'Name': 'Minerva'})

client = Client(cluster)

manifest_files = ['s3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_90fc7ac6-1be8-42a3-8485-f5fda039a23b',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_3fd296d8-4d16-4491-a950-d5e4b0fff172',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_19afa54e-ed8f-4da5-a575-a2f7acd02399',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_0240d219-11d8-4965-9425-ed2cbe9f0984',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_657a9eee-2db8-4b55-8c2b-c98c67eb2992',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_4fb5f559-ac3e-4d0b-bff6-e90aefcecff7',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_a7b3bdfa-aa06-4c7b-b010-f0c59d925051',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_444110c5-885d-4369-9cbd-bb842383baa7',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_921dc55e-8d94-4f91-b71b-ec1b45ff999f',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_dc3fafb8-8d8a-432c-a9f4-386332b7720c',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_9b8c79b9-d8a6-487e-a10b-6d65dae9daff',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_26fe7c8b-15f4-419f-a7c7-c87461f1b69c',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_45c5c82b-befc-4b0c-97fa-673de0feb9dd',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_aca904de-b154-4f56-b255-b71de0be3060',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_9a131e04-2353-44f1-9c13-7c1b381ca553',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_66e9e85d-406b-4164-87f3-3ffbe4ff9162',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_e32f16e7-591e-4da4-a2b1-4b00ac4cb617',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_88eb15ea-9278-4e54-978b-9c211a4b834f',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_71b697fe-3da2-4c5b-a046-e5773b494e7b',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_0454e926-10fb-4af4-82ea-082e6bdb7c5c',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_6ee2d3b4-a837-419f-b181-53a127a791e3',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_abfdb890-64d6-4e05-adf6-c020633fb1ab',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_ee27888e-c3fa-4731-a75b-b2f20efcafc3',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_c0d5978a-a66a-4faf-9d5b-5f5ebd7e7311',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_bdf24af9-bdca-467c-abca-04e215eb190c',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_0dddbf5f-ce5d-4685-8361-cda906bef37c',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_a6800c32-4790-4e77-bfdf-7900ed44097a',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_0d0913a9-9f82-4418-9450-4cbf364ca9fb',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_ab1a2407-b7a4-4d69-9539-a05391945149',
                  's3://my-bucket/results/0.1664769366230633/20231129_180352_00031_3yekz_bcfde315-905f-475c-a3c3-3a3556e53fe4']

try:
    print("read parquet from athena")
    df    = dd.read_parquet(manifest_files)

    print("partitioning")
    divisions   = list(range(0, 10001))
    df          = df.set_index('agent', divisions=divisions)

    print("persisting")
    dp = df.persist()

    print("memory usage")
    print(dp.get_partition(400).memory_usage())

    print("count()")
    print(dp.count().compute())

finally:
    print("closing client")
    client.close()

cluster.close()

Output (STDERR + STDOUT)