pytorch / torchrec

Pytorch domain library for recommendation systems
https://pytorch.org/torchrec/
BSD 3-Clause "New" or "Revised" License

Error occurs when running `nvt_preproc.sh` #912

Open allenfengjr opened 1 year ago

allenfengjr commented 1 year ago

Hello, I am trying to pre-process the Criteo 1TB dataset using NVTabular. I run `nvt_preproc.sh` but get errors.

This is the error output.

2022-12-21 15:04:51,981 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-12-21 15:04:51,982 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
  warnings.warn(
2022-12-21 16:25:17,116 - distributed.worker - WARNING - Compute Failed
Key:       ('read-csv-90ef43a055712c364b500b995ea7e3d5', 0)
Function:  execute_task
args:      ((subgraph_callable-12d6d57f-182a-4e3a-b65c-e4ee508dcef9, [(<function read_block_from_file at 0x7ff756010a60>, <OpenFile '/N/scratch/haofeng/TB/raw/day_0'>, 0, 128272716, b'\n'), None, True, False]))
kwargs:    {}
Exception: 'ValueError("Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\\n\\n+--------+---------+----------+\\n| Column | Found   | Expected |\\n+--------+---------+----------+\\n| int_1  | float64 | int64    |\\n| int_11 | float64 | int64    |\\n| int_6  | float64 | int64    |\\n+--------+---------+----------+\\n\\nUsually this is due to dask\'s dtype inference failing, and\\n*may* be fixed by specifying dtypes manually by adding:\\n\\ndtype={\'int_1\': \'float64\',\\n       \'int_11\': \'float64\',\\n       \'int_6\': \'float64\'}\\n\\nto the call to `read_csv`/`read_table`.\\n\\nAlternatively, provide `assume_missing=True` to interpret\\nall unspecified integer columns as floats.")'

finished splitting the last day, took 4719.522433280945
handling the input paths: ['/N/scratch/haofeng/TB/raw/day_0', '/N/scratch/haofeng/TB/raw/day_1', '/N/scratch/haofeng/TB/raw/day_2', '/N/scratch/haofeng/TB/raw/day_3', '/N/scratch/haofeng/TB/raw/day_4', '/N/scratch/haofeng/TB/raw/day_5', '/N/scratch/haofeng/TB/raw/day_6', '/N/scratch/haofeng/TB/raw/day_7', '/N/scratch/haofeng/TB/raw/day_8', '/N/scratch/haofeng/TB/raw/day_9', '/N/scratch/haofeng/TB/raw/day_10', '/N/scratch/haofeng/TB/raw/day_11', '/N/scratch/haofeng/TB/raw/day_12', '/N/scratch/haofeng/TB/raw/day_13', '/N/scratch/haofeng/TB/raw/day_14', '/N/scratch/haofeng/TB/raw/day_15', '/N/scratch/haofeng/TB/raw/day_16', '/N/scratch/haofeng/TB/raw/day_17', '/N/scratch/haofeng/TB/raw/day_18', '/N/scratch/haofeng/TB/raw/day_19', '/N/scratch/haofeng/TB/raw/day_20', '/N/scratch/haofeng/TB/raw/day_21', '/N/scratch/haofeng/TB/raw/day_22', '/N/scratch/haofeng/TB/raw/day_23.part0', '/N/scratch/haofeng/TB/raw/day_23.part1']
Traceback (most recent call last):
  File "/geode2/home/u030/haofeng/BigRed200/torchrec/torchrec/datasets/scripts/nvt/convert_tsv_to_parquet.py", line 107, in <module>
    convert_tsv_to_parquet(args.input_path, args.output_base_path)
  File "/geode2/home/u030/haofeng/BigRed200/torchrec/torchrec/datasets/scripts/nvt/convert_tsv_to_parquet.py", line 72, in convert_tsv_to_parquet
    tsv_dataset = nvt.Dataset(input_paths, **config)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/merlin/io/dataset.py", line 346, in __init__
    self.infer_schema()
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/merlin/io/dataset.py", line 1127, in infer_schema
    dtypes = self.sample_dtypes(n=n, annotate_lists=True)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/merlin/io/dataset.py", line 1147, in sample_dtypes
    _real_meta = self.engine.sample_data(n=n)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/merlin/io/dataset_engine.py", line 71, in sample_data
    _head = _ddf.partitions[partition_index].head(n)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/dataframe/core.py", line 1265, in head
    return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/dataframe/core.py", line 1299, in _head
    result = result.compute()
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 3122, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 2291, in gather
    return self.sync(
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
    raise exc.with_traceback(tb)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
    result = yield future
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 2154, in _gather
    raise exception.with_traceback(traceback)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/dataframe/io/csv.py", line 141, in __call__
    df = pandas_read_text(
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/dataframe/io/csv.py", line 196, in pandas_read_text
    coerce_dtypes(df, dtypes)
  File "/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/dask/dataframe/io/csv.py", line 297, in coerce_dtypes
    raise ValueError(msg)
ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+--------+---------+----------+
| Column | Found   | Expected |
+--------+---------+----------+
| int_1  | float64 | int64    |
| int_11 | float64 | int64    |
| int_6  | float64 | int64    |
+--------+---------+----------+

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'int_1': 'float64',
       'int_11': 'float64',
       'int_6': 'float64'}

to the call to `read_csv`/`read_table`.

Alternatively, provide `assume_missing=True` to interpret
all unspecified integer columns as floats.
*** Error in `/N/u/haofeng/BigRed200/anaconda3/envs/rapids-22.12/bin/python': corrupted size vs. prev_size: 0x00007ff06c000b00 ***

I read the error message, and my current understanding is that it is caused by a data type mismatch when reading the file. But I did download the original files from the official website (the "original data" link). I'm hoping to get some help understanding why this happens and how I should fix it.
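For reference, here is a minimal sketch of the workaround the error message itself suggests, written directly against dask's `read_csv`. The column names, tab separator, and glob path are assumptions based on the Criteo layout, not values taken from the script:

```python
# Hypothetical sketch: apply the dtype override suggested by the ValueError above.
import dask.dataframe as dd

# Assumed Criteo column layout: 1 label, 13 integer features, 26 categorical features.
int_cols = [f"int_{i}" for i in range(13)]
cat_cols = [f"cat_{i}" for i in range(26)]
columns = ["label"] + int_cols + cat_cols

ddf = dd.read_csv(
    "/N/scratch/haofeng/TB/raw/day_*",  # assumed path pattern
    sep="\t",
    names=columns,
    # The integer columns contain missing values, so dask's int64 inference
    # fails partway through the files; reading them as float64 avoids the
    # "Mismatched dtypes" ValueError.
    dtype={c: "float64" for c in int_cols},
    # Alternatively: assume_missing=True, which treats all unspecified
    # integer columns as floats.
)
```

Presumably the equivalent fix for `convert_tsv_to_parquet.py` is to include these dtype overrides in the config it passes to `nvt.Dataset`.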

I still have two more questions. The first is that after I downloaded these files and decompressed them with `gzip -dk $filename`, I got files named day_0, day_1, ... instead of day_0.csv, day_1.csv. Will this cause any problems? I think this is only a difference in the suffix, but I am not sure.
My other question is that it seems I am running the NVTabular Dataset in CPU mode. I have already installed cupy, rmm, and some other libraries; what should I do to run this program in CUDA mode?
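On the CPU-mode warning, a minimal sketch of how one might check whether the GPU stack is actually usable in the environment; the package names are the standard RAPIDS ones and the cluster setup is an assumption, not taken from the torchrec scripts:

```python
# Hypothetical check: the Dataset falls back to CPU mode when the GPU
# dataframe stack cannot be imported, so verify the RAPIDS imports in the
# same environment that runs the preprocessing script.
try:
    import cudf       # GPU dataframes
    import dask_cudf  # dask-backed GPU dataframes
    import rmm        # RAPIDS memory manager
    print("RAPIDS imports OK, cudf version:", cudf.__version__)
except ImportError as err:
    print("GPU stack not importable, CPU fallback expected:", err)

# A GPU-backed dask cluster, as typically used with NVTabular:
from dask_cuda import LocalCUDACluster
from distributed import Client

cluster = LocalCUDACluster()  # one worker per visible GPU
client = Client(cluster)
```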
Thank you for your help!

allenfengjr commented 1 year ago

For my first question, I think the expected name is indeed day_0 (no .csv suffix), so there is no problem there.

jackzhan01 commented 5 days ago

Have you successfully preprocessed the Criteo 1TB dataset with this method? I would like to know how much RAM is needed for the process. During my trial, it always says: "distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 189.95 GiB -- Worker memory limit: 200.00 GiB"

allenfengjr commented 5 days ago

> Have you successfully preprocessed the Criteo 1TB dataset with this method? I would like to know how much RAM is needed for the process. During my trial, it always says: "distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 189.95 GiB -- Worker memory limit: 200.00 GiB"

No, I gave up using NVTabular.
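Regarding the unmanaged-memory warning above, a hedged sketch of one way to make dask workers spill to disk earlier instead of accumulating memory up to the limit; the thresholds and cluster parameters are illustrative assumptions, not values from the torchrec scripts:

```python
# Hypothetical sketch: tighten dask's worker memory thresholds so data is
# spilled and tasks are paused before the 200 GiB worker limit is reached.
import dask
from distributed import Client, LocalCluster

dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.7,       # spill more aggressively
    "distributed.worker.memory.pause": 0.8,       # pause new tasks on the worker
    "distributed.worker.memory.terminate": 0.95,  # restart a runaway worker
})

cluster = LocalCluster(n_workers=4, memory_limit="200GB")  # illustrative sizing
client = Client(cluster)
```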