NVIDIA / NeMo-Curator

Scalable toolkit for data curation
Apache License 2.0

shuffle datasets #115

Open simplew2011 opened 1 week ago

simplew2011 commented 1 week ago
    def _exact_dup_ids(self, df: dd.DataFrame):
        """
        Get the IDs for text/documents that are exact duplicates
        Parameters
        ----------
        df: dask.dataframe.core.DataFrame
          A dataframe with the following requirements:
          * A column where each row is the text from one document
          * A unique ID column for each document
        """
        hash_df = self._compute_hashes(df)
        # Fall back to the task-based shuffle when the installed Dask version is
        # affected by the known p2p shuffle error.
        shuffle_context = (
            config.set({"dataframe.shuffle.method": "tasks"})
            if DASK_P2P_ERROR
            else nullcontext()
        )
        with shuffle_context:
            # Shuffle on the hash so rows with equal hashes land in the same partition,
            # then keep every row whose hash appears more than once in that partition.
            dup_ids = hash_df.shuffle(
                on=["_hashes"],
                ignore_index=True,
                npartitions=max(1, (hash_df.npartitions // 3)),
            ).map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])
        return dup_ids
Could the shuffle be skipped and the duplicates found with a plain map_partitions call instead, like this?

    def _exact_dup_ids(self, df: dd.DataFrame):
        """
        Get the IDs for text/documents that are exact duplicates
        Parameters
        ----------
        df: dask.dataframe.core.DataFrame
          A dataframe with the following requirements:
          * A column where each row is the text from one document
          * A unique ID column for each document
        """
        hash_df = self._compute_hashes(df)
        # Proposed change: look for duplicate hashes within each partition only, without shuffling.
        dup_ids = hash_df.map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])

        return dup_ids
ayushdg commented 1 week ago

Algorithmically, the shuffle is needed to ensure all duplicates are found. I'll try to illustrate with an example:

# Assume input data is divided into 2 partitions

Partition 0        Partition 1
-----------        -----------
ID    hash         ID    hash
0     xyz          3     xyz
1     abc          4     def
2     xyz          5     abc

Skipping the shuffle and using map_partitions will only identify 0, 2 as a duplicate group. It will fail to place 3 in that group, and it will also miss the pair 1, 5, since they sit in different partitions. Shuffling by hash ensures that documents with the same hash end up in the same partition.

# After shuffle

Partition 0        Partition 1
-----------        -----------
ID    hash         ID    hash
0     xyz          1     abc
3     xyz          4     def
2     xyz          5     abc

Now calling the lambda on each partition finds the duplicates correctly. If your data fits in a single partition then shuffling is not needed, but for anything more than one partition the shuffle is required for correctness.
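To make this concrete, here is a minimal standalone Dask sketch (not the NeMo-Curator code; the toy id/_hashes values just mirror the tables above) comparing the per-partition check with and without a shuffle on the hash column:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": [0, 1, 2, 3, 4, 5],
                    "_hashes": ["xyz", "abc", "xyz", "xyz", "def", "abc"]})
df = dd.from_pandas(pdf, npartitions=2)  # ids 0-2 in partition 0, ids 3-5 in partition 1

# Without a shuffle, duplicates are only visible inside each partition.
no_shuffle = df.map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])
print(sorted(no_shuffle.compute()["id"]))  # [0, 2] -- ids 1, 3, 5 are missed

# Shuffling on the hash column first puts equal hashes in the same partition.
shuffled = df.shuffle(on=["_hashes"], ignore_index=True)
all_dups = shuffled.map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])
print(sorted(all_dups.compute()["id"]))  # [0, 1, 2, 3, 5]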

@simplew2011 Are you running into issues with shuffle (either errors or performance)?

simplew2011 commented 1 week ago

Why shuffle into max(1, (hash_df.npartitions // 3)) partitions instead of keeping max(1, hash_df.npartitions)?
ayushdg commented 1 week ago

max(1, hash_df.npartitions) is theoretically okay; hash_df.npartitions // 3 is just an optimization for performance. In our experience, the input document dataset is much larger than hash_df, which only contains IDs and hashes. If the input df fits in 100 partitions, the smaller hash_df can potentially fit within 30-35 partitions. Calling shuffle with fewer output partitions often leads to better performance, so based on some testing we picked npartitions // 3 as a good tradeoff between performance and avoiding OOM errors from going too low.
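As a rough sketch of that trade-off (reusing the toy frame from the sketch above, with made-up partition counts), shuffling the small ID/hash frame into fewer output partitions than it started with looks like this:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": list(range(6)),
                    "_hashes": ["xyz", "abc", "xyz", "xyz", "def", "abc"]})
hash_df = dd.from_pandas(pdf, npartitions=6)   # pretend the hashes arrived in 6 partitions

out_parts = max(1, hash_df.npartitions // 3)   # 6 // 3 = 2 output partitions
dup_ids = hash_df.shuffle(
    on=["_hashes"],
    ignore_index=True,
    npartitions=out_parts,                     # fewer, larger partitions for the small frame
).map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])

print(dup_ids.npartitions)                     # 2
print(sorted(dup_ids.compute()["id"]))         # [0, 1, 2, 3, 5] -- same result, fewer shuffle outputs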