rapidsai / cugraph

cuGraph - RAPIDS Graph Analytics Library
https://docs.rapids.ai/api/cugraph/stable/
Apache License 2.0

[QST] Handling GPU memory oversubscription #2694

Closed Ankitkurani1997 closed 1 year ago

Ankitkurani1997 commented 2 years ago

I want to read a CSV file that is larger than the memory of the single GPU on my local system. I get an out-of-memory error every time I try to read the file into a cudf DataFrame.

Is there a way to spill the memory to host memory or manage this, or is it not possible to read file greater than single GPU memory in cudf dataframe?

I have only one GPU.

File to load :- 26GB

I tried dask_cudf, but it also internally calls cudf.read_csv, so while performing .compute(), it again gives Out of memory error or illegal address.

@madsbk @beckernick

nvidia-smi :- RTX 3090 GPU

shwina commented 2 years ago

so while performing .compute(), it again gives Out of memory error or illegal address.

Are you calling compute() immediately after the call to read_csv()? Ideally, you should only call .compute() on the end result of your workflow, which is typically much smaller than the initial data.

madsbk commented 2 years ago

Try to use a local CUDA cluster with spilling enabled:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf

# Must run dask cluster within a "__main__" guard
if __name__ == "__main__":

    # Setup local CUDA cluster with spilling
    with LocalCUDACluster(jit_unspill=True) as cluster:
        with Client(cluster) as client:
            # Load from a CSV file
            ddf = dask_cudf.read_csv("myfile.csv")
            # Do your computations
            # ...
            # Get result
            print(ddf.head())

Ankitkurani1997 commented 2 years ago

so while performing .compute(), it again gives Out of memory error or illegal address.

Are you calling compute() immediately after the call to read_csv()? Ideally, you should only call .compute() on the end result of your workflow, which is typically much smaller than the initial data.

Got it

Ankitkurani1997 commented 2 years ago

@madsbk

I get the following error when trying to construct a graph using cugraph:

Traceback (most recent call last):
  File "new_approach.py", line 33, in <module>
    G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/cugraph/structure/graph_classes.py", line 284, in from_dask_cudf_edgelist
    self._Impl._simpleDistributedGraphImpl__from_edgelist(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/cugraph/structure/graph_implementation/simpleDistributedGraph.py", line 143, in __from_edgelist
    source_col, dest_col = symmetrize(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/cugraph/structure/symmetrize.py", line 242, in symmetrize
    output_df = symmetrize_ddf(input_df, source_col_name, dest_col_name,
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/cugraph/structure/symmetrize.py", line 162, in symmetrize_ddf
    num_partitions = len(worker_list)
TypeError: object of type 'NoneType' has no len()

Code snippet :

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=1, device_memory_limit="1GB") as cluster:
            with Client(cluster) as client:
                ddf = dask_cudf.read_csv("twitter-2010.csv", delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.Graph(directed=True)
                G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
                print(ddf.head())

My question is: can I use a dask_cudf dataframe directly with the algorithms provided by cugraph, or do I have to use a partition-based approach and perform the operation on each partition?

quasiben commented 2 years ago

cc @eriknw @VibhuJawa for cugraph usage

Ankitkurani1997 commented 2 years ago

@beckernick @eriknw @VibhuJawa can you please suggest some fix around the above error or point to some different approach if I want to use dask_cudf along with cugraph?

Ankitkurani1997 commented 2 years ago

@madsbk @beckernick @afender I want to read 26GB of data on a single GPU with 24GB of memory available, and I would like to know how memory oversubscription is handled. I am using cudf.read_csv, and I have imported rmm and called rmm.reinitialize(managed_memory=True) to allocate all memory as managed memory (cudaMallocManaged as the underlying allocator), as suggested in the article https://medium.com/rapids-ai/tackling-large-graphs-with-rapids-cugraph-and-unified-virtual-memory-b5b69a065d4

This gives: copy_if failed to synchronize: cudaErrorIllegalAddress: an illegal memory access was encountered

I have also tried rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource()).

Does cudf.read_csv not handle memory oversubscription?

If it does, how do I enable it?

beckernick commented 2 years ago

Dask cuDF will spill from GPU to CPU memory during dataframe operations by default. The threshold for spilling can be configured by using the device_memory_limit parameter as you're doing. Dask likely cannot spill cuGraph operations with the default configuration (or at all).

It sounds like you are trying to use Dask with cuDF and cuGraph and running into issues related to processing large graphs with cuGraph. As a result, I'm going to transfer this issue to the cuGraph repository.

If possible, please provide a minimal, reproducible example. This will make it much easier for folks to understand what you're trying to do and provide guidance.

rlratzel commented 2 years ago

@Ankitkurani1997 , I was able to reproduce your problem using your code snippet. I made a few minor changes to it and was able to get it to work, see below:

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms

csv_file_name = "twitter-2010.csv"

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=1, device_memory_limit="1GB") as cluster:
            with Client(cluster) as client:
                client.wait_for_workers(len(get_visible_devices()))
                Comms.initialize(p2p=True)
                ddf = dask_cudf.read_csv(csv_file_name, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.Graph(directed=True)
                G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
                print(ddf.head())

The extra code needed was the wait_for_workers() call and the Comms.initialize(p2p=True) call. Unfortunately, I don't think the Comms object will get cleaned up upon exit of the context manager (this would be a nice enhancement for us to make), so you'll have to call

Comms.destroy()

separately after exiting the context.
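One way to make that cleanup harder to forget is to wrap the initialize/destroy pair in a small context manager. The following is only a sketch: `comms_session` is a hypothetical helper, not part of cuGraph, and it takes the two callables as arguments so that it runs without a GPU. In a real script one would presumably pass `Comms.initialize` and `Comms.destroy` from the snippet above.

```python
from contextlib import contextmanager


@contextmanager
def comms_session(initialize, destroy, **init_kwargs):
    """Hypothetical helper: call initialize() on entry and destroy() on exit.

    The `finally` clause runs destroy() even if the body raises,
    for example on an out-of-memory error during graph construction.
    """
    initialize(**init_kwargs)
    try:
        yield
    finally:
        destroy()


if __name__ == "__main__":
    # Stand-in callables so the sketch runs without cuGraph installed.
    def fake_initialize(**kwargs):
        print("initialize", kwargs)

    def fake_destroy():
        print("destroy")

    with comms_session(fake_initialize, fake_destroy, p2p=True):
        print("build graph and run algorithms here")
```

With cuGraph available, the intended usage would be `with comms_session(Comms.initialize, Comms.destroy, p2p=True): ...` placed inside the `Client` context.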

My question is can I directly use dask_cudf dataframe to perform operations directly on the algorithms provided by cugraph or...

The Graph you created and populated with from_dask_cudf_edgelist() is all you should need for calling our cugraph.dask algos - you simply pass it as the first arg to those algos in the cugraph.dask package. The results returned will be dask_cudf DataFrames instead of cudf DataFrames, but otherwise our single-GPU and multi-GPU algos are intended to be pretty close in their usage.

rlratzel commented 2 years ago

@Ankitkurani1997 - sorry, I should have clarified this earlier. I was able to reproduce the problem you saw (TypeError: object of type 'NoneType' has no len()) and get the snippet to eventually work all while using a dataset I had, since I didn't have the one used in your snippet (twitter-2010.csv).

Ankitkurani1997 commented 2 years ago

@rlratzel, I took the Graph object we get after graph creation and ran the mg_pagerank algo on it, but it gave me an out-of-memory error. It seems that spilling is not supported with the following code.

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms

csv_file_name = "twitter-2010.csv"

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=1, device_memory_limit="20GB") as cluster:
            with Client(cluster) as client:
                client.wait_for_workers(len(get_visible_devices()))
                Comms.initialize(p2p=True)
                ddf = dask_cudf.read_csv(csv_file_name, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.Graph(directed=True)
                G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
                pr_df = dask_cugraph.pagerank(G, tol=1e-4)
                Comms.destroy()

Do cugraph algos actually take spilling from GPU to CPU into account? Is there any limit on the amount of data cugraph algos can process at any given moment?

It seems a 26GB graph is a lot to process with 24GB of GPU memory.

rlratzel commented 2 years ago

@Ankitkurani1997 - you are correct that spilling is currently only supported for the input graph dataframe, and the graph object that gets created needs to fit entirely on GPU and won't be spilled to host memory. In addition to the graph, many algos also need additional GPU working memory which is not spillable either.

@ChuckHastings should be able to comment further, but in general, Chuck has mentioned this to me in the past:

The peak memory usage during graph construction (edgelist dataframe + internal DCSR + internal renumbering structures + some internal shuffle buffers) will limit the size graph you can create. Then, some algorithms (notably Louvain) will not operate on the largest size graph you can create in the GPU because it requires more working memory than the peak memory usage during graph construction. These algos will require you to use more GPUs to spread the data out and leave more GPU memory available for running the algorithm.

ChuckHastings commented 2 years ago

The cuGraph graph and intermediate results will not spill to host memory or disk. Graph algorithms, by their nature, are random access algorithms, so techniques like spilling in dask are not particularly effective. There is no reasonable way to get good performance from a graph algorithm where the entire graph and any ancillary data you need for the algorithm doesn't fit in the memory of the GPU. My general rule of thumb is to limit your graph size to half of the GPU memory to leave room for intermediate results. Some algorithms might require even more free memory.

You could run with UVM, which will page memory between host and GPU memory. This will dramatically decrease performance (especially with a 26GB graph running on a GPU with 24GB of memory), as the pages of memory will literally be thrashing on your memory bus. But there is a chance it would work.

More generally, you probably need to find a way to shrink your graph, or run on either a larger single GPU (a 64 GB GPU would probably do very well for this problem) or a cluster of GPUs (perhaps three 24 GB GPUs would be a good minimum size).
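The sizing advice above can be turned into a rough back-of-the-envelope calculation. This is only a sketch of the "half of GPU memory" rule of thumb: the function names and the `graph_fraction` parameter are made up for illustration, and it assumes the graph splits evenly across identical GPUs. It does not account for algorithms that need more working memory than that.

```python
import math


def usable_gpu_memory_gb(gpu_memory_gb, graph_fraction=0.5):
    """Memory budget for the graph itself under the rule of thumb.

    graph_fraction=0.5 leaves the other half for intermediate results.
    """
    return gpu_memory_gb * graph_fraction


def min_gpus_needed(graph_size_gb, gpu_memory_gb, graph_fraction=0.5):
    """Smallest number of identical GPUs whose combined budget holds the graph."""
    budget = usable_gpu_memory_gb(gpu_memory_gb, graph_fraction)
    return math.ceil(graph_size_gb / budget)


if __name__ == "__main__":
    # 26 GB graph on 24 GB GPUs: 12 GB budget each.
    print(min_gpus_needed(26, 24))
    # 26 GB graph on a 64 GB GPU: 32 GB budget.
    print(min_gpus_needed(26, 64))
```

For the sizes discussed in this thread, the estimate comes out to three 24 GB GPUs or a single 64 GB GPU, which lines up with the suggestion above.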

parkerzf commented 2 years ago

I faced a similar issue while trying to find the practical hardware requirements for running graph algorithms with cuGraph. I use the AWS g5.12xlarge instance, which has 4 A10G GPUs with 96 GB of GPU memory in total. However, I still can't run pagerank on the 26GB twitter data. According to the log, all 4 GPUs were used to create the LocalCUDACluster, so I don't know what I could improve. Could you take a look? Here is the code:

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms

csv_file_name = "twitter-2010.csv"

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=4, device_memory_limit="20GB") as cluster:
            with Client(cluster) as client:
                client.wait_for_workers(len(get_visible_devices()))
                Comms.initialize(p2p=True)
                chunksize = dask_cugraph.get_chunksize(csv_file_name)
                ddf = dask_cudf.read_csv(csv_file_name, chunksize=chunksize, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.Graph(directed=True)
                G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')

                Comms.destroy()
                client.close()
                cluster.close()

The error message:


2022-09-21 17:25:49,580 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-21 17:25:49,580 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-21 17:25:49,582 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-21 17:25:49,582 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-21 17:25:49,594 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-21 17:25:49,594 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-09-21 17:25:49,631 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-09-21 17:25:49,631 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/dataframe.py:1064: FutureWarning: The default dtype for empty Series will be 'object' instead of 'float64' in a future version. Specify a dtype explicitly to silence this warning.
  return pd.Series(self._dtypes)
2022-09-21 17:26:50,183 - distributed.worker - WARNING - RMM allocation of 2.74 GiB failed, spill-on-demand couldn't find any device memory to spill:
<ProxyManager dev_limit=18.63 GiB host_limit=46.68 GiB disk=0 B(0) host=2.74 GiB(1) dev=0 B(0)>:
  host - <dask_cuda.proxy_object.ProxyObject at 0x7f6f58083b20 of cudf.core.dataframe.DataFrame (serialized='dask')>
traceback:
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 937, in _bootstrap
    self._bootstrap_inner()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2880, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2902, in apply_function_simple
    result = function(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/groupby.py", line 888, in _finalize_gb_agg
    gb.columns = col_array
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 470, in __setattr__
    object.__setattr__(pxy.deserialize(nbytes=self.__sizeof__()), name, val)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 137, in wrapper2
    ret = func(self)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 526, in __sizeof__
    return sizeof(pxy.obj)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/utils.py", line 607, in __call__
    return meth(arg, *args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py", line 421, in sizeof_cudf_dataframe
    + df._index.memory_usage()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 1544, in memory_usage
    if self.levels:
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 561, in levels
    self._compute_levels_and_codes()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 698, in _compute_levels_and_codes
    code, cats = cudf.Series._from_data({None: col}).factorize()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/single_column_frame.py", line 293, in factorize
    return cudf.core.algorithms.factorize(self, na_sentinel=na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/algorithms.py", line 62, in factorize
    labels = values._label_encoding(cats=cats, na_sentinel=na_sentinel).values
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/series.py", line 2233, in _label_encoding
    codes = codes.sort_values("order")["code"].fillna(na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1901, in sort_values
    out = self._gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1506, in _gather
    libcudf.copying.gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxify_host_file.py", line 587, in oom
    traceback.print_stack(file=f)

2022-09-21 17:26:50,192 - distributed.worker - WARNING - Compute Failed
Key:       ('groupby_agg-39a76f8e19e5e32a0280fd5e61efb10a', 0)
Function:  _finalize_gb_agg
args:      (<dask_cuda.proxy_object.ProxyObject at 0x7f6f58083b20 of cudf.core.dataframe.DataFrame (serialized='dask')>, ['src', 'dst'], {}, [], Index([], dtype='object'), True, None, '___', True, {})
kwargs:    {}
Exception: "MemoryError('std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory')"

2022-09-21 17:26:50,198 - distributed.worker - WARNING - RMM allocation of 2.74 GiB failed, spill-on-demand couldn't find any device memory to spill:
<ProxyManager dev_limit=18.63 GiB host_limit=46.68 GiB disk=0 B(0) host=2.74 GiB(1) dev=0 B(0)>:
  host - <dask_cuda.proxy_object.ProxyObject at 0x7f4f0c4228b0 of cudf.core.dataframe.DataFrame (serialized='dask')>
traceback:
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 937, in _bootstrap
    self._bootstrap_inner()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2880, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2902, in apply_function_simple
    result = function(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/groupby.py", line 888, in _finalize_gb_agg
    gb.columns = col_array
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 470, in __setattr__
    object.__setattr__(pxy.deserialize(nbytes=self.__sizeof__()), name, val)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 137, in wrapper2
    ret = func(self)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 526, in __sizeof__
    return sizeof(pxy.obj)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/utils.py", line 607, in __call__
    return meth(arg, *args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py", line 421, in sizeof_cudf_dataframe
    + df._index.memory_usage()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 1544, in memory_usage
    if self.levels:
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 561, in levels
    self._compute_levels_and_codes()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 698, in _compute_levels_and_codes
    code, cats = cudf.Series._from_data({None: col}).factorize()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/single_column_frame.py", line 293, in factorize
    return cudf.core.algorithms.factorize(self, na_sentinel=na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/algorithms.py", line 62, in factorize
    labels = values._label_encoding(cats=cats, na_sentinel=na_sentinel).values
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/series.py", line 2233, in _label_encoding
    codes = codes.sort_values("order")["code"].fillna(na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1901, in sort_values
    out = self._gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1506, in _gather
    libcudf.copying.gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxify_host_file.py", line 587, in oom
    traceback.print_stack(file=f)

2022-09-21 17:26:50,208 - distributed.worker - WARNING - Compute Failed
Key:       ('groupby_agg-39a76f8e19e5e32a0280fd5e61efb10a', 1)
Function:  _finalize_gb_agg
args:      (<dask_cuda.proxy_object.ProxyObject at 0x7f4f0c4228b0 of cudf.core.dataframe.DataFrame (serialized='dask')>, ['src', 'dst'], {}, [], Index([], dtype='object'), True, None, '___', True, {})
kwargs:    {}
Exception: "MemoryError('std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory')"

2022-09-21 17:26:50,232 - distributed.worker - WARNING - RMM allocation of 2.73 GiB failed, spill-on-demand couldn't find any device memory to spill:
<ProxyManager dev_limit=18.63 GiB host_limit=46.68 GiB disk=0 B(0) host=2.73 GiB(1) dev=0 B(0)>:
  host - <dask_cuda.proxy_object.ProxyObject at 0x7f77e8fffd90 of cudf.core.dataframe.DataFrame (serialized='dask')>
traceback:
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 937, in _bootstrap
    self._bootstrap_inner()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2880, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2902, in apply_function_simple
    result = function(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/groupby.py", line 888, in _finalize_gb_agg
    gb.columns = col_array
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 470, in __setattr__
    object.__setattr__(pxy.deserialize(nbytes=self.__sizeof__()), name, val)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 137, in wrapper2
    ret = func(self)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 526, in __sizeof__
    return sizeof(pxy.obj)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/utils.py", line 607, in __call__
    return meth(arg, *args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py", line 421, in sizeof_cudf_dataframe
    + df._index.memory_usage()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 1544, in memory_usage
    if self.levels:
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 561, in levels
    self._compute_levels_and_codes()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 698, in _compute_levels_and_codes
    code, cats = cudf.Series._from_data({None: col}).factorize()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/single_column_frame.py", line 293, in factorize
    return cudf.core.algorithms.factorize(self, na_sentinel=na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/algorithms.py", line 62, in factorize
    labels = values._label_encoding(cats=cats, na_sentinel=na_sentinel).values
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/series.py", line 2233, in _label_encoding
    codes = codes.sort_values("order")["code"].fillna(na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1901, in sort_values
    out = self._gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1506, in _gather
    libcudf.copying.gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxify_host_file.py", line 587, in oom
    traceback.print_stack(file=f)

2022-09-21 17:26:50,238 - distributed.worker - WARNING - RMM allocation of 2.73 GiB failed, spill-on-demand couldn't find any device memory to spill:
<ProxyManager dev_limit=18.63 GiB host_limit=46.68 GiB disk=0 B(0) host=2.73 GiB(1) dev=0 B(0)>:
  host - <dask_cuda.proxy_object.ProxyObject at 0x7fc37c055910 of cudf.core.dataframe.DataFrame (serialized='dask')>
traceback:
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 937, in _bootstrap
    self._bootstrap_inner()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/opt/conda/envs/rapids/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2880, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/worker.py", line 2902, in apply_function_simple
    result = function(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/groupby.py", line 888, in _finalize_gb_agg
    gb.columns = col_array
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 470, in __setattr__
    object.__setattr__(pxy.deserialize(nbytes=self.__sizeof__()), name, val)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 137, in wrapper2
    ret = func(self)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py", line 526, in __sizeof__
    return sizeof(pxy.obj)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/utils.py", line 607, in __call__
    return meth(arg, *args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py", line 421, in sizeof_cudf_dataframe
    + df._index.memory_usage()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 1544, in memory_usage
    if self.levels:
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 561, in levels
    self._compute_levels_and_codes()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py", line 698, in _compute_levels_and_codes
    code, cats = cudf.Series._from_data({None: col}).factorize()
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/single_column_frame.py", line 293, in factorize
    return cudf.core.algorithms.factorize(self, na_sentinel=na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/algorithms.py", line 62, in factorize
    labels = values._label_encoding(cats=cats, na_sentinel=na_sentinel).values
  File "/opt/conda/envs/rapids/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/series.py", line 2233, in _label_encoding
    codes = codes.sort_values("order")["code"].fillna(na_sentinel)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1901, in sort_values
    out = self._gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py", line 1506, in _gather
    libcudf.copying.gather(
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxify_host_file.py", line 587, in oom
    traceback.print_stack(file=f)

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
/tmp/ipykernel_3557/3750493298.py in <module>
     19                 ddf = dask_cudf.read_csv(csv_file_name, chunksize=chunksize, delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
     20                 G = cugraph.Graph(directed=True)
---> 21                 G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
     22 
     23                 Comms.destroy()

/opt/conda/envs/rapids/lib/python3.9/site-packages/cugraph-22.8.0-py3.9-linux-x86_64.egg/cugraph/structure/graph_classes.py in from_dask_cudf_edgelist(self, input_ddf, source, destination, edge_attr, renumber, store_transposed, legacy_renum_only)
    282         elif (self._Impl.edgelist is not None):
    283             raise RuntimeError("Graph already has values")
--> 284         self._Impl._simpleDistributedGraphImpl__from_edgelist(
    285             input_ddf,
    286             source,

/opt/conda/envs/rapids/lib/python3.9/site-packages/cugraph-22.8.0-py3.9-linux-x86_64.egg/cugraph/structure/graph_implementation/simpleDistributedGraph.py in __from_edgelist(self, input_ddf, source, destination, edge_attr, renumber, store_transposed, legacy_renum_only)
    176         # C++ renumbering is enabled by default for algorithms that
    177         # support it (but only called if renumbering is on)
--> 178         self.compute_renumber_edge_list(
    179             transposed=store_transposed,
    180             legacy_renum_only=legacy_renum_only

/opt/conda/envs/rapids/lib/python3.9/site-packages/cugraph-22.8.0-py3.9-linux-x86_64.egg/cugraph/structure/graph_implementation/simpleDistributedGraph.py in compute_renumber_edge_list(self, transposed, legacy_renum_only)
    763 
    764             renumbered_ddf, number_map, aggregate_segment_offsets = \
--> 765                 NumberMap.renumber_and_segment(
    766                     self.input_df,
    767                     self.source_columns,

/opt/conda/envs/rapids/lib/python3.9/site-packages/cugraph-22.8.0-py3.9-linux-x86_64.egg/cugraph/structure/number_map.py in renumber_and_segment(df, src_col_names, dst_col_names, preserve_order, store_transposed, legacy_renum_only)
    576                          renumber_map.renumbered_dst_col_name}
    577             )
--> 578         num_edges = len(df)
    579 
    580         if isinstance(df, dask_cudf.DataFrame):

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/core.py in __len__(self)
   4414             return super().__len__()
   4415         else:
-> 4416             return len(s)
   4417 
   4418     def __contains__(self, key):

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/dataframe/core.py in __len__(self)
    708 
    709     def __len__(self):
--> 710         return self.reduction(
    711             len, np.sum, token="len", meta=int, split_every=False
    712         ).compute()

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    313         dask.base.compute
    314         """
--> 315         (result,) = compute(self, traverse=False, **kwargs)
    316         return result
    317 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596         postcomputes.append(x.__dask_postcompute__())
    597 
--> 598     results = schedule(dsk, keys, **kwargs)
    599     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    600 

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2999                     should_rejoin = False
   3000             try:
-> 3001                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3002             finally:
   3003                 for f in futures.values():

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   2173             else:
   2174                 local_worker = None
-> 2175             return self.sync(
   2176                 self._gather,
   2177                 futures,

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336             return future
    337         else:
--> 338             return sync(
    339                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340             )

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    403     if error:
    404         typ, exc, tb = error
--> 405         raise exc.with_traceback(tb)
    406     else:
    407         return result

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/utils.py in f()
    376                 future = asyncio.wait_for(future, callback_timeout)
    377             future = asyncio.ensure_future(future)
--> 378             result = yield future
    379         except Exception:
    380             error = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   2036                             exc = CancelledError(key)
   2037                         else:
-> 2038                             raise exception.with_traceback(traceback)
   2039                         raise exc
   2040                     if errors == "skip":

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/groupby.py in _finalize_gb_agg()
    886             agg_array.append(aggs_renames.get((name, agg), agg))
    887     if str_cols_out:
--> 888         gb.columns = col_array
    889     else:
    890         gb.columns = pd.MultiIndex.from_arrays([col_array, agg_array])

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py in __setattr__()

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py in wrapper2()

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cuda-22.8.0-py3.9.egg/dask_cuda/proxy_object.py in __sizeof__()

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask/utils.py in __call__()
    605         """
    606         meth = self.dispatch(type(arg))
--> 607         return meth(arg, *args, **kwargs)
    608 
    609     @property

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/dask_cudf/backends.py in sizeof_cudf_dataframe()
    419     return int(
    420         sum(col.memory_usage for col in df._data.columns)
--> 421         + df._index.memory_usage()
    422     )
    423 

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py in memory_usage()
   1542     def memory_usage(self, deep=False):
   1543         usage = sum(col.memory_usage for col in self._data.columns)
-> 1544         if self.levels:
   1545             for level in self.levels:
   1546                 usage += level.memory_usage(deep=deep)

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py in levels()
    559         """
    560         if self._levels is None:
--> 561             self._compute_levels_and_codes()
    562         return self._levels
    563 

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/multiindex.py in _compute_levels_and_codes()
    696         codes = {}
    697         for name, col in self._data.items():
--> 698             code, cats = cudf.Series._from_data({None: col}).factorize()
    699             codes[name] = code.astype(np.int64)
    700             levels.append(cudf.Series(cats, name=None))

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/single_column_frame.py in factorize()
    291         StringIndex(['a' 'c'], dtype='object')
    292         """
--> 293         return cudf.core.algorithms.factorize(self, na_sentinel=na_sentinel)
    294 
    295     @_cudf_nvtx_annotate

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/algorithms.py in factorize()
     60 
     61     name = values.name  # label_encoding mutates self.name
---> 62     labels = values._label_encoding(cats=cats, na_sentinel=na_sentinel).values
     63     values.name = name
     64 

/opt/conda/envs/rapids/lib/python3.9/contextlib.py in inner()
     77         def inner(*args, **kwds):
     78             with self._recreate_cm():
---> 79                 return func(*args, **kwds)
     80         return inner
     81 

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/series.py in _label_encoding()
   2231 
   2232         codes = codes.merge(value, on="value", how="left")
-> 2233         codes = codes.sort_values("order")["code"].fillna(na_sentinel)
   2234 
   2235         codes.name = None

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py in sort_values()
   1899 
   1900         # argsort the `by` column
-> 1901         out = self._gather(
   1902             self._get_columns_by_label(by)._get_sorted_inds(
   1903                 ascending=ascending, na_position=na_position

/opt/conda/envs/rapids/lib/python3.9/site-packages/cudf/core/indexed_frame.py in _gather()
   1504 
   1505         return self._from_columns_like_self(
-> 1506             libcudf.copying.gather(
   1507                 list(self._index._columns + self._columns)
   1508                 if keep_index

copying.pyx in cudf._lib.copying.gather()

MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory
​```
Ankitkurani1997 commented 2 years ago

@parkerzf I am also facing the same issue you mentioned above. Were you able to find a workaround for the problem?

Ankitkurani1997 commented 2 years ago

@afender @ChuckHastings I tried a simple UVM-based approach with the following code snippet:

import rmm  
rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
assert(rmm.is_initialized())

import cugraph
import cudf
import time

cudf.set_allocator("managed")
start_ts = time.time()
cdf = cudf.read_csv("../twitter-2010.csv", delimiter=' ', usecols=[0, 1], names=['src', 'dst'], dtype=['int32', 'int32'])
G = cugraph.DiGraph()
G.from_cudf_edgelist(cdf, source='src', destination='dst', renumber=False)
end_ts = time.time()

print("Total Time on plain UVM  approach:- " + str(end_ts - start_ts))

I am hitting an OOM error and I don't understand why. Does cudf.read_csv bring the complete file into device memory without spilling anything back to CPU or disk? If so, the only remaining option is to use dask cudf.

But dask cudf will use a partition-based approach, right? I only want results for the UVM-based solution. Any comments on how to achieve GPU memory oversubscription this way on a single GPU?

parkerzf commented 2 years ago

Not yet. I created another issue in cudf: https://github.com/rapidsai/cudf/issues/11796

parkerzf commented 2 years ago

To summarise, both the UVM approach and the multi-GPU approach with LocalCUDACluster hit the OOM issue when loading the Twitter data into cuGraph.

@Ankitkurani1997 What is the GPU memory and CPU memory in your test?

Ankitkurani1997 commented 2 years ago

CPU memory: 58 GB
GPU memory: 24 GB

Ankitkurani1997 commented 2 years ago

@ChuckHastings I tried using UVM

Following is the code snippet

import rmm  
rmm.reinitialize(managed_memory=True)
import cudf
import cugraph
import time

cdf = cudf.read_csv("datasets/twitter-2010.csv", delimiter=' ', names=['src', 'dst'])
G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(cdf, source='src', destination='dst', renumber=False)

The process gets stuck while building the edge list for the 1.47 billion rows in the dataset.

I can process 1 billion edges in a few minutes.

Is there a reason why building the edge list for 1.47 billion edges takes so much longer?

I used a debugger to pinpoint the part of the cugraph library where the issue might lie. I suspect it happens when it tries to make the graph symmetric, in the function __from_edgelist of the file miniconda3/envs/rapids-22.06/lib/python3.8/site-packages/cugraph/structure/graph_implementation/simpleGraph.py

Current CPU memory: 377 GB
GPU memory: 22 GB

So there is enough space for the graph to fit in combined device and host memory.

ChuckHastings commented 2 years ago

@rlratzel may have some thoughts on this specific issue. My understanding of our python layer is limited.

A quick glance at the python code suggests:

You could try skipping this step and creating cugraph.MultiGraph(directed=True) and see if that works any better. That should bypass both of these cudf functions and hopefully actually get you into the cugraph C++ code.

Of course, you might still hit another bottleneck.

parkerzf commented 1 year ago

Hey @rlratzel @Ankitkurani1997, any luck making it work?

Ankitkurani1997 commented 1 year ago

Yes @parkerzf, I was able to run the UVM-based approach, thanks to @ChuckHastings.

import rmm
rmm.reinitialize(managed_memory=True)

For PageRank I created the graph using:

G = cugraph.MultiGraph(directed=True)
G.from_cudf_edgelist(df, source='src', destination='dst', renumber=False)

CPU main memory: 128 GB
GPU memory: 22 GB

I believe the memory footprint went up to 75-80 GB while creating the graph. Earlier I was not using a machine with that much main memory; after moving to a machine with more main memory, the UVM-based approach worked.

Ankitkurani1997 commented 1 year ago

@shwina

For the following code snippet,

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms

csv_file_name = "../data/1b_twitter-2010.csv"

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=1, device_memory_limit="2GB") as cluster:
            with Client(cluster) as client:
                client.wait_for_workers(len(get_visible_devices()))
                Comms.initialize(p2p=True)
                ddf = dask_cudf.read_csv(csv_file_name, chunksize="0.5 GB", delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.Graph(directed=True)
                G.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
                Comms.destroy()

I am getting the following warnings and errors:

distributed.worker - WARNING - RMM allocation of 14.90 GiB failed, spill-on-demand couldn't find any device memory to spill

  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/distributed/threadpoolexecutor.py", line 57, in _worker
    task.run()
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/distributed/worker.py", line 2880, in apply_function
    msg = apply_function_simple(function, args, kwargs, time_delay)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/distributed/worker.py", line 2902, in apply_function_simple
    result = function(*args, **kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/contextlib.py", line 75, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/dask_cudf/groupby.py", line 784, in _tree_node_agg
    gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/contextlib.py", line 75, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/cudf/core/groupby/groupby.py", line 418, in agg
    ) = self._groupby.aggregate(columns, normalized_aggs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.8/site-packages/dask_cuda/proxify_host_file.py", line 587, in oom
    traceback.print_stack(file=f)

2022-10-20 12:36:02,061 - distributed.worker - WARNING - Compute Failed
Key:       ('groupby_tree_reduce-cbc7f4751fc5ef7ba900e1bea2b059f9', 0, 0, 2)
Function:  _tree_node_agg
args:      ([<dask_cuda.proxy_object.ProxyObject at 0x7fe0dc14d8b0 of cudf.core.dataframe.DataFrame (serialized='dask')>, <dask_cuda.proxy_object.ProxyObject at 0x7fe0d53b9d60 of cudf.core.dataframe.DataFrame (serialized='dask')>, <dask_cuda.proxy_object.ProxyObject at 0x7fe0f0680be0 of cudf.core.dataframe.DataFrame (serialized='dask')>, <dask_cuda.proxy_object.ProxyObject at 0x7fe0d5357970 of cudf.core.dataframe.DataFrame (serialized='dask')>, <dask_cuda.proxy_object.ProxyObject at 0x7fe0dc24f910 of cudf.core.dataframe.DataFrame (serialized='dask')>], ['src', 'dst'], 1, True, None, '___')
kwargs:    {}
Exception: "MemoryError('std::bad_alloc: out_of_memory: CUDA error at: /home/ankit/miniconda3/envs/rapids-22.08/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory')"

I am trying to see how the partition-based approach behaves while reading the data and forming a graph of 1 billion edges.

GPU memory: 23 GB
CPU memory: 58 GB

CSV file size on disk: 17 GB

parkerzf commented 1 year ago

@Ankitkurani1997 Have you tried your code:

G = cugraph.MultiGraph(directed=True)
G.from_cudf_edgelist(df, source='src', destination='dst', renumber=False)

using LocalCUDACluster?

Also, why do you set device_memory_limit to 2 GB given that you have 23GB GPU memory? It is the size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory.
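As a rough illustration of what a device-side LRU limit means, here is a toy model in plain Python. It is not dask-cuda's implementation: the class ToyDeviceCache, its put method, and the partition names are hypothetical and exist only to show the idea that objects stay on the device until their total size exceeds the limit, at which point the least recently used ones are moved to host.

```python
from collections import OrderedDict


class ToyDeviceCache:
    """Toy model of an LRU device cache with a spill limit (not dask-cuda code)."""

    def __init__(self, device_limit):
        self.device_limit = device_limit  # bytes allowed to stay on the "device"
        self.device = OrderedDict()       # key -> size, least recently used first
        self.host = {}                    # key -> size of objects spilled to "host"

    def put(self, key, size):
        # Insert (or refresh) an object and mark it as most recently used.
        self.device[key] = size
        self.device.move_to_end(key)
        # Spill least recently used objects while the total exceeds the limit.
        # The newest object is never spilled in this toy, even if it alone
        # is larger than the limit.
        while sum(self.device.values()) > self.device_limit and len(self.device) > 1:
            old_key, old_size = self.device.popitem(last=False)
            self.host[old_key] = old_size


GB = 10**9
cache = ToyDeviceCache(device_limit=2 * GB)
for i in range(6):
    cache.put(f"partition-{i}", GB // 2)  # six partitions of 0.5 GB each

print(sorted(cache.device))  # partitions still on the device
print(sorted(cache.host))    # partitions spilled to host
```

In this toy, a 2 GB limit keeps only four 0.5 GB partitions on the device and spills the rest, so a low limit means spilling starts early even when the GPU has far more free memory.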

Ankitkurani1997 commented 1 year ago

@parkerzf I am trying the changes you suggested and will let you know in a few minutes.

Ankitkurani1997 commented 1 year ago

@parkerzf

For the following code snippet

import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.dask.comms import comms as Comms

csv_file_name = "../data/1b_twitter-2010.csv"

if __name__ == "__main__":
    with dask.config.set(jit_unspill=True):
        with LocalCUDACluster(n_workers=1, device_memory_limit="20GB") as cluster:
            with Client(cluster) as client:
                client.wait_for_workers(len(get_visible_devices()))
                Comms.initialize(p2p=True)
                ddf = dask_cudf.read_csv(csv_file_name, chunksize="0.5 GB", delimiter=' ', names=['src', 'dst'], dtype=['int32', 'int32'])
                G = cugraph.MultiGraph(directed=True)
                G.from_cudf_edgelist(ddf, source='src', destination='dst', renumber=False)
                Comms.destroy()

I am getting the following error:

Traceback (most recent call last):
  File "/home/ankit/rapids/graph/graph-analytics/part_test_pr.py", line 21, in <module>
    G.from_cudf_edgelist(ddf, source='src', destination='dst', renumber=False)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cugraph/structure/graph_classes.py", line 164, in from_cudf_edgelist
    self._Impl._simpleGraphImpl__from_edgelist(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cugraph/structure/graph_implementation/simpleGraph.py", line 145, in __from_edgelist
    elist = input_df.compute().reset_index(drop=True)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/base.py", line 599, in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/base.py", line 599, in <listcomp>
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/dataframe/core.py", line 138, in finalize
    return _concat(results)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask_cuda/proxify_device_objects.py", line 169, in wrapper
    ret = func(*args, **kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/dataframe/core.py", line 133, in _concat
    else methods.concat(args2, uniform=True, ignore_index=ignore_index)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/dataframe/dispatch.py", line 62, in concat
    return func(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask_cuda/proxy_object.py", line 902, in wrapper
    return func(*args, **kwargs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask/dataframe/dispatch.py", line 62, in concat
    return func(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/dask_cudf/backends.py", line 275, in concat_cudf
    return cudf.concat(dfs, axis=axis, ignore_index=ignore_index)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/reshape.py", line 399, in concat
    result = cudf.DataFrame._concat(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/dataframe.py", line 1687, in _concat
    out._index = cudf.core.index.GenericIndex._concat(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/index.py", line 994, in _concat
    result = _concat_range_index(objs)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/index.py", line 2991, in _concat_range_index
    result = as_index(concat_columns([x._values for x in indexes]))
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/index.py", line 2991, in <listcomp>
    result = as_index(concat_columns([x._values for x in indexes]))
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/functools.py", line 993, in __get__
    val = self.func(instance)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/index.py", line 238, in _values
    return column.arange(
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/cudf/core/column/column.py", line 2321, in arange
    return libcudf.filling.sequence(
  File "filling.pyx", line 100, in cudf._lib.filling.sequence
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /home/ankit/miniconda3/envs/rapids-22.08/include/rmm/mr/device/cuda_memory_resource.hpp
Exception ignored in: <function Comms.__del__ at 0x7f7abb71bc10>
Traceback (most recent call last):
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/raft/dask/common/comms.py", line 134, in __del__
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/raft/dask/common/comms.py", line 226, in destroy
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/distributed/client.py", line 2780, in run
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/distributed/utils.py", line 338, in sync
  File "/home/ankit/miniconda3/envs/rapids-22.08/lib/python3.9/site-packages/distributed/utils.py", line 361, in sync
RuntimeError: IOLoop is closed

Also, I don't think from_cudf_edgelist will work on a dask data frame; I believe it is meant for cudf data frames.

parkerzf commented 1 year ago

You are right, @Ankitkurani1997. I think the right call should be:

G.from_dask_cudf_edgelist(ddf, source='src', destination='dst', renumber=False)

Ankitkurani1997 commented 1 year ago

@shwina @ChuckHastings @madsbk @rlratzel Is 1 billion edges too much for the current implementation of from_dask_cudf_edgelist, given 23 GB of GPU memory and 128 GB of main memory?

Since I am creating partitions of 0.5 GB, I expected that this would not be a problem with spilling enabled.

So basically RAPIDS would have to implement a feature where spilling can be enabled when a Graph object does not fit into a single GPU.

Currently the cugraph dask algorithms work only if the Graph object fits completely into the available GPU memory (not counting the UVM approach). Please confirm this so that I can close this issue.

I have already tried the UVM-based approach and was able to run the algorithms and get results. I am now trying the partition-based approach.

I think @beckernick can also comment here, as he pointed out that spilling is not currently enabled for the cugraph dask algorithms.

ChuckHastings commented 1 year ago

You are reading the twitter data set with types of int32. This should result in one column (src) that is about 6 GB (I believe there are about 1.5 billion edges in the data set) and another column (dst) that is about 6 GB. This is probably too large for a 23 GB GPU, since we're going to convert that 12 GB of COO data into a CSR that will take about 8 GB. When combined with the temporary data structures required along the way, that's probably too much for a single GPU with 23 GB of memory. I would think a dask cluster with 2 or 4 GPUs would be able to handle this data, but I haven't experimented with the dask infrastructure too much and don't know what the dask, dask_cudf, or dask_cugraph overheads look like.
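The arithmetic behind that estimate can be sketched in a few lines of Python. The edge count is the approximate figure from the comment, the 8 GB CSR size is taken as quoted rather than derived, and the helper name coo_column_bytes is hypothetical. The sketch also assumes the COO columns and the CSR are resident at the same time during conversion.

```python
GB = 10**9  # decimal gigabytes, matching the rough figures in the comment


def coo_column_bytes(num_edges, bytes_per_vertex_id):
    """Size in bytes of one edge-list column (src or dst)."""
    return num_edges * bytes_per_vertex_id


num_edges = 1_500_000_000  # about 1.5 billion edges (approximate)
int32_bytes = 4            # the CSV is read with dtype int32

src_bytes = coo_column_bytes(num_edges, int32_bytes)
dst_bytes = coo_column_bytes(num_edges, int32_bytes)
coo_bytes = src_bytes + dst_bytes

csr_bytes = 8 * GB         # estimate quoted in the comment, not derived here
gpu_bytes = 23 * GB        # GPU memory reported in this thread

resident = coo_bytes + csr_bytes
headroom = gpu_bytes - resident

print(f"src column: {src_bytes / GB:.0f} GB, dst column: {dst_bytes / GB:.0f} GB")
print(f"COO + CSR:  {resident / GB:.0f} GB")
print(f"left for temporaries: {headroom / GB:.0f} GB")
```

Under these assumptions only a few gigabytes remain for the temporary buffers created along the way, which is consistent with the conclusion that a single 23 GB GPU is probably not enough.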

Ankitkurani1997 commented 1 year ago

@ChuckHastings I get your point. But suppose I try to create a graph of 500 million edges from the same Twitter dataset, using partitions of 1 GB or 0.5 GB. How does RAPIDS handle that? Won't a single partition be processed first, with the CSR format created for that partition only, after which the remaining partitions are loaded onto the GPU and processed while the older partitions are spilled back to disk or CPU?

My understanding was that the partition-based approach to large graph processing works this way. Doesn't it?

Irrespective of how large the CSV or dataset file is, if I make partitions of 1 GB or 0.5 GB, this should work without the UVM-based approach, right? (I thought partitioning was one of the approaches for handling GPU memory oversubscription.)

So irrespective of the number of partitions we make, a single CSR on the GPU will hold the complete graph?

I get that the performance can be bad, but still...

Please correct me if my understanding is incorrect.

ChuckHastings commented 1 year ago

I think I see what you're trying to do. The short answer is, that won't work.

cuGraph is not integrated into dask in the same way that cudf is. cuGraph accepts a dask_cudf data frame, and can be launched from dask. But all of the cuGraph internal data structures are managed by cuGraph, not by dask. In fact, all of the cuGraph internal data structures are explicitly hidden from the outside infrastructure (e.g., dask). This allows us to use the same C++ back end to support parallelism via dask and parallelism using MPI (the more traditional HPC approach to distributed memory programming).

We have started some design discussions to create an approach which allows running cuGraph on a graph that is larger than the available GPU memory, potentially utilizing CPU host memory or disk/SSD. This will likely also be independent of the dask approach - as we hope to support a variety of parallel front-ends.

We don't currently have this capability on our road map, we're just starting the design conversations now. I would expect that we will not be able to begin implementation until Spring 2023 at the earliest.
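To make the point about partitions concrete, here is a small pure-Python sketch of a COO to CSR conversion that reads the edge list one partition at a time. It is an illustration only and does not reflect cuGraph's C++ implementation; the function name coo_to_csr and the tiny example graph are made up. Even though the input arrives in pieces, the resulting offsets and indices arrays describe every edge, so the finished graph is as large as the whole edge list.

```python
def coo_to_csr(num_vertices, partitions):
    """Build CSR (offsets, indices) from an edge list supplied in partitions.

    Each partition is a list of (src, dst) pairs. The input is scanned
    partition by partition, but the output arrays cover all edges.
    """
    # Pass 1: count the out-degree of each vertex across all partitions.
    offsets = [0] * (num_vertices + 1)
    for part in partitions:
        for src, _ in part:
            offsets[src + 1] += 1

    # A prefix sum turns the per-vertex counts into row offsets.
    for v in range(num_vertices):
        offsets[v + 1] += offsets[v]

    # Pass 2: write each destination into its source vertex's row.
    indices = [0] * offsets[-1]
    cursor = offsets[:-1].copy()  # next free slot in each row
    for part in partitions:
        for src, dst in part:
            indices[cursor[src]] = dst
            cursor[src] += 1
    return offsets, indices


# Two partitions of a 3-vertex, 4-edge graph.
partitions = [[(0, 1), (2, 0)], [(0, 2), (1, 2)]]
offsets, indices = coo_to_csr(3, partitions)
print(offsets)
print(indices)
```

The indices array has one entry per edge regardless of how many partitions the input was split into, which is why small partitions alone do not shrink the memory needed to hold the graph itself.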

Ankitkurani1997 commented 1 year ago

Ok thanks @ChuckHastings

parkerzf commented 1 year ago

@Ankitkurani1997 @ChuckHastings Thanks for the discussion. Let me summarise my understanding and please let me know if we are on the same page :-)

cuGraph uses dask only to manage parallelism, not memory usage. As such, dask configurations such as dask.config.set(jit_unspill=True) and LocalCUDACluster(n_workers=1, device_memory_limit="20GB") don't work. cuGraph uses only GPU memory, not host memory, in this mode.

I also have a follow-up question. Since we load the graph from a cudf dataframe, we should also consider the memory overhead from cudf while loading the graph. Is that correct?

Ankitkurani1997 commented 1 year ago

@parkerzf As I understand it, Dask is used mainly to manage parallelism and scale packages or libraries to multi-core environments. It is not that the settings don't work; the internal data structures needed while forming the graph are hidden from dask, so it cannot spill that data to host.

Yes, definitely, the memory required for the cudf dataframe has to be considered.
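
For a rough sense of scale, here is a back-of-envelope sketch of the raw array sizes involved. It assumes 4-byte vertex ids and no edge weights by default, and the vertex count is a made-up number for illustration only. It ignores cudf's own overhead and any temporary buffers cuGraph allocates internally, so treat the result as a lower bound rather than what cuGraph actually uses.

```python
def edge_list_bytes(num_edges, id_bytes=4, weight_bytes=0):
    """Bytes for a COO edge list: source and destination ids plus optional weight."""
    return num_edges * (2 * id_bytes + weight_bytes)


def csr_bytes(num_vertices, num_edges, id_bytes=4, weight_bytes=0):
    """Bytes for CSR: (V + 1) offsets, E column indices, optional E weights."""
    return (num_vertices + 1) * id_bytes + num_edges * (id_bytes + weight_bytes)


GIB = 1024 ** 3
num_edges = 500_000_000     # edge count mentioned earlier in this thread
num_vertices = 40_000_000   # hypothetical vertex count, for illustration only

coo = edge_list_bytes(num_edges)
csr = csr_bytes(num_vertices, num_edges)

print(f"COO edge list: {coo / GIB:.2f} GiB")
print(f"CSR:           {csr / GIB:.2f} GiB")
# If the edge list stays alive while the CSR exists, both count against the GPU.
print(f"Both resident: {(coo + csr) / GIB:.2f} GiB")
```

Passing weight_bytes=4 or id_bytes=8 shows how quickly the footprint grows with weighted edges or 64-bit vertex ids.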

parkerzf commented 1 year ago

@Ankitkurani1997 Yep, I think that is exactly what I understand as well.

parkerzf commented 1 year ago

@ChuckHastings do we release the GPU memory of the COO data after the CSR data is generated? Or do we keep both formats in GPU memory even after graph creation? If we drop the COO data, have we considered accepting CSR data directly to lower the GPU memory required during construction? Thanks!

ChuckHastings commented 1 year ago

I think @rlratzel can better answer that question. From the C++ side, we do not release the COO during graph construction. The API accepts const pointers to the COO and cannot release the memory. The python code could release the memory, although I think it does not. The caller could release the memory, although I'm not sure if the python code keeps a copy.
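
To make the memory point concrete, here is a toy CPU-side sketch of a COO to CSR conversion in plain Python. It is not cuGraph's implementation, and coo_to_csr is a hypothetical helper written only for this illustration. Like the const-pointer API described above, it only reads its inputs, so the COO arrays and the new CSR arrays exist at the same time until the caller drops the COO.

```python
def coo_to_csr(num_vertices, src, dst):
    """Build CSR (offsets, indices) from a COO edge list without modifying the input."""
    # Count the out-degree of every source vertex.
    offsets = [0] * (num_vertices + 1)
    for s in src:
        offsets[s + 1] += 1
    # A prefix sum turns the counts into row start positions.
    for v in range(num_vertices):
        offsets[v + 1] += offsets[v]
    # Scatter destinations into their rows; src and dst are still alive here,
    # so peak memory covers the COO input plus the CSR output.
    indices = [0] * len(dst)
    cursor = offsets[:-1].copy()
    for s, d in zip(src, dst):
        indices[cursor[s]] = d
        cursor[s] += 1
    return offsets, indices


src = [0, 2, 0, 1, 2]
dst = [1, 0, 2, 2, 1]
offsets, indices = coo_to_csr(3, src, dst)
print(offsets)
print(indices)

# Only the caller can release the COO once the CSR exists.
del src, dst
```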

We could create an API that releases the memory as soon as it can within the C++ side (we have some other C++ APIs that do that, which does benefit the memory footprint in those cases). But the Python code would have to be adapted to handle that, and I think that breaks some cudf/dask_cudf assumptions about immutable objects, so it might not be an appropriate thing to integrate this way.