uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Seeing worse model performance from using petastorm vs normal pytorch dataloader #793

Open AKhazane opened 1 year ago

AKhazane commented 1 year ago

Hello! I'm using an open-source skip-gram template to train ID embeddings from Parquet data (a few columns, all of type int64) with petastorm, but I'm getting far lower-quality embedding representations than when I load the same data directly through PyTorch's non-distributed DataLoader, with everything else held constant (batch size, learning rate, data, and so on).

I'm wondering if I'm simply loading the data with the library incorrectly. Here's a snippet of my code that uses petastorm.

...
        for epoch in range(starting_epoch, total_epochs):

            print(f'Beginning epoch: {epoch + 1}/{total_epochs}')
            running_loss = 0.0 
            num_examples = 0
            with DataLoader(make_batch_reader(self.data_path), batch_size=self.args.batch_size) as train_loader:
                for batch_idx, row in enumerate(train_loader):
                    # Unpack data
                    center, context, neg1, neg2, neg3 = row['id1_mapped'], row['id2_mapped'], row['id3_mapped'], row['id4_mapped'], row['id5_mapped']
                    center = torch.unsqueeze(center, 1)
                    context = torch.unsqueeze(context, 1)
                    neg1 = torch.unsqueeze(neg1, 1)
                    neg2 = torch.unsqueeze(neg2, 1)
                    neg3 = torch.unsqueeze(neg3, 1)

                    center, context = center.to(self.args.device), context.to(self.args.device)

                    # Remove accumulated gradients
                    self.optim.zero_grad()
                    # Get context vectors
                    center_embed, context_embed = self.model(center, context)
                    # print(center_embed.shape, context_embed.shape)
                    # Calc loss: SGNS
                    loss = self.sgns(center_embed, context_embed, neg1, neg2, neg3)
                    # break
                    # Backprop and update
                    loss.backward()
                    self.optim.step()

                    running_loss += loss.item()
                    global_step += 1
                    num_examples += len(center)  # Last batch's size may not equal args.batch_size

                norm = (batch_idx + 1) * num_examples

                self.evaluate_embeddings(epoch)
                self.log_and_save_epoch(epoch, running_loss / norm)

                self.log_step(epoch, global_step, running_loss / norm)#, testing_loss / norm)
...

My only hunch at this point is that I'm not shuffling the data properly between epochs, so the rows within each row group keep the same sequential order, unlike the PyTorch DataLoader, which shuffles the entire dataset (loaded from a CSV file) before each epoch. Is there a straightforward way to add per-epoch shuffling in petastorm, given that all the data is stored in multiple Parquet files?

Data-drone commented 1 year ago

I could be sending you down the wrong track, but if it is to do with shuffling, then I believe Petastorm only shuffles within a Parquet file when it loads the data, not across files. It might be worth checking what Hugging Face datasets does as well, since it relies on Parquet as the storage format.
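
To make the shuffling hunch concrete, here is a minimal pure-Python sketch (it does not call petastorm) of the difference between shuffling only the order of row groups and additionally passing rows through a bounded shuffle buffer. The toy data `ROW_GROUPS` and the helpers `read_row_groups`, `shuffle_buffer`, `batches`, and `groups_per_batch` are hypothetical names made up for illustration. As far as I know, petastorm's PyTorch `DataLoader` accepts a `shuffling_queue_capacity` argument intended for this kind of row-level mixing, but please check the documentation for your version before relying on that.

```python
import random
from typing import Iterable, Iterator, List, Sequence, Tuple

Row = Tuple[int, int]  # (row_group_id, row_index); toy stand-in for a real record


def read_row_groups(row_groups: Sequence[Sequence[Row]], rng: random.Random) -> Iterator[Row]:
    """Shuffle the order of row groups, but keep rows inside each group in order."""
    order = list(range(len(row_groups)))
    rng.shuffle(order)
    for i in order:
        yield from row_groups[i]


def shuffle_buffer(rows: Iterable[Row], capacity: int, rng: random.Random) -> Iterator[Row]:
    """Approximate a row-level shuffle using a buffer of at most `capacity` rows."""
    buffer: List[Row] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= capacity:
            # Emit a uniformly random row from the buffer.
            idx = rng.randrange(len(buffer))
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            yield buffer.pop()
    # Drain whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer


def batches(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Group rows into lists of `batch_size`; the last batch may be smaller."""
    batch: List[Row] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def groups_per_batch(rows: Iterable[Row], batch_size: int = 8) -> List[int]:
    """Count how many distinct row groups contribute to each batch."""
    return [len({g for g, _ in batch}) for batch in batches(rows, batch_size)]


# Hypothetical toy dataset: 4 row groups of 8 rows each.
ROW_GROUPS = [[(g, r) for r in range(8)] for g in range(4)]

if __name__ == "__main__":
    rng = random.Random(0)
    # Without a buffer, every batch is drawn from a single row group.
    print(groups_per_batch(read_row_groups(ROW_GROUPS, rng)))
    # With a buffer spanning two row groups, batches usually mix several groups.
    mixed = shuffle_buffer(read_row_groups(ROW_GROUPS, rng), capacity=16, rng=rng)
    print(groups_per_batch(mixed))
```

If batches in the real pipeline are similarly drawn from one row group at a time, each gradient step sees highly correlated examples, which could plausibly explain a quality gap against a fully shuffled loader. The buffer only mixes rows that are close together in read order, so a larger capacity gives a closer approximation to a global shuffle at the cost of memory.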