huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

datasets freezes with streaming mode in multi-GPU #5123

Open jackfeinmann5 opened 1 year ago

jackfeinmann5 commented 1 year ago

Describe the bug

Hi. I am using this dataloader, which is meant for processing large datasets in streaming mode and is taken from one of the Hugging Face examples. I am using it to read C4: https://github.com/huggingface/transformers/blob/b48ac1a094e572d6076b46a9e4ed3e0ebe978afc/examples/research_projects/codeparrot/scripts/codeparrot_training.py#L22 When using multiple GPUs with accelerate on one node, the code freezes, but it works for 1 GPU:

10/16/2022 14:18:46 - INFO - datasets.info - Loading Dataset Infos from /home/jack/.cache/huggingface/modules/datasets_modules/datasets/c4/df532b158939272d032cc63ef19cd5b83e9b4d00c922b833e4cb18b2e9869b01
Steps:   0%|                                                                                                              | 0/400000 [00:00<?, ?it/s]
10/16/2022 14:18:47 - INFO - torch.utils.data.dataloader - Shared seed (135290893754684706) sent to store on rank 0

Code to reproduce

Please run this code with accelerate launch code.py.

from accelerate import Accelerator
from accelerate.logging import get_logger
from datasets import load_dataset
from transformers import AutoTokenizer
import torch
from torch.utils.data import IterableDataset
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.datapipes.iter.combinatorics import ShufflerIterDataPipe

logger = get_logger(__name__)

class ConstantLengthDataset(IterableDataset):
    """
    Iterable dataset that returns constant length chunks of tokens from stream of text files.
        Args:
            tokenizer (Tokenizer): The processor used for proccessing the data.
            dataset (dataset.Dataset): Dataset with text files.
            infinite (bool): If True the iterator is reset after dataset reaches end else stops.
            max_seq_length (int): Length of token sequences to return.
            num_of_sequences (int): Number of token sequences to keep in buffer.
            chars_per_token (int): Number of characters per token used to estimate number of tokens in text buffer.
    """

    def __init__(
        self,
        tokenizer,
        dataset,
        infinite=False,
        max_seq_length=1024,
        num_of_sequences=1024,
        chars_per_token=3.6,
    ):
        self.tokenizer = tokenizer
        # self.concat_token_id = tokenizer.bos_token_id
        self.dataset = dataset
        self.max_seq_length = max_seq_length
        self.epoch = 0
        self.infinite = infinite
        self.current_size = 0
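        # buffer size in characters; with the defaults this is 1024 tokens * 3.6 chars/token * 1024 sequences ≈ 3.8M characters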
        self.max_buffer_size = max_seq_length * chars_per_token * num_of_sequences
        self.content_field = "text"

    def __iter__(self):
        iterator = iter(self.dataset)
        more_examples = True
        while more_examples:
            buffer, buffer_len = [], 0
            while True:
                if buffer_len >= self.max_buffer_size:
                    break
                try:
                    buffer.append(next(iterator)[self.content_field])
                    buffer_len += len(buffer[-1])
                except StopIteration:
                    if self.infinite:
                        iterator = iter(self.dataset)
                        self.epoch += 1
                        logger.info(f"Dataset epoch: {self.epoch}")
                    else:
                        more_examples = False
                        break
            tokenized_inputs = self.tokenizer(buffer, truncation=False)["input_ids"]
            all_token_ids = []
            for tokenized_input in tokenized_inputs:
                all_token_ids.extend(tokenized_input)
            for i in range(0, len(all_token_ids), self.max_seq_length):
                input_ids = all_token_ids[i : i + self.max_seq_length]
                if len(input_ids) == self.max_seq_length:
                    self.current_size += 1
                    yield torch.tensor(input_ids)

    def shuffle(self, buffer_size=1000):
        return ShufflerIterDataPipe(self, buffer_size=buffer_size)

def create_dataloaders(tokenizer, accelerator):
    ds_kwargs = {"streaming": True}
    # In distributed training, the load_dataset function guarantees that only one process
    # downloads the dataset at a time.
    datasets = load_dataset(
        "c4",
        "en",
        cache_dir="cache_dir",
        **ds_kwargs,
    )
    train_data, valid_data = datasets["train"], datasets["validation"]
    with accelerator.main_process_first():
        train_data = train_data.shuffle(buffer_size=10000, seed=None)
        train_dataset = ConstantLengthDataset(
            tokenizer,
            train_data,
            infinite=True,
            max_seq_length=256,
        )
        valid_dataset = ConstantLengthDataset(
            tokenizer,
            valid_data,
            infinite=False,
            max_seq_length=256,
        )
        train_dataset = train_dataset.shuffle(buffer_size=10000)
    train_dataloader = DataLoader(train_dataset, batch_size=160, shuffle=True)
    eval_dataloader = DataLoader(valid_dataset, batch_size=160)
    return train_dataloader, eval_dataloader

def main():
    # Accelerator.
    logging_dir = "data_save_dir/log"
    accelerator = Accelerator(
        gradient_accumulation_steps=1,
        mixed_precision="bf16",
        log_with="tensorboard",
        logging_dir=logging_dir,
    )
    # We need to initialize the trackers we use and also store our configuration.
    # The trackers are initialized automatically on the main process.
    if accelerator.is_main_process:
        accelerator.init_trackers("test")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    # Load datasets and create dataloaders.
    train_dataloader, _ = create_dataloaders(tokenizer, accelerator)

    train_dataloader = accelerator.prepare(train_dataloader)
    for step, batch in enumerate(train_dataloader, start=1):
        print(step)
    accelerator.end_training()

if __name__ == "__main__":
    main()

Expected results

Being able to run the code with streaming datasets in a multi-GPU setting.

Environment info

@lhoestq I do not have any idea why this freezing happens. When I removed the streaming mode, everything worked fine, so I know this is caused by the streaming mode of the dataloader not working well in the multi-GPU setting. Since the datasets are large, I hope to keep the streaming mode. I very much appreciate your help.

jackfeinmann5 commented 1 year ago

@lhoestq I tested the script without the accelerator, and I confirm this is due to the datasets part, as I get similar results without the accelerator.

lhoestq commented 1 year ago

Hi ! You said it works on 1 GPU but doesn't work without accelerator - what's the difference between running on 1 GPU and running without accelerator in your case ?

jackfeinmann5 commented 1 year ago

Hi @lhoestq, thanks for coming back to me, and sorry for the confusion. I meant that this works fine on 1 GPU, but on multi-GPU it freezes. The accelerator is not the issue: if you adapt the code to run without it, you still hit the same problem. To test it, please run "accelerate config", choose the multi-GPU setup on one node, and then run "accelerate launch code.py"; you will see the freeze occur.

jackfeinmann5 commented 1 year ago

Hi @lhoestq, have you had a chance to reproduce the error by running the minimal example shared above? Thanks.

lhoestq commented 1 year ago

I think you need to do train_dataset = train_dataset.with_format("torch") to work with the DataLoader in a multiprocessing setup :)

The hang is probably caused by our streaming lib fsspec, which doesn't work in multiprocessing out of the box - but we made it work with the PyTorch DataLoader when the dataset format is set to "torch"
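
For example, a minimal sketch of where the call could go (assuming the create_dataloaders function from the script above; it is the streaming datasets returned by load_dataset that expose with_format, since the ConstantLengthDataset wrapper defined in the script does not):

train_data, valid_data = datasets["train"], datasets["validation"]
# per the explanation above, the "torch" format enables the fsspec fix for DataLoader multiprocessing
train_data = train_data.with_format("torch")
valid_data = valid_data.with_format("torch")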

jackfeinmann5 commented 1 year ago

Hi @lhoestq, thanks for the response. I added the suggested line right before calling with accelerator.main_process_first(): in the code above, and I confirm it still freezes. To reproduce it, please run "accelerate launch code.py". Do you have any more suggestions for me? I have no idea how to fix or debug this freeze. Many thanks.

lhoestq commented 1 year ago

Maybe the fsspec state needs to be cleared even earlier - can you try running this function at the very beginning of your script ?

import fsspec

def _set_fsspec_for_multiprocess() -> None:
    """
    Clear reference to the loop and thread.
    This is necessary otherwise HTTPFileSystem hangs in the ML training loop.
    Only required for fsspec >= 0.9.0
    See https://github.com/fsspec/gcsfs/issues/379
    """
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

_set_fsspec_for_multiprocess()

jackfeinmann5 commented 1 year ago

Hi @lhoestq, thank you. I tried it, but I am getting AttributeError: module 'fsspec' has no attribute 'asyn'. Which version of fsspec do you use? I am using fsspec 2022.8.2 (pypi). Thank you.

jackfeinmann5 commented 1 year ago

Hi @lhoestq, I solved the fsspec error for now with this hack: https://discuss.huggingface.co/t/attributeerror-module-fsspec-has-no-attribute-asyn/19255, but the script is still freezing. I would greatly appreciate it if you could run it on your side. Many thanks.

import fsspec
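import fsspec.asyn  # assumed fix from the linked forum thread (not shown in the original post): import the submodule explicitly so fsspec.asyn resolves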

def _set_fsspec_for_multiprocess() -> None:
    """
    Clear reference to the loop and thread.
    This is necessary otherwise HTTPFileSystem hangs in the ML training loop.
    Only required for fsspec >= 0.9.0
    See https://github.com/fsspec/gcsfs/issues/379
    """
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

_set_fsspec_for_multiprocess()

from accelerate import Accelerator
from accelerate.logging import get_logger
from datasets import load_dataset
from transformers import AutoTokenizer
import torch
from torch.utils.data import IterableDataset
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.datapipes.iter.combinatorics import ShufflerIterDataPipe

logger = get_logger(__name__)

class ConstantLengthDataset(IterableDataset):
    """
    Iterable dataset that returns constant length chunks of tokens from stream of text files.
        Args:
            tokenizer (Tokenizer): The processor used for proccessing the data.
            dataset (dataset.Dataset): Dataset with text files.
            infinite (bool): If True the iterator is reset after dataset reaches end else stops.
            max_seq_length (int): Length of token sequences to return.
            num_of_sequences (int): Number of token sequences to keep in buffer.
            chars_per_token (int): Number of characters per token used to estimate number of tokens in text buffer.
    """

    def __init__(
        self,
        tokenizer,
        dataset,
        infinite=False,
        max_seq_length=1024,
        num_of_sequences=1024,
        chars_per_token=3.6,
    ):
        self.tokenizer = tokenizer
        # self.concat_token_id = tokenizer.bos_token_id
        self.dataset = dataset
        self.max_seq_length = max_seq_length
        self.epoch = 0
        self.infinite = infinite
        self.current_size = 0
        self.max_buffer_size = max_seq_length * chars_per_token * num_of_sequences
        self.content_field = "text"

    def __iter__(self):
        iterator = iter(self.dataset)
        more_examples = True
        while more_examples:
            buffer, buffer_len = [], 0
            while True:
                if buffer_len >= self.max_buffer_size:
                    break
                try:
                    buffer.append(next(iterator)[self.content_field])
                    buffer_len += len(buffer[-1])
                except StopIteration:
                    if self.infinite:
                        iterator = iter(self.dataset)
                        self.epoch += 1
                        logger.info(f"Dataset epoch: {self.epoch}")
                    else:
                        more_examples = False
                        break
            tokenized_inputs = self.tokenizer(buffer, truncation=False)["input_ids"]
            all_token_ids = []
            for tokenized_input in tokenized_inputs:
                all_token_ids.extend(tokenized_input)
            for i in range(0, len(all_token_ids), self.max_seq_length):
                input_ids = all_token_ids[i : i + self.max_seq_length]
                if len(input_ids) == self.max_seq_length:
                    self.current_size += 1
                    yield torch.tensor(input_ids)

    def shuffle(self, buffer_size=1000):
        return ShufflerIterDataPipe(self, buffer_size=buffer_size)

def create_dataloaders(tokenizer, accelerator):
    ds_kwargs = {"streaming": True}
    # In distributed training, the load_dataset function guarantees that only one process
    # downloads the dataset at a time.
    datasets = load_dataset(
        "c4",
        "en",
        cache_dir="cache_dir",
        **ds_kwargs,
    )
    train_data, valid_data = datasets["train"], datasets["validation"]
    with accelerator.main_process_first():
        train_data = train_data.shuffle(buffer_size=10000, seed=None)
        train_dataset = ConstantLengthDataset(
            tokenizer,
            train_data,
            infinite=True,
            max_seq_length=256,
        )
        valid_dataset = ConstantLengthDataset(
            tokenizer,
            valid_data,
            infinite=False,
            max_seq_length=256,
        )
        train_dataset = train_dataset.shuffle(buffer_size=10000)
    train_dataloader = DataLoader(train_dataset, batch_size=160, shuffle=True)
    eval_dataloader = DataLoader(valid_dataset, batch_size=160)
    return train_dataloader, eval_dataloader

def main():
    # Accelerator.
    logging_dir = "data_save_dir/log"
    accelerator = Accelerator(
        gradient_accumulation_steps=1,
        mixed_precision="bf16",
        log_with="tensorboard",
        logging_dir=logging_dir,
    )
    # We need to initialize the trackers we use and also store our configuration.
    # The trackers are initialized automatically on the main process.
    if accelerator.is_main_process:
        accelerator.init_trackers("test")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    # Load datasets and create dataloaders.
    train_dataloader, _ = create_dataloaders(tokenizer, accelerator)

    train_dataloader = accelerator.prepare(train_dataloader)
    for step, batch in enumerate(train_dataloader, start=1):
        print(step)
    accelerator.end_training()

if __name__ == "__main__":
    main()

loubnabnl commented 1 year ago

Are you using PyTorch 1.11? If not, the script freezes because of the shuffling in this line:

        return ShufflerIterDataPipe(self, buffer_size=buffer_size)

ShufflerIterDataPipe behavior must have changed in newer PyTorch versions. This happens regardless of whether you use streaming in datasets, so it's probably not the same issue, but it's something to try.
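
If ShufflerIterDataPipe is indeed the culprit on newer PyTorch versions, one possible workaround (an untested sketch; BufferShuffledDataset is a made-up name, not an existing API) would be to replace the datapipe with a plain buffer-shuffle wrapper, so that ConstantLengthDataset.shuffle returns BufferShuffledDataset(self, buffer_size=buffer_size) instead:

import random
from torch.utils.data import IterableDataset

class BufferShuffledDataset(IterableDataset):
    """Keeps a fixed-size buffer of samples and yields random elements from it,
    approximating ShufflerIterDataPipe without depending on torchdata datapipes."""

    def __init__(self, dataset, buffer_size=1000, seed=None):
        self.dataset = dataset
        self.buffer_size = buffer_size
        self.rng = random.Random(seed)

    def __iter__(self):
        buffer = []
        for sample in self.dataset:
            if len(buffer) < self.buffer_size:
                buffer.append(sample)  # fill the buffer before yielding anything
                continue
            # swap a random buffered sample out for the incoming one
            idx = self.rng.randrange(self.buffer_size)
            yield buffer[idx]
            buffer[idx] = sample
        self.rng.shuffle(buffer)
        yield from buffer  # drain whatever is left once the stream ends

Note that the DataLoader's shuffle=True argument would then have to be dropped, since a plain IterableDataset does not accept it.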

physercoe commented 1 year ago

> Are you using PyTorch 1.11? If not, the script freezes because of the shuffling in this line:
>
>         return ShufflerIterDataPipe(self, buffer_size=buffer_size)
>
> ShufflerIterDataPipe behavior must have changed in newer PyTorch versions. This happens regardless of whether you use streaming in datasets, so it's probably not the same issue, but it's something to try.

I met the same issue with PyTorch 1.12 and 1.13. Is there a way to work around this for newer PyTorch versions?