NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.

[BUG] Exception: "IndexError('list index out of range')" #1860

Closed vs385 closed 1 year ago

vs385 commented 1 year ago

Describe the bug
I started getting this error when I upgraded to NVTabular 23.08:

2023-08-30 14:53:26,012 - distributed.worker - WARNING - Compute Failed
Key:       ('transform-6435a10df7c6acb222987cc5dda4ed1d', 0)
Function:  subgraph_callable-98f2413b-b494-431b-a9e6-3f229f0d
args:      ({'piece': ('PWD/basedir/data/train_pre_full.parquet/part.0.parquet', [0, 1, 2], [])})
kwargs:    {}
Exception: "IndexError('list index out of range')"

IndexError                                Traceback (most recent call last)
Cell In[27], line 1
----> 1 proc.fit(full_dataset)

File /usr/local/lib/python3.10/dist-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
    199 def fit(self, dataset: Dataset) -> "Workflow":
    200     """Calculates statistics for this workflow on the input dataset
    201 
    202     Parameters
   (...)
    211         This Workflow with statistics calculated on it
    212     """
--> 213     self.executor.fit(dataset, self.graph)
    214     return self

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:489, in DaskExecutor.fit(self, dataset, graph, refit)
    473         dependencies.difference_update(current_phase)
    475 # This captures the output dtypes of operators like LambdaOp where
    476 # the dtype can't be determined without running the transform
    477 # self._transform_impl(dataset, capture_dtypes=True).sample_dtypes()
    478 #
    479 Dataset(
    480     self.transform(
    481         dataset.to_ddf(),
    482         graph.output_node,
    483         graph.output_dtypes,
    484         capture_dtypes=True,
    485     ),
    486     cpu=dataset.cpu,
    487     base_dataset=dataset.base_dataset,
    488     schema=graph.output_schema,
--> 489 ).sample_dtypes()
    490 graph.construct_schema(dataset.schema, preserve_dtypes=True)
    492 return graph

File /usr/local/lib/python3.10/dist-packages/merlin/io/dataset.py:1262, in Dataset.sample_dtypes(self, n, annotate_lists)
   1255 """Return the real dtypes of the Dataset
   1256 
   1257 Use cached metadata if this operation was
   1258 already performed. Otherwise, call down to the
   1259 underlying engine for sampling logic.
   1260 """
   1261 if self._real_meta.get(n, None) is None:
-> 1262     _real_meta = self.engine.sample_data(n=n)
   1263     if self.dtypes:
   1264         _real_meta = _set_dtypes(_real_meta, self.dtypes)

File /usr/local/lib/python3.10/dist-packages/merlin/io/dataset_engine.py:71, in DatasetEngine.sample_data(self, n)
     69 _ddf = self.to_ddf()
     70 for partition_index in range(_ddf.npartitions):
---> 71     _head = _ddf.partitions[partition_index].head(n)
     72     if len(_head):
     73         return _head

File /usr/local/lib/python3.10/dist-packages/dask/dataframe/core.py:1268, in _Frame.head(self, n, npartitions, compute)
   1266 # No need to warn if we're already looking at all partitions
   1267 safe = npartitions != self.npartitions
-> 1268 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/local/lib/python3.10/dist-packages/dask/dataframe/core.py:1302, in _Frame._head(self, n, npartitions, compute, safe)
   1297 result = new_dd_object(
   1298     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1299 )
   1301 if compute:
-> 1302     result = result.compute()
   1303 return result

File /usr/local/lib/python3.10/dist-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File /usr/local/lib/python3.10/dist-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/local/lib/python3.10/dist-packages/distributed/client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3135         should_rejoin = False
   3136 try:
-> 3137     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3138 finally:
   3139     for f in futures.values():

File /usr/local/lib/python3.10/dist-packages/distributed/client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
   2304 else:
   2305     local_worker = None
-> 2306 return self.sync(
   2307     self._gather,
   2308     futures,
   2309     errors=errors,
   2310     direct=direct,
   2311     local_worker=local_worker,
   2312     asynchronous=asynchronous,
   2313 )

File /usr/local/lib/python3.10/dist-packages/distributed/utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )

File /usr/local/lib/python3.10/dist-packages/distributed/utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
    403 if error:
    404     typ, exc, tb = error
--> 405     raise exc.with_traceback(tb)
    406 else:
    407     return result

File /usr/local/lib/python3.10/dist-packages/distributed/utils.py:378, in sync.<locals>.f()
    376         future = asyncio.wait_for(future, callback_timeout)
    377     future = asyncio.ensure_future(future)
--> 378     result = yield future
    379 except Exception:
    380     error = sys.exc_info()

File /usr/local/lib/python3.10/dist-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File /usr/local/lib/python3.10/dist-packages/distributed/client.py:2169, in Client._gather(self, futures, errors, direct, local_worker)
   2167         exc = CancelledError(key)
   2168     else:
-> 2169         raise exception.with_traceback(traceback)
   2170     raise exc
   2171 if errors == "skip":

File /usr/local/lib/python3.10/dist-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/local/lib/python3.10/dist-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/local/lib/python3.10/dist-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.10/dist-packages/dask/utils.py:72, in apply()
     41 """Apply a function given its positional and keyword arguments.
     42 
     43 Equivalent to ``func(*args, **kwargs)``
   (...)
     69 >>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
     70 """
     71 if kwargs:
---> 72     return func(*args, **kwargs)
     73 else:
     74     return func(*args)

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:102, in transform()
    100 output_data = None
    101 for node in nodes:
--> 102     transformed_data = self._execute_node(node, transformable, capture_dtypes, strict)
    103     output_data = self._combine_node_outputs(node, transformed_data, output_data)
    105 # If there are any additional columns that weren't produced by one of the supplied nodes
    106 # we grab them directly from the supplied input data. Normally this would happen on a
    107 # per-node basis, but offers a safety net for the multi-node case

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:116, in _execute_node()
    115 def _execute_node(self, node, transformable, capture_dtypes=False, strict=False):
--> 116     upstream_outputs = self._run_upstream_transforms(
    117         node, transformable, capture_dtypes, strict
    118     )
    119     upstream_columns = self._append_addl_root_columns(node, transformable, upstream_outputs)
    120     formatted_columns = self._standardize_formats(node, upstream_columns)

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:130, in _run_upstream_transforms()
    127 upstream_outputs = []
    129 for upstream_node in node.parents_with_dependencies:
--> 130     node_output = self._execute_node(
    131         upstream_node,
    132         transformable,
    133         capture_dtypes=capture_dtypes,
    134         strict=strict,
    135     )
    136     if node_output is not None and len(node_output) > 0:
    137         upstream_outputs.append(node_output)

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:120, in _execute_node()
    116 upstream_outputs = self._run_upstream_transforms(
    117     node, transformable, capture_dtypes, strict
    118 )
    119 upstream_columns = self._append_addl_root_columns(node, transformable, upstream_outputs)
--> 120 formatted_columns = self._standardize_formats(node, upstream_columns)
    121 transform_input = self._merge_upstream_columns(formatted_columns)
    122 transform_output = self._run_node_transform(node, transform_input, capture_dtypes, strict)

File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:168, in _standardize_formats()
    165     supported_formats = op.supported_formats
    167 # Convert the first thing into a supported format
--> 168 tensors = _convert_format(node_input_data[0], supported_formats)
    169 target_format = _data_format(tensors)
    171 # Convert the whole list into the same format

IndexError: list index out of range

Steps/Code to reproduce bug
Fit an NVTabular workflow on a full-dataset parquet file with 20+ partitions using a LocalCUDACluster. The parquet file is written with dask_cudf and then loaded as an NVTabular Dataset before fitting the workflow with proc.fit(full_dataset). See the sketch below.
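
A minimal sketch of this setup, assuming hypothetical input paths, column names, and operators (only the output parquet path matches the traceback above; the real workflow follows the example notebook referenced below):

import dask_cudf
import nvtabular as nvt
from dask_cuda import LocalCUDACluster
from distributed import Client

# Start a single-node CUDA cluster.
cluster = LocalCUDACluster()
client = Client(cluster)

# Write the training data as a multi-partition parquet file with dask_cudf.
# The input path and column names here are placeholders.
ddf = dask_cudf.read_parquet("PWD/basedir/data/raw/*.parquet")
ddf.to_parquet("PWD/basedir/data/train_pre_full.parquet")

# Load it back as an NVTabular Dataset and fit the workflow.
full_dataset = nvt.Dataset("PWD/basedir/data/train_pre_full.parquet", engine="parquet")
features = ["user_id", "item_id"] >> nvt.ops.Categorify()  # placeholder columns/ops
proc = nvt.Workflow(features)
proc.fit(full_dataset)  # raises IndexError('list index out of range') on 23.08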

Expected behavior
It should fit successfully, as it did with NVTabular 23.06. When I run the same code on a multi-GPU instance, the error is not thrown.

Environment details (please complete the following information):

Merlin version: nvcr.io/nvidia/merlin/merlin-tensorflow:23.08
Platform: EC2 g5 (8xlarge) Linux instance, single GPU
Python version: 3.10.12
PyTorch version (GPU?):
Tensorflow version (GPU?): 2.12.0+nv23.6

docker run --runtime=nvidia --rm -it -p 8888:8888 -p 8797:8787 -p 8796:8786 --ipc=host --cap-add SYS_NICE nvcr.io/nvidia/merlin/merlin-tensorflow:latest /bin/bash


rnyak commented 1 year ago

@vs385 where is this error coming from? What does your NVT pipeline look like? IndexError: list index out of range generally happens when the pipeline cannot access the data files, or when your data path does not contain any files.

Can you please use the merlin-tensorflow:23.06 image and test again?

vs385 commented 1 year ago

@rnyak this error is coming from fitting the workflow on my NVTabular dataset (which I load from a parquet file that was saved using dask_cudf). I followed the example notebook from here and adapted the column names and other details to fit my own dataset.

"IndexError: list index out of range happens generally when the pipeline cannot access the data files, or your data path does not have any files." Yes, I know, and I did check whether the files were in the path; they were all there (around 24 parquet partition files in the directory), which is why I couldn't understand where this error was coming from.
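
A minimal sketch of this kind of check, using the path from the traceback above and dask_cudf (the file-name pattern and helper calls are assumptions, not the exact commands I ran):

import glob
import dask_cudf

path = "PWD/basedir/data/train_pre_full.parquet"

# Confirm the partition files actually exist on disk.
parts = sorted(glob.glob(f"{path}/part.*.parquet"))
print(len(parts), "partition files found")

# Check per-partition row counts; an empty partition is one way the
# executor could end up indexing into an empty input list.
ddf = dask_cudf.read_parquet(path)
print(ddf.map_partitions(len).compute())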

"Can you please use the merlin-tensorflow:23.06 image and test again?" I tried with 23.06 and it works without any error; it's only with 23.08 that I'm getting this error, so I had to downgrade after that.

rnyak commented 1 year ago

Closing this ticket since the user can make it work with the 23.06 image.