NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.

[QST] How Do I Solve the Problem that Missing Values Cannot Be Converted to Int Values? #1770

Open gukejun1 opened 1 year ago

gukejun1 commented 1 year ago

When I run the example (see the link in my comment below), the following error is reported:

2023-02-22 16:07:04,128 - distributed.worker - WARNING - Compute Failed
Key:       ('write-processed-db337936ac67baee573cd5fd6543337d-partitiondb337936ac67baee573cd5fd6543337d', "('part_4.parquet',)")
Function:  _write_subgraph
args:      (<merlin.io.dask.DaskSubgraph object at 0x7f1772bc9100>, ('part_4.parquet',), '/raid/data/criteo/test_dask/output/train/', <Shuffle.PER_PARTITION: 0>, <fsspec.implementations.local.LocalFileSystem object at 0x7f175d0740a0>, ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26'], ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'], ['label'], 'parquet', 0, True, '')
kwargs:    {}
Exception: "ValueError('cannot convert NA to integer')"

Traceback (most recent call last):
  File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 190, in <module>
    main()
  File "/Merlin/examples/scaling-criteo/02_etl_with_nvtabular.py", line 167, in main
    workflow.transform(train_dataset).to_parquet(
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py", line 910, in to_parquet
    _ddf_to_dataset(
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 367, in _ddf_to_dataset
    out = client.compute(out).result()
  File "/usr/local/lib/python3.8/dist-packages/distributed/client.py", line 280, in result
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 202, in _write_subgraph
    table = subgraph[part]
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/dask.py", line 53, in __getitem__
    return dask.get(dsk, key)
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 557, in get_sync
    return get_async(
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/usr/local/lib/python3.8/dist-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/usr/local/lib/python3.8/dist-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 89, in __call__
    return read_parquet_part(
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 587, in read_parquet_part
    dfs = [
  File "/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py", line 588, in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
  File "/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py", line 88, in read_partition
    part[k] = part[k].astype(type_name.replace("Int", "int"))
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/generic.py", line 6240, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 448, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/managers.py", line 352, in apply
    applied = getattr(b, f)(**kwargs)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/usr/local/lib/python3.8/dist-packages/pandas/core/arrays/masked.py", line 471, in astype
    raise ValueError("cannot convert NA to integer")
ValueError: cannot convert NA to integer

Why is the error still reported that NA cannot be converted to an int value? The official example already handles missing values. How can I solve this problem?
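A minimal check (a sketch, assuming pandas and the converted parquet path used in the script below) to see which input columns actually contain NA values:

import pandas as pd

df = pd.read_parquet(
    "/raid/data/criteo/converted/criteo/day_0_40000000.parquet",
    columns=["I1", "I2", "I3", "label"],  # subset of columns to keep memory low
)
print(df.isna().sum())  # any non-zero count marks a column with missing values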

gukejun1 commented 1 year ago

The error occurs when I run the code from https://nvidia-merlin.github.io/Merlin/main/examples/scaling-criteo/02-ETL-with-NVTabular.html

rnyak commented 1 year ago

@gukejun1 can you provide more info about your environment? How and where did you install the Merlin libraries? Are you using a Docker image? If yes, which Docker image? Thanks.

gukejun1 commented 1 year ago

The code is the same as in that example. I use the Docker image nvcr.io/nvidia/merlin/merlin-tensorflow:22.12.

gukejun1 commented 1 year ago

@rnyak this is my full code


# Imports assumed from the original 02-ETL-with-NVTabular example
# (not part of the snippet as posted; module paths assume the merlin-tensorflow:22.12 container).
import os
import re
import shutil
import warnings

import numba
import numpy as np
import nvtabular as nvt
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from merlin.core.utils import device_mem_size, pynvml_mem_size
from merlin.schema import Tags
from nvtabular.ops import AddMetadata, Categorify, Clip, FillMissing, Normalize

BASE_DIR = os.environ.get("BASE_DIR", "/raid/data/criteo")
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", BASE_DIR + "/converted/criteo")
OUTPUT_DATA_DIR = os.environ.get("OUTPUT_DATA_DIR", BASE_DIR + "/test_dask/output")
USE_HUGECTR = bool(os.environ.get("USE_HUGECTR", ""))
print(USE_HUGECTR)
stats_path = os.path.join(OUTPUT_DATA_DIR, "test_dask/stats")
dask_workdir = os.path.join(OUTPUT_DATA_DIR, "test_dask/workdir")

# Make sure we have a clean worker space for Dask
if os.path.isdir(dask_workdir):
    shutil.rmtree(dask_workdir)
os.makedirs(dask_workdir)

# Make sure we have a clean stats space for Dask
if os.path.isdir(stats_path):
    shutil.rmtree(stats_path)
os.mkdir(stats_path)

# Make sure we have a clean output path
if os.path.isdir(OUTPUT_DATA_DIR):
    shutil.rmtree(OUTPUT_DATA_DIR)
os.mkdir(OUTPUT_DATA_DIR)

fname = "day_{}.parquet"
num_days = len(
    [i for i in os.listdir(INPUT_DATA_DIR) if re.match(fname.format("[0-9]{1,2}"), i) is not None]
)
train_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1)]
valid_paths = [
    os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(num_days - 1, num_days)
]

# Override the computed lists with single converted files
train_paths = "/raid/data/criteo/converted/criteo/day_0_40000000.parquet"
valid_paths = "/raid/data/criteo/converted/criteo/day_1_4000000.parquet"
print(train_paths)
print(valid_paths)

# Dask dashboard
dashboard_port = "8787"

protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256,
    )

# Create the distributed client
client = Client(cluster)
print(client)

# Define our dataset schema
CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]
COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS

num_buckets = 10000000
categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
# categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype=np.zeros(0))
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')
# cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype=np.zeros(0))
label_features = LABEL_COLUMNS >> AddMetadata(
    tags=[str(Tags.BINARY_CLASSIFICATION), "target"]
)

features = cat_features + cont_features + label_features
workflow = nvt.Workflow(features)

dict_dtypes = {}

# The environment variable USE_HUGECTR defines whether the output is for HugeCTR or another framework
for col in CATEGORICAL_COLUMNS:
    dict_dtypes[col] = np.int64 if USE_HUGECTR else np.int32

for col in CONTINUOUS_COLUMNS:
    dict_dtypes[col] = np.float32

for col in LABEL_COLUMNS:
    dict_dtypes[col] = np.int32

print(dict_dtypes)

train_dataset = nvt.Dataset(train_paths, engine="parquet", part_size=part_size)

valid_dataset = nvt.Dataset(valid_paths, engine="parquet", part_size=part_size)

output_train_dir = os.path.join(OUTPUT_DATA_DIR, "train/")
output_valid_dir = os.path.join(OUTPUT_DATA_DIR, "valid/")
# ! mkdir -p $output_train_dir
# ! mkdir -p $output_valid_dir

print(workflow)
workflow.fit(train_dataset)

# train_dataset.fillna(0, inplace=True)

workflow.transform(train_dataset).to_parquet(
    output_files=len(NUM_GPUS),
    output_path=output_train_dir,
    shuffle=nvt.io.Shuffle.PER_PARTITION,
    dtypes=dict_dtypes,
    cats=CATEGORICAL_COLUMNS,
    conts=CONTINUOUS_COLUMNS,
    labels=LABEL_COLUMNS,
)

workflow.transform(valid_dataset).to_parquet(
    output_path=output_valid_dir,
    dtypes=dict_dtypes,
    cats=CATEGORICAL_COLUMNS,
    conts=CONTINUOUS_COLUMNS,
    labels=LABEL_COLUMNS,
)

workflow.save(os.path.join(OUTPUT_DATA_DIR, "workflow"))

I installed the Merlin libraries from https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow

gukejun1 commented 1 year ago

@rnyak The training data is the first 40 million rows of day_0 in the Criteo dataset, and the validation data is the first 4 million rows of day_1. The screenshot below shows a preview of the parquet data. (screenshot attached)

rnyak commented 1 year ago

@gukejun1 if you have null values, they should normally be filled when you apply the following lines in the NVT workflow:

cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')

Can you share a subset of your parquet file, say only a couple hundred rows, so that we can reproduce the issue? Thanks.
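A minimal sketch (not from the notebook; the column names and values here are made up) showing that FillMissing removes nulls before any dtype cast:

import numpy as np
import pandas as pd
import nvtabular as nvt
from nvtabular.ops import Clip, FillMissing, Normalize

# two continuous columns with deliberate NaNs
df = pd.DataFrame({"I1": [1.0, np.nan, 3.0], "I2": [np.nan, 2.0, 4.0]})

cont_features = ["I1", "I2"] >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype="float32")
workflow = nvt.Workflow(cont_features)

out = workflow.fit_transform(nvt.Dataset(df)).to_ddf().compute()
print(out.isna().sum())  # expect zeros everywhere if the nulls were filled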

gukejun1 commented 1 year ago

@rnyak day_1_100.parquet.txt (attached). The data comes from the first 100 rows of Criteo day_1 and was converted to parquet using the official method in \Merlin\examples\scaling-criteo\01_download_convert.ipynb.

rnyak commented 1 year ago

@gukejun1 I used your small dataset with this notebook and everything worked fine for me; I cannot reproduce your error. Are you able to reproduce the error with only this small parquet file?

gukejun1 commented 1 year ago

@rnyak Very strange. (screenshot attached)

rnyak commented 1 year ago

@gukejun1 please note that your screenshot shows you are trying to read a .parquet.txt file, not a .parquet file, which means the file extension is not correct. It should be .parquet.
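For example (a hypothetical one-liner, assuming the attachment was saved under that name), restore the extension before reading the file:

import os

# GitHub forced the .txt suffix for the upload; rename it back before reading
os.rename("day_1_100.parquet.txt", "day_1_100.parquet")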

gukejun1 commented 1 year ago

@rnyak It's the same file. The only difference is the extension: because GitHub does not allow uploading files with the .parquet extension, I changed the extension to .txt. (screenshot attached)

gukejun1 commented 1 year ago

@rnyak So this code didn't work. (screenshot attached)

gukejun1 commented 1 year ago

@rnyak Because my graphics card supports up to CUDA 11.3, I reinstalled cupy-cuda as cupy-cuda113. Could this be related? Does cupy-cuda113 support filling missing values?
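For reference, a small check (a sketch, assuming cupy is importable in the container) to confirm which cupy build and CUDA runtime are actually in use:

import cupy

print(cupy.__version__)                       # installed cupy build, e.g. the cupy-cuda113 wheel
print(cupy.cuda.runtime.runtimeGetVersion())  # CUDA runtime version, e.g. 11030 for 11.3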

rnyak commented 1 year ago

@gukejun1 what's your graphics card?

Your sample set does not have any nulls in the label column, so I am skeptical that that line gives you the error. You can remove the label line and test it, like below:

CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]
COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS

num_buckets = 10000000
categorify_op = Categorify(out_path=stats_path, max_size=num_buckets, dtype='int32')
cat_features = CATEGORICAL_COLUMNS >> categorify_op
cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize(out_dtype='float32')

features = cat_features + cont_features

workflow = nvt.Workflow(features)
...
...

gukejun1 commented 1 year ago

@rnyak The error is still reported. (screenshot attached) My graphics card is an NVIDIA Tesla P4.

rnyak commented 1 year ago

@gukejun1 cudf supports the Pascal architecture or better (Compute Capability >= 6.0); see this doc.
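As a side check (a small sketch, assuming numba is available in the container), you can confirm the compute capability of the card the container actually sees:

from numba import cuda

dev = cuda.get_current_device()
print(dev.name, dev.compute_capability)  # a Tesla P4 is Pascal and should report (6, 1)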

Can you test whether you are able to run notebooks 01 and 02 in this folder?

Thanks.

rnyak commented 1 year ago

@gukejun1 the error looks like it comes from pandas, which suggests you are running on CPU, not on GPU. Please confirm that visible_devices from the code below does not come back empty; if it is empty, that means you are not using the GPU.

protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
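A quick way to verify (a minimal addition, assuming the snippet above has already run) is to print the result before creating the cluster:

print("CUDA available:", numba.cuda.is_available())
print("visible_devices:", repr(visible_devices))  # an empty string means no GPU workers, i.e. the CPU/pandas path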

gukejun1 commented 1 year ago

@rnyak For the movie_lens case, notebooks 01 and 02 run successfully.

1. requirements.txt (attached)
2. (screenshot attached)
3. I used docker pull nvcr.io/nvidia/merlin/merlin-tensorflow:22.12 to get the Docker image.

gukejun1 commented 1 year ago

@rnyak It used the GPU. (screenshot attached)