discovery-unicamp / dasf-core

Framework for computing Machine Learning algorithms in Python using Dask and RAPIDS AI.

[FEA] Using RMM allocator for all GPU enabled tasks #9

Closed: otavioon closed this issue 1 year ago

otavioon commented 1 year ago

Describe the new feature you'd like to see

Some simple workflows executed on the GPU fail when the size of the data increases, due to CUDA out-of-memory errors. Part of this happens because the default CuPy allocator is used instead of the RAPIDS RMM one (1, 2, 3).

NOTE: I don't know if this should go in dasf-seismic or dasf-core.

Consider the simple code below (attribute_extract_test.py), which computes the DominantFrequency attribute for a random array of shape (2000, 2000, 2000) and stores it in a Zarr file:

# attribute_extract_test.py  file

import argparse
import cupy as cp
import rmm
import dask.array as da
from dask.distributed import Client
from dasf_seismic.attributes.complex_trace import DominantFrequency as Attribute
import json
import time

def cupy_to_numpy(x):
    return cp.asnumpy(x)

def main(scheduler_file: str, output: str, data_shape: tuple = (2000, 2000, 2000)):
    # Read the scheduler address from the file written by `dask scheduler --scheduler-file`
    with open(scheduler_file, "r") as f:
        json_data = json.load(f)
        address = json_data["address"]

    # Initializing client
    print(f"Connecting to scheduler at {address}...")
    client = Client(address)

    # Creating graph
    state = da.random.RandomState(42)
    data = state.random(data_shape, chunks="auto")
    data = data.map_blocks(cp.asarray)
    data = Attribute()._lazy_transform_gpu(data)
    data = data.map_blocks(cupy_to_numpy)
    data = data.to_zarr(output, compute=False, overwrite=True)

    # Computation
    print("Starting computation...")
    start = time.time()
    client.compute(data, sync=True)
    end = time.time()
    print(f"Done. It took {end-start:.3f} seconds")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--scheduler-file", type=str, required=True, help="Json file containing the scheduler configuration")
    parser.add_argument("--output", default="test.zarr", type=str, required=False)

    args = parser.parse_args()
    print(args)
    main(args.scheduler_file, args.output)

The execution is done by running the following commands, in order, on a single machine, using three distinct terminals, each one in a distinct Singularity container:

  1. The scheduler was launched with: DASK_DISTRIBUTED__SCHEDULER__WORK_SATURATION=1.1 dask scheduler --port 8786 --dashboard-address 8787 --protocol tcp --scheduler-file scheduler.json
  2. The workers were launched with: CUDA_VISIBLE_DEVICES=0,1,2,3 dask-cuda-worker --scheduler-file scheduler.json --protocol tcp
  3. The client code was launched with: python attribute_extract_test.py --scheduler-file scheduler.json (a quick connectivity check is sketched right after this list)
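Before submitting the attribute computation, a quick sanity check that the client can reach the scheduler and that every worker process sees a GPU can be useful. This is only a minimal sketch, assuming the same scheduler.json and the same Python environment as above:

import json

import cupy as cp
from dask.distributed import Client

# Connect using the address written by `dask scheduler --scheduler-file scheduler.json`
with open("scheduler.json", "r") as f:
    client = Client(json.load(f)["address"])

# List the connected workers and the CUDA device each worker process sees
print(list(client.scheduler_info()["workers"].keys()))
print(client.run(lambda: cp.cuda.runtime.getDevice()))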

The execution fails with the following error in the client terminal:

...
  File "/home/otavio.napoli/.local/lib/python3.9/site-packages/dasf_seismic/attributes/complex_trace.py", line 34, in __real_signal_hilbert
    return xsignal.hilbert(X)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cusignal/filtering/filtering.py", line 882, in hilbert
    Xf = cp.fft.fft(x, N, axis=axis)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/fft/_fft.py", line 680, in fft
    return _fft(a, (n,), (axis,), norm, cupy.cuda.cufft.CUFFT_FORWARD)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/fft/_fft.py", line 245, in _fft
    a = _fft_c2c(a, direction, norm, axes, overwrite_x, plan=plan)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/fft/_fft.py", line 210, in _fft_c2c
    a = _exec_fft(a, direction, 'C2C', norm, axis, overwrite_x, plan=plan)
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/fft/_fft.py", line 187, in _exec_fft
    out = plan.get_output_array(a)
  File "cupy/cuda/cufft.pyx", line 720, in cupy.cuda.cufft.Plan1d.get_output_array
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/cupy/_creation/basic.py", line 22, in empty
    return cupy.ndarray(shape, dtype, order=order)
  File "cupy/_core/core.pyx", line 171, in cupy._core.core.ndarray.__init__
  File "cupy/cuda/memory.pyx", line 698, in cupy.cuda.memory.alloc
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/rmm/rmm.py", line 232, in rmm_cupy_allocator
    buf = librmm.device_buffer.DeviceBuffer(size=nbytes, stream=stream)
  File "device_buffer.pyx", line 88, in rmm._lib.device_buffer.DeviceBuffer.__cinit__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp

Describe alternatives you've considered

Alternative 1 (using the RMM allocator instead of CuPy's)

Following (1, 2, 3), a solution may involve using the RMM allocator instead of CuPy's default one. attribute_extract_test.py could be adapted by adding the following lines:

...
    # Initializing client
    print(f"Connecting to scheduler at {address}...")
    client = Client(address)

    + client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)
    + rmm.reinitialize(managed_memory=True)
    + cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

    # Creating graph
    state = da.random.RandomState(42)
...

This forces the use of the RMM allocator instead of the CuPy one in all workers and also in the client. However, execution still raises the same CUDA OOM error.
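One detail worth noting: in the snippet above, client.run only changes the CuPy allocator on the workers, while rmm.reinitialize(managed_memory=True) runs only in the client process. A variant that performs both steps inside every worker process could look like the sketch below (the helper name is mine, just for illustration):

def setup_rmm_on_worker():
    # Executed inside each worker process: reinitialize RMM with managed
    # (unified) memory and route all CuPy allocations through RMM.
    import cupy as cp
    import rmm
    rmm.reinitialize(managed_memory=True)
    cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

client.run(setup_rmm_on_worker)

# Same configuration for the client process itself
rmm.reinitialize(managed_memory=True)
cp.cuda.set_allocator(rmm.rmm_cupy_allocator)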

Alternative 2 (Alternative 1 + launching workers with managed memory)

Iterating on the modified code from Alternative 1, I found that dask-cuda-worker was not being launched properly: we must tell dask-cuda-worker to manage RMM memory. I therefore changed the dask-cuda-worker command as follows, to enable managed memory and spilling:

CUDA_VISIBLE_DEVICES=0,1,2,3 dask-cuda-worker --scheduler-file scheduler.json --memory-limit 0.8 --device-memory-limit 0.8 --rmm-managed-memory --enable-jit-unspill --shared-filesystem --protocol tcp

With these modifications, the workflow executed successfully, without any errors, in 219.617 seconds.

I ran additional tests with other attributes, such as Semblance, GLCMDissimilarity, and LocalBinaryPattern3D; using this strategy to start the workers and configure RMM, no out-of-memory error was raised (apart from the slowness).

Feature Request

I suggest adapting the worker launch scripts (which call dask-cuda-worker) to enable RMM when possible. I also suggest that DASF auto-configure the workers to use the RMM allocator instead of the CuPy one, by running the following code when a client is instantiated and a GPU cluster is used:

client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)
rmm.reinitialize(managed_memory=True)
cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

This should be done in a way that is transparent to the user. Maybe in the DaskPipelineExecutor?
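I do not know the internals of DaskPipelineExecutor well, so the sketch below is only an illustration of where such a hook could live; the function name (enable_rmm_allocator) and the call site are assumptions, not the actual dasf-core API:

import cupy as cp
import rmm
from dask.distributed import Client

def enable_rmm_allocator(client: Client):
    """Make RMM the CuPy allocator on every worker and on the client (sketch only)."""
    def _worker_setup():
        import cupy as cp
        import rmm
        rmm.reinitialize(managed_memory=True)
        cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

    client.run(_worker_setup)
    # Also configure the client process, since graph construction may allocate on the GPU
    rmm.reinitialize(managed_memory=True)
    cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

# Hypothetical call site: right after the executor connects its Client and
# detects that the cluster is GPU-enabled, e.g.:
#     if cluster_has_gpus:
#         enable_rmm_allocator(self.client)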

Additional context

All scripts were executed on a single machine with 4 Tesla V100 GPUs (32 GB each), inside Singularity containers.