NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.

[BUG] LambdaOp doesn't work with multi-GPU cluster, multiple output workers #1626

Open bschifferer opened 2 years ago

bschifferer commented 2 years ago

Describe the bug I run a multi-GPU NVTabular workflow with LambdaOps. It executes the fit functionality, but when the pipeline does transform and to_parquet, I get the following error. If I run on a single GPU, it does work.

Failed to transform operator <nvtabular.ops.lambdaop.LambdaOp object at 0x7fbb4a93f2e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py", line 539, in _transform_partition
    f"Dtype discrepancy detected for column {col_name}: "
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/lambdaop.py", line 103, in label
    source = getsourcelines(self.f)[0][0]
  File "/usr/lib/python3.8/inspect.py", line 979, in getsourcelines
    lines, lnum = findsource(object)
  File "/usr/lib/python3.8/inspect.py", line 798, in findsource
    raise OSError('could not get source code')
OSError: could not get source code
2022-07-20 10:17:49,872 - distributed.worker - WARNING - Compute Failed
Key:       ('write-processed-2c66d3a6a6f1b9c54db536c61f1e311e-partition2c66d3a6a6f1b9c54db536c61f1e311e', "('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet')")
Function:  _write_subgraph
args:      (<merlin.io.dask.DaskSubgraph object at 0x7fbb4ab4d550>, ('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet'), '/raid/moj_feed_data_v1_sample1_parquet_test/', None, <fsspec.implementations.local.LocalFileSystem object at 0x7fbb4aaee2b0>, [], [], [], 'parquet', 0, False, '')
kwargs:    {}
Exception: "OSError('could not get source code')"

Steps/Code to reproduce bug I use the Criteo dask cluster initialization:

# Imports used by this setup (the device_mem_size / pynvml_mem_size import
# path may differ between NVTabular / merlin-core versions)
import warnings

import numba.cuda
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from merlin.core.utils import device_mem_size, pynvml_mem_size

# Dask dashboard
dashboard_port = "8787"
dask_workdir = '/raid/dask/'

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256
    )

# Create the distributed client
client = Client(cluster)
client

E.g.

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'))
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)
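
For context, a minimal sketch of how the pieces in this snippet are typically wired together (column names, paths, and output_path are hypothetical placeholders; part_size and client come from the setup above, and the Workflow client keyword follows the 2022-era Criteo multi-GPU examples):

# Minimal sketch of the surrounding pipeline; names and paths are hypothetical.
import nvtabular as nvt

dataset = nvt.Dataset("/raid/input_parquet/", engine="parquet", part_size=part_size)
output_path = "/raid/output_parquet/"  # hypothetical output directory

cat_cols = ["user_id", "item_id"]  # hypothetical categorical columns
col_cat_int8 = cat_cols >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype("int8"))

workflow = nvt.Workflow(col_cat_int8, client=client)
workflow.fit(dataset)  # fit succeeds on the multi-GPU cluster
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)  # transform fails as above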
karlhigley commented 2 years ago

The only thing I know that serializes Python lambdas is cloudpickle, so I guess we could serialize the Workflow with cloudpickle and then deserialize it on the workers? Not too sure how to do that, but it's the only approach I can think of.
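
For reference, a minimal illustration (not NVTabular code) of the difference: the standard pickle module serializes functions by reference and so rejects lambdas, while cloudpickle serializes them by value:

import pickle
import cloudpickle

square = lambda x: x ** 2

try:
    pickle.dumps(square)           # pickle stores functions by reference (module.qualname)
except pickle.PicklingError as err:
    print("pickle failed:", err)   # lambdas have no importable name, so this fails

restored = cloudpickle.loads(cloudpickle.dumps(square))  # cloudpickle ships the code object itself
print(restored(3))                 # 9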

benfred commented 2 years ago

@bschifferer, as a short-term workaround - can you try this instead to explicitly set the dtype in the LambdaOp?

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'), dtype="int8")
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)

This should at least cause your workflow to execute on multiple workers.

I think the root cause of the issue here is that the 'capture_dtypes' functionality doesn't work in a distributed cluster environment - since the dtypes are only captured here https://github.com/NVIDIA-Merlin/NVTabular/blob/bc71785aab96368fb51f60d876fdfecba709e494/nvtabular/workflow/workflow.py#L534-L535 on the worker process, and the dtypes for each node in the graph aren't communicated back to the actual workflow.
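
A generic illustration of the underlying dask behaviour (not NVTabular internals): objects submitted to workers are pickled, so any state captured on the worker mutates a copy and never reaches the object held by the client:

from dask.distributed import Client

class DtypeRecorder:
    """Stand-in for dtype capture; not the actual NVTabular implementation."""
    def __init__(self):
        self.captured = {}

    def capture(self, name, dtype):
        self.captured[name] = dtype
        return dtype

if __name__ == "__main__":
    client = Client(n_workers=1, processes=True)
    recorder = DtypeRecorder()
    client.submit(recorder.capture, "col_cat_int8", "int8").result()
    print(recorder.captured)  # {} -- the dtype was recorded on the worker's copy only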

The LambdaOp issue is a red herring - we're already using cloudpickle under the hood (either to save the workflow itself, or to distribute work with dask https://distributed.dask.org/en/stable/serialization.html#defaults). The real issue was that raising the exception called the LambdaOp.label functionality, which failed =(. I've fixed the LambdaOp.label call here https://github.com/NVIDIA-Merlin/NVTabular/pull/1634 - and added a basic test using LambdaOps with dask there too.
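
As a side note, a minimal sketch of how inspect.getsourcelines can raise this OSError when a function's source text isn't available to the process (similar to a notebook-defined lambda whose defining cell isn't visible on a worker):

import inspect

namespace = {}
exec("f = lambda x: x.astype('int8')", namespace)  # source exists only as a string

try:
    inspect.getsourcelines(namespace["f"])
except OSError as err:
    print(err)  # could not get source code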

bschifferer commented 2 years ago

Yes, that workaround works for me - thanks