rapidsai / crossfit

Metric calculation library
Apache License 2.0

Add more flexible CPU/GPU support #70

Closed sarahyurick closed 2 months ago

sarahyurick commented 3 months ago

See https://github.com/NVIDIA/NeMo-Curator/issues/194 for context.

Currently, when trying out this notebook with a CPU Dask DataFrame, it fails with a TypeError: batch_text_or_text_pairs has to be a list or a tuple (got <class 'pandas.core.series.Series'>).

I have traced this back to several spots in CrossFit where we depend on GPU libraries. Here are the quick-fix changes I made while chasing the errors:

  1. Add import pandas as pd and import numpy as np to https://github.com/rapidsai/crossfit/blob/main/crossfit/op/tokenize.py#L24
  2. Add import pandas as pd to https://github.com/rapidsai/crossfit/blob/main/crossfit/backend/cudf/series.py#L18
  3. Add after https://github.com/rapidsai/crossfit/blob/main/crossfit/op/tokenize.py#L64-L65C13
    elif isinstance(sentences, pd.Series):
        sentences = sentences.tolist()
  4. Change https://github.com/rapidsai/crossfit/blob/main/crossfit/op/tokenize.py#L267 to
    try:
        token_o = {k: cp.asarray(v) for k, v in token_o.items()}
    except cp.cuda.runtime.CUDARuntimeError:
        token_o = {k: np.asarray(v) for k, v in token_o.items()}
  5. Change https://github.com/rapidsai/crossfit/blob/main/crossfit/backend/cudf/series.py#L31-L44 to

    try:
        data = as_column(ar.flatten())
        offset_col = as_column(cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32")
        mask_col = cp.full(shape=n_rows, fill_value=cp.bool_(True))
        mask = cudf._lib.transform.bools_to_mask(as_column(mask_col))
        lc = cudf.core.column.ListColumn(
            size=n_rows,
            dtype=cudf.ListDtype(data.dtype),
            mask=mask,
            offset=0,
            null_count=0,
            children=(offset_col, data),
        )
        return cudf.Series(lc, index=index)
    
    except RuntimeError:
        reshaped_data = ar.reshape(n_rows, n_cols).tolist()
        return pd.Series(reshaped_data, index=index)
  6. Change https://github.com/rapidsai/crossfit/blob/main/crossfit/op/tokenize.py#L136 to output = pd.DataFrame()
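
Changes 1 and 3 above amount to turning whatever series type arrives into a plain Python list before it reaches the tokenizer, which per the TypeError accepts only a list or a tuple. A minimal sketch of that normalization as a single helper follows. `sentences_to_list` is a hypothetical name, not an existing CrossFit function, and the duck-typed checks rest on the assumption that cuDF series expose `to_arrow()` while pandas series expose `tolist()`.

```python
def sentences_to_list(sentences):
    """Return `sentences` as a plain Python list (hypothetical helper)."""
    # Lists and tuples are already acceptable; copy into a list for uniformity.
    if isinstance(sentences, (list, tuple)):
        return list(sentences)
    # Assumption: a cuDF series is recognized by its to_arrow() method.
    # Going through Arrow copies the strings from device to host memory.
    if hasattr(sentences, "to_arrow"):
        return sentences.to_arrow().to_pylist()
    # pandas series and NumPy arrays both provide tolist().
    if hasattr(sentences, "tolist"):
        return sentences.tolist()
    raise TypeError(f"Unsupported input type: {type(sentences)!r}")
```

Checking for methods rather than importing `cudf` or `pandas` keeps the helper importable in a CPU-only environment, which is the situation this issue is about.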

But after change 6, the error message is difficult to trace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:2

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/nemo_curator/datasets/doc_dataset.py:103, in DocumentDataset.to_json(self, output_file_dir, write_to_filename)
     94 def to_json(
     95     self,
     96     output_file_dir,
     97     write_to_filename=False,
     98 ):
     99     """
    100     See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
    101
    102     """
--> 103     write_to_disk(
    104         df=self.df,
    105         output_file_dir=output_file_dir,
    106         write_to_filename=write_to_filename,
    107         output_type="jsonl",
    108     )

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/nemo_curator/utils/distributed_utils.py:515, in write_to_disk(df, output_file_dir, write_to_filename, output_type)
    511         df.to_json(
    512             output_file_dir, orient="records", lines=True, force_ascii=False
    513         )
    514     else:
--> 515         df.to_json(
    516             output_file_dir, orient="records", lines=True, force_ascii=False
    517         )
    518 elif output_type == "parquet":
    519     df.to_parquet(output_file_dir, write_index=False)

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/dask/dataframe/core.py:2107, in _Frame.to_json(self, filename, *args, **kwargs)
   2104 """See dd.to_json docstring for more information"""
   2105 from dask.dataframe.io import to_json
-> 2107 return to_json(self, filename, *args, **kwargs)

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/dask/dataframe/io/json.py:96, in to_json(df, url_path, orient, lines, storage_options, compute, encoding, errors, compression, compute_kwargs, name_function, **kwargs)
     94     if compute_kwargs is None:
     95         compute_kwargs = dict()
---> 96     return list(dask_compute(*parts, **compute_kwargs))
     97 else:
     98     return parts

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/rapids-24.06/lib/python3.10/site-packages/dask/dataframe/utils.py:440, in check_matching_columns()
    438 else:
    439     extra_info = "Order of columns does not match"
--> 440 raise ValueError(
    441     "The columns in the computed data do not match"
    442     " the columns in the provided metadata\n"
    443     f"{extra_info}"
    444 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
Order of columns does not match

One option to fix https://github.com/NVIDIA/NeMo-Curator/issues/194 is to implement a non-CrossFit solution for the CPU case, but that is only a temporary solution. For the long-term, I would like to use CrossFit interchangeably with Dask and Dask-cuDF DataFrames.

cc @VibhuJawa

VibhuJawa commented 3 months ago

For the long-term, I would like to use CrossFit interchangeably with Dask and Dask-cuDF DataFrames.

@sarahyurick, some clarifications:

  1. CrossFit is meant to be a GPU library and to work only in GPU environments. I don't think switching the internal data structures is an option, because that would also mean taking perf hits.

What I think we should do is something similar to what we do in cuML.

Everything else should remain the same.

This is where we can change the code: https://github.com/rapidsai/crossfit/blob/384f3cf3e12df04678711a4c52cd385bf1ea36e0/crossfit/op/base.py#L49-L54
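
One possible reading of the cuML comparison is that the op keeps computing on GPU data structures, while host input is converted on the way in and the result is converted back on the way out. A rough sketch of that pattern as a decorator follows. Everything named here (`mirror_input_type`, the default converters, the `is_host` check) is hypothetical and is not CrossFit or cuML code; the defaults assume `cudf` is installed and a GPU is visible.

```python
import functools


def _default_to_gpu(frame):
    # Assumption: cudf is installed and a GPU is available.
    import cudf

    return cudf.from_pandas(frame)


def _default_to_cpu(frame):
    # cuDF frames and series provide to_pandas().
    return frame.to_pandas()


def _default_is_host(obj):
    # Treat any object whose class is defined in the pandas package as host data.
    return type(obj).__module__.split(".")[0] == "pandas"


def mirror_input_type(to_gpu=_default_to_gpu, to_cpu=_default_to_cpu,
                      is_host=_default_is_host):
    """Wrap a GPU-only function so host input yields host output."""

    def decorator(gpu_fn):
        @functools.wraps(gpu_fn)
        def wrapper(data, *args, **kwargs):
            if not is_host(data):
                # Device input: call the GPU function unchanged.
                return gpu_fn(data, *args, **kwargs)
            # Host input: move to device, compute, move the result back.
            result = gpu_fn(to_gpu(data), *args, **kwargs)
            return to_cpu(result)

        return wrapper

    return decorator
```

With this shape the function body never branches on the backend, so the GPU path pays no extra cost; only callers that pass host data pay for the two copies.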

VibhuJawa commented 3 months ago

@sarahyurick, do you want to take a stab at fixing this?

sarahyurick commented 3 months ago

@sarahyurick, do you want to take a stab at fixing this?

Yes thanks, I can work on it.