rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] OOM issue while loading the 26GB twitter dataset into 128GB GPU memory #11796

Open parkerzf opened 2 years ago

parkerzf commented 2 years ago

Hi, I'm trying to load the Twitter graph on an AWS p3.16xlarge instance, which has eight 16GB GPUs (128GB in total). However, it runs out of memory (OOM). Could you please take a look and tell me if I missed anything? Thanks so much!

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms
import time

csv_file_name = "twitter-2010.csv"

with dask.config.set(jit_unspill=True):
    with LocalCUDACluster(n_workers=8, device_memory_limit="16GB") as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(len(get_visible_devices()))
            Comms.initialize(p2p=True)
            chunksize = dask_cugraph.get_chunksize(csv_file_name)
            ddf = dask_cudf.read_csv(csv_file_name, chunksize=chunksize, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
            ddf.compute()
            # G = cugraph.Graph(directed=True)
            # G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')

I couldn't find any similar issues. This one shows similar errors, but there the cause was that LocalCUDACluster was not used.

I used the Docker approach to install the RAPIDS frameworks:

docker pull rapidsai/rapidsai-dev:22.08-cuda11.5-devel-ubuntu20.04-py3.9
docker run --gpus all --rm -it \
    --shm-size=10g --ulimit memlock=-1 \
    -p 8888:8888 -p 8787:8787 -p 8786:8786 \
    rapidsai/rapidsai-dev:22.08-cuda11.5-devel-ubuntu20.04-py3.9

The error log:

2022-09-27 13:03:21,520 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,520 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,524 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,524 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,544 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,544 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,554 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,554 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,555 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,555 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,556 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,556 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,597 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,597 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-27 13:03:21,673 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-27 13:03:21,673 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
/tmp/ipykernel_5947/1798640855.py in <module>
      9             chunksize = dask_cugraph.get_chunksize(csv_file_name)
     10             ddf = dask_cudf.read_csv(csv_file_name, chunksize=chunksize, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
---> 11             ddf.compute()
     12             # G = cugraph.Graph(directed=True)
     13             # G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    313         dask.base.compute
    314         """
--> 315         (result,) = compute(self, traverse=False, **kwargs)
    316         return result
    317 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597 
    598     results = schedule(dsk, keys, **kwargs)
--> 599     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    600 
    601 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/base.py in <listcomp>(.0)
    597 
    598     results = schedule(dsk, keys, **kwargs)
--> 599     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    600 
    601 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/core.py in finalize(results)
    136 
    137 def finalize(results):
--> 138     return _concat(results)
    139 
    140 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxify_device_objects.py in wrapper(*args, **kwargs)
    167     @functools.wraps(func)
    168     def wrapper(*args, **kwargs):
--> 169         ret = func(*args, **kwargs)
    170         if dask.config.get("jit-unspill-compatibility-mode", default=False):
    171             ret = unproxify_device_objects(ret, skip_explicit_proxies=False)

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/core.py in _concat(args, ignore_index)
    131         args[0]
    132         if not args2
--> 133         else methods.concat(args2, uniform=True, ignore_index=ignore_index)
    134     )
    135 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/dispatch.py in concat(dfs, axis, join, uniform, filter_warning, ignore_index, **kwargs)
     60     else:
     61         func = concat_dispatch.dispatch(type(dfs[0]))
---> 62         return func(
     63             dfs,
     64             axis=axis,

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py in wrapper(*args, **kwargs)
    900         args = [unproxy(d) for d in args]
    901         kwargs = {k: unproxy(v) for k, v in kwargs.items()}
--> 902         return func(*args, **kwargs)
    903 
    904     return wrapper

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/dispatch.py in concat(dfs, axis, join, uniform, filter_warning, ignore_index, **kwargs)
     60     else:
     61         func = concat_dispatch.dispatch(type(dfs[0]))
---> 62         return func(
     63             dfs,
     64             axis=axis,

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner(*args, **kwds)
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py in concat_cudf(dfs, axis, join, uniform, filter_warning, sort, ignore_index, **kwargs)
    273         )
    274 
--> 275     return cudf.concat(dfs, axis=axis, ignore_index=ignore_index)
    276 
    277 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/reshape.py in concat(objs, axis, join, ignore_index, sort)
    397                 # don't filter out empty df's
    398                 objs = old_objs
--> 399             result = cudf.DataFrame._concat(
    400                 objs,
    401                 axis=axis,

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner(*args, **kwds)
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/dataframe.py in _concat(cls, objs, axis, join, ignore_index, sort)
   1674         # Concatenate the Tables
   1675         out = cls._from_data(
-> 1676             *libcudf.concat.concat_tables(
   1677                 tables, ignore_index=ignore_index or are_all_range_index
   1678             )

concat.pyx in cudf._lib.concat.concat_tables()

concat.pyx in cudf._lib.concat.concat_tables()

beckernick commented 2 years ago

When you call compute, you are bringing all of the data to a single GPU and a single Python process, which is likely not what you want to do. These two blogs from Coiled can provide some context on how to handle distributed data in memory.
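
To make the point above concrete, here is a rough sketch rather than a confirmed fix. The two small helpers estimate how much device memory the final concatenation in the traceback would need on one GPU, and `load_edges_distributed` shows one common alternative, `persist()`, which keeps the partitions on the workers. The helper names are hypothetical, and the factor of two (input partitions plus an equally sized output) is an assumed lower bound that ignores all other overhead. The `read_csv` arguments are copied from the original snippet.

```python
INT32_BYTES = 4  # both columns in the original snippet are read as int32


def edge_list_bytes(n_edges, n_columns=2, bytes_per_value=INT32_BYTES):
    """Size of the parsed edge list in device memory, ignoring any overhead."""
    return n_edges * n_columns * bytes_per_value


def concat_peak_bytes(n_edges):
    """Assumed lower bound on peak memory when every partition is
    concatenated on a single GPU: the inputs plus an equally sized output."""
    return 2 * edge_list_bytes(n_edges)


def load_edges_distributed(csv_file_name, chunksize):
    """Read the edge list and leave its partitions spread across the workers.

    Intended to be called while a dask.distributed Client is active.
    """
    # Imported lazily so the helpers above can be used on a machine without a GPU.
    import dask_cudf
    from dask.distributed import wait

    ddf = dask_cudf.read_csv(
        csv_file_name,
        chunksize=chunksize,
        delimiter=' ',
        names=['src', 'dst'],
        dtype=['int32', 'int32'],
    )
    ddf = ddf.persist()  # materialize the partitions on the workers, not on the client
    wait(ddf)            # block until every partition has been loaded
    return ddf
```

As an illustration, assuming roughly 1.47 billion edges (the figure usually quoted for twitter-2010, not stated in this thread), `edge_list_bytes` gives about 11.8 GB and `concat_peak_bytes` about 23.5 GB, which is more than a single 16GB GPU can hold even though the data is far smaller than the 128GB total.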

parkerzf commented 2 years ago

Thanks @beckernick, that makes total sense. I will read the blogs you shared. I have a follow-up question: what I actually want is to load the data into cugraph and run the PageRank algorithm, not call compute. However, that still produces the OOM error. You can find the detailed error message in this issue: https://github.com/rapidsai/cugraph/issues/2694.

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms
import time

csv_file_name = "twitter-2010.csv"

with dask.config.set(jit_unspill=True):
    with LocalCUDACluster(n_workers=8, device_memory_limit="16GB") as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(len(get_visible_devices()))
            Comms.initialize(p2p=True)
            chunksize = dask_cugraph.get_chunksize(csv_file_name)
            ddf = dask_cudf.read_csv(csv_file_name, chunksize=chunksize, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
            G = cugraph.Graph(directed=True)
            G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')

This doesn't seem to be the same issue, because I don't collect all the data onto a single GPU. Do you have any hint as to what the cause could be? Thanks!

Ankitkurani1997 commented 2 years ago

@parkerzf, did you find any workaround for the above issue?

parkerzf commented 2 years ago

Nope. I hope someone else can share their experience of dealing with large datasets.