rapidsai / node

GPU-accelerated data science and visualization in node
https://rapidsai.github.io/node/
Apache License 2.0
186 stars 20 forks source link

[QUESTION] Sharing a Python cudf, or better, dask-cudf? #161

Open lmeyerov opened 3 years ago

lmeyerov commented 3 years ago

Hi @trxcllnt & team!

Curious if any pointers on this. Ultimately, we're trying to setup a basic ~LRU over Python-managed dask-cudf objects (raw ddf or published) and have node do reads+writes with it. That's probably too big a leap for now, but I'm wondering if we can do something in the near term by scoping down to just CUDA IPC + cudf, or something similar?

Ultimate ideal case: multi-gpu dask

Ideally for the read direction, for example:

Dask-based Python publish:

ddf = dask.dataframe....
client.persist(ddf)
client.publish_dataset(xyz=ddf)

Dask-based JS load:

var ddf = nodeDaskCudf.getDataset('xyz'); // does not exist
var gdf = ddf.compute(); // does not exist
var arr = ddf.toArrow();

Near-term workaround via cudf + CUDA IPC

So I'm wondering: If we scoped down to python cudf <> node cudf, would this become doable, say via CUDA IPC?

Ex: Read direction

Python cudf pointer publish:

ddf = dask.dataframe....
gdf = client.compute(ddf)
handle : str = gdf.get_ipc_handle() # RMM / UCX basically do this already afaict
request.post(f'/nodejs/?handle={handle})

JS cudf readback:

function cb(handle) {
   var gdf = cudf.fromIPCHandle(handle); // ???
   var arr = gdf.toArrow();
}
trxcllnt commented 3 years ago

@lmeyerov we don't have deserialization/unpickling of the Python IPC data structures yet, but with a bit of effort it can be done with CUDA IPC primitives. The modules/demo/ipc folder has a few examples, but the key bits are here:

This example is sharing the raw buffers via CUDA IPC, but with a bit more metadata from Python you could construct Series of the correct Arrow type from each set of buffers via Series.new({ type: <type>, data: dataFromIPC, nullMask: maskFromIPC }).

Alternatively we could write bindings to read Arrow IPC data already on device, similar to Python's GpuArrowReader. This uses the Arrow C++ MessageReader + CUDABufferReader. Currently you'd have to serialize the table to a host buffer, copy host to device, then export the device buffer via CUDA IPC, but we could add a GpuArrowWriter in cuDF to avoid this step.

lmeyerov commented 3 years ago

Great, yeah I was looking those, and indeed, was trying to figure out if this would make more sense as a collection of fat pointers or via a GPU readback! Agreed that Python passing a dict of IPC handles + arrow schema seems more friendly & predictable for now: makes fewer impl assumptions, just arrow schema + per-col IPC handles. (And I believe CUDA handles IPC handle GC via client bindings tying them to lang GCs)

-- The examples show getting 1 IPC handle per cudf column, yet the JS arrow constructor wants a nullMask too. What would happen there? I'm not sure what happens if python did something like sliced_gdf = cudf.DataFrame(...)[:20] or had NaNs and tried sharing, what'd be the data vs mask cols? And to do safely, should we also concretize somehow?

-- What about the reverse direction? The initial focus here is Python -> JS <> JS, so Python can be our source of truth, but would be helpful to go the other way (JS -> Python), with the bonus of also revealing how to do JS <> JS

-- Sanity check: For the above, that'd be Series.new via SeriesProps @ https://github.com/rapidsai/node-rapids/blob/main/modules/cudf/src/series.ts#L50 ?

trxcllnt commented 3 years ago

And I believe CUDA handles IPC handle GC via client bindings tying them to lang GCs

CUDA will clean these up on shutdown, otherwise you'll be at the whim of JS's GC. If you let the IpcMemory instance go out of scope, the C++ Finalize method will close the CUDA IPC handle. That's not necessarily deterministic, so you should close it explicitly via ipcMem.close() if Python needs to do something else with the underlying buffers.

the JS arrow constructor wants a nullMask too

The Series nullMask is optional, like in Python.

what'd be the data vs mask cols

Both are the raw buffers. To communicate data and nulls to JS, you'd have to IPC both data and mask buffers in Python.

What about the reverse direction

Would have to do the same thing, but in reverse -- create handles for underlying buffers, JSON tree or w/e to describe the types + IPC handle strings, ship over to other process, deserialize. This only really works between GPUs single-node, so eventually tapping into UCX would be ideal.

how to do JS <> JS

There is a test for IPC between node processes here.

-- Sanity check: For the above, that'd be Series.new via SeriesProps @ https://github.com/rapidsai/node-rapids/blob/main/modules/cudf/src/series.ts#L50 ?

Yep!

lmeyerov commented 3 years ago

Ah, cool, for reference for folks googling, cudf has series::nullmask : https://docs.rapids.ai/api/cudf/stable/api.html#cudf.core.series.Series.nullmask

So that just about ties it off

I'm not sure what happens if python did something like sliced_gdf = cudf.DataFrame(...)[:20]

Any guidance on ^^^^? Is it safe to directly pass the handle to the data/mask, or do we need some sort of index buffer too / deep copy?

trxcllnt commented 3 years ago

In order to IPC buffers that back a (zero-copy) sliced Python Series, you'd have to IPC the entire original buffer + communicate the offset + length along with the IPC handle, like below:

import {
  Series,
  IpcMemory,
  Float32, Float64,
  Int8, Int16, Int32, Int64, 
  Uint8, Uint16, Uint32, Uint64,
} from '@nvidia/cuda';

function openMemHandlesAsSeries(buffers) {
  const { dtype = 'int8', offset = 0, length = 0 } = buffers.data || {};
  const data = buffers.data ? new IpcMemory(buffers.data.handle) : null;
  const nullMask = buffers.mask ? new IpcMemory(buffers.mask.handle) : null;
  switch (dtype) {
    case 'int8': return Series.new({type: new Int8, data, nullMask, offset, length});
    case 'int16': return Series.new({type: new Int16, data, nullMask, offset, length});
    case 'int32': return Series.new({type: new Int32, data, nullMask, offset, length});
    case 'int64': return Series.new({type: new Int64, data, nullMask, offset, length});
    case 'uint8': return Series.new({type: new Uint8, data, nullMask, offset, length});
    case 'uint16': return Series.new({type: new Uint16, data, nullMask, offset, length});
    case 'uint32': return Series.new({type: new Uint32, data, nullMask, offset, length});
    case 'uint64': return Series.new({type: new Uint64, data, nullMask, offset, length});
    case 'float32': return Series.new({type: new Float32, data, nullMask, offset, length});
    case 'float64': return Series.new({type: new Float64, data, nullMask, offset, length});
  }
}
import numba
import numpy as np

# returns:
# {
#   'buffers': {
#     'data': { 'h', 'handle', 'dtype', 'length', 'offset' } | None,
#     'mask': { 'h', 'handle', 'dtype', 'length' } | None,
#   }
# }
def series_to_ipc_handles(sr):
    # TODO: recurse through child Series if sending nested dtypes
    buffers = {}
    buffers.update(data=buffer_to_ipc_handle_info(sr._column.data, sr.dtype))
    buffers.update(mask=buffer_to_ipc_handle_info(sr._column.mask, np.dtype(np.uint8)))
    if buffers['data'] is not None:
        # sr._column.offset is the offset in number of elements:
        # https://github.com/rapidsai/cudf/blob/bc9903a3295aefd39d63bf15b312752daa6645bc/python/cudf/cudf/_lib/column.pyx#L117-L118
        buffers['data'].update(offset=sr._column.offset)
    # mask is never sliced, it will always start at byte offset 0:
    # https://github.com/rapidsai/cudf/blob/bc9903a3295aefd39d63bf15b312752daa6645bc/python/cudf/cudf/_lib/column.pyx#L163
    return buffers

def buffer_to_ipc_handle_info(buf, dtype):
    if buf is not None:
        size = buffer.size // dtype.itemsize
        ary = device_array_from_ptr(buf.ptr, size, dtype)
        handle = darray.get_ipc_handle()
        return {
            # keep ref to handle b/c numba closes it on destruction
            'h': handle,
            # serialize handle octets to Python array for json.dumps
            'handle': np.array(handle._ipc_handle.handle).tolist(),
            'dtype': str(dtype)
            'length': size,
        }
    return None

def device_array_from_ptr(ptr, nelem, dtype=np.float, finalizer=None):
    """
    device_array_from_ptr(ptr, size, dtype=np.float, stream=0)
    Create a Numba device array from a ptr, size, and dtype.
    """
    # Handle Datetime Column
    if dtype == np.datetime64:
        dtype = np.dtype("datetime64[ms]")
    else:
        dtype = np.dtype(dtype)

    elemsize = dtype.itemsize
    datasize = elemsize * nelem
    shape = (nelem,)
    strides = (elemsize,)
    # note no finalizer -- freed externally!
    ctx = numba.cuda.current_context()
    ptr = ctypes.c_uint64(int(ptr))
    mem = numba.cuda.MemoryPointer(ctx, ptr, datasize, finalizer=finalizer)
    return numba.cuda.cudadrv.devicearray.DeviceNDArray(
        shape, strides, dtype, gpu_data=mem
    )
lmeyerov commented 3 years ago

This is great, thanks @trxcllnt . This will be a fun start to PTO next week!

(The more that I've thought about it, the more I see possible here. E.g., enabling our python API users to share IPC handles!)