uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

TransformSpec using Pandas causes incompatibilities with other libraries for make_batch_reader #603

Open KamWithK opened 4 years ago

KamWithK commented 4 years ago

Hey guys, I'm trying to create a compatibility interface between Petastorm and a few PyTorch based libraries (PyTorch Lightning, Hugging Face Transformers and AllenNLP) which I'm trying to use in a project. So far I've managed to get PyTorch Lightning working (pretty much research oriented Keras), but a few design choices within Petastorm seem to prevent usage with the NLP libraries.

My problem is that TransformSpec requires both its input and output to be Pandas DataFrames. At first this may seem reasonable, but commonplace NLP libraries like Hugging Face Transformers tokenize lists of strings (that transformation is easy) and directly output tensors. These tensors aren't flat, so they can't be converted to Pandas, meaning that processing textual data (despite being fairly straightforward) seems nearly impossible with Petastorm's built-in data loaders.

I've been working on this for a week, and these are the approaches I've found to mitigate the problem:

  1. Modifying Petastorm's existing TransformSpec/PyTorch DataLoader/PyArrow classes
  2. Creating iterable PyTorch data loaders which just loop through the Reader object

I've been trying to work out how I'd do the first option (by debugging the code), but it looks extremely complicated and (I think) it would require modifying a number of classes (exactly which modifications are needed still eludes me). On the other hand, looping through a Reader seems reasonable, as we can still read in strings. But would doing this forfeit any optimisation/performance-boosting code in Petastorm's loader (although shouldn't these live in Reader)?

So, would anyone be able to provide some advice on what you believe to be the best approach/course of action (or just what might have to be coded/modified)? Thanks so much in advance!

selitvin commented 4 years ago

Hi @KamWithK: thank you for sharing the use case. Pretty interesting.

Looping through the reader would make your code run on the main PyTorch process. If the processing is non-trivial, then we would lose the benefits of the thread/process pools provided by petastorm.

Can you please give an example of such a 'not flat' tensor? Are you getting back ragged/jagged tensors? A list of torch tensors? Or something else? A small code snippet demonstrating these types could help.

One workaround for representing a list of variable-size arrays is to use two tensors: one with the data and another with indices into it. For example, to represent:

ragged_data = [["a", "bc"], ["d"]]

one could use:

data = ["a", "bc", "d"]
data_index = [
  [0, 2],
  [2, 3]
]

Recovering the data from these two tensors would be:

ragged_data[i] == data[data_index[i, 0]:data_index[i, 1]]
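
For illustration, a minimal runnable sketch of this two-tensor encoding, using NumPy arrays:

import numpy as np

# Flatten the ragged list into one flat data array plus per-row
# [start, end) offsets into it.
ragged_data = [["a", "bc"], ["d"]]
data = np.array([item for row in ragged_data for item in row])  # ["a", "bc", "d"]
data_index = np.array([[0, 2], [2, 3]])

# Recover row i by slicing the flat array with its offsets.
for i, row in enumerate(ragged_data):
    start, end = data_index[i]
    assert list(data[start:end]) == row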
KamWithK commented 4 years ago

> Hi @KamWithK: thank you for sharing the use case. Pretty interesting.
>
> Looping through the reader would make your code run on the main PyTorch process. If the processing is non-trivial, then we would lose the benefits of the thread/process pools provided by petastorm.
>
> Can you please give an example of such a 'not flat' tensor? Are you getting back ragged/jagged tensors? A list of torch tensors? Or something else? A small code snippet demonstrating these types could help.
>
> One workaround for representing a list of variable-size arrays is to use two tensors: one with the data and another with indices into it. For example, to represent:
>
> ragged_data = [["a", "bc"], ["d"]]
>
> one could use:
>
> data = ["a", "bc", "d"]
> data_index = [
>   [0, 2],
>   [2, 3]
> ]
>
> Recovering the data from these two tensors would be:
>
> ragged_data[i] == data[data_index[i, 0]:data_index[i, 1]]

Hey, thanks for the feedback. Here's a small code snippet which illustrates what the output of simple tokenization (using Hugging Face Transformers) looks like:

>>> from transformers import AutoTokenizer
>>> tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
>>> tokenizer(["this is a cat", "pigs go oink", "shoot", "yip yip yippy aaeyya"], padding=True, truncation=True, return_tensors="pt")
{'input_ids': tensor([[  101,  1142,  1110,   170,  5855,   102,     0,     0,     0,     0,
             0,     0],
        [  101, 18348,  1301,   184, 10223,   102,     0,     0,     0,     0,
             0,     0],
        [  101,  5211,   102,     0,     0,     0,     0,     0,     0,     0,
             0,     0],
        [  101,   194,  9717,   194,  9717,   194,  9717,  5005,   170,  5024,
         22549,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}
KamWithK commented 4 years ago

Okay, so I finally managed to get it to work half-decently with Transformers (although not AllenNLP yet; I may need to rewrite Petastorm's data loader for that). In the end, I added the following code just after row_as_dict = row._asdict() in the _iter_impl function (within Petastorm's PyTorch DataLoader):

row_as_dict.update(safe_string_tokenizer(row_as_dict[self.string_column]))
del row_as_dict[self.string_column]

This isn't perfect, but it works decently, and _sanitize_pytorch_types doesn't need to be changed. I'll fork and modify this in the Petastorm source code (I'll just add a new argument called "preparse", which would be a function), as it may be useful for others later on (otherwise I'll just create my own library with a bunch of these data loaders).
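
For reference, one plausible shape for the safe_string_tokenizer helper mentioned above (the thread doesn't show its body, so this is an assumption), reusing the tokenizer from the earlier snippet:

def safe_string_tokenizer(strings):
    # Hypothetical helper: tokenize a batch of strings and return a plain
    # dict of tensors so it can be merged into row_as_dict via update().
    encoded = tokenizer(list(strings), padding=True, truncation=True,
                        return_tensors="pt")
    return dict(encoded)  # input_ids, token_type_ids, attention_mask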

selitvin commented 4 years ago

There is a longer-term solution that might be better (but requires significant effort), which is to start using torch.utils.data.DataLoader for parallelization instead of petastorm's custom thread/process pool. That way we would fit more natively into the PyTorch ecosystem, and these kinds of operations would be more natural.

As for your solution, is it different from the one you originally considered:

> Creating iterable PyTorch data loaders which just loop through the Reader object

as all the processing and tokenization is happening on the main process? If yes, then I think the original proposal was preferable, as it does not modify the petastorm library but lives in user code.
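
A minimal sketch of that original proposal, assuming a petastorm Reader created via make_reader (so rows arrive as namedtuples) and a user-supplied tokenize_fn (both names are placeholders):

from torch.utils.data import IterableDataset

class ReaderIterableDataset(IterableDataset):
    """Wrap a petastorm Reader; all processing runs on the main process."""

    def __init__(self, reader, tokenize_fn):
        self.reader = reader
        self.tokenize_fn = tokenize_fn

    def __iter__(self):
        for row in self.reader:
            # Convert the namedtuple row to a dict and tokenize its text.
            yield self.tokenize_fn(row._asdict())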

dmcguire81 commented 4 years ago

@KamWithK, if I'm understanding correctly, you shouldn't need to tokenize "under the hood" in a way that requires modifications to petastorm. A pandas.DataFrame allows tensor-valued fields in the form of numpy.ndarray instances, so you just need the above output to be in terms of those, instead of whatever the default is (PyTorch tensors?). It looks like transformers.BatchEncoding takes a tensor_type argument, and PreTrainedTokenizer.__call__ takes a return_tensors argument to pass along to that implementation. Can you try adding return_tensors='np' to your tokenizer invocation and build a pandas.DataFrame this way?

import numpy as np
import pandas as pd

rows = [{
    "input_ids": np.array(
        [
            [101, 1142, 1110, 170, 5855, 102, 0, 0, 0, 0, 0, 0],
            [101, 18348, 1301, 184, 10223, 102, 0, 0, 0, 0, 0, 0],
            [101, 5211, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [101, 194, 9717, 194, 9717, 194, 9717, 5005, 170, 5024, 22549, 102],
        ]
    ),
    "token_type_ids": np.array(
        [
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]
    ),
    "attention_mask": np.array(
        [
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        ]
    ),
}]

tokens_df = pd.DataFrame.from_records(rows)

It looks like the input would be a pandas.DataFrame with a single list-of-strings-valued field, if that's the granularity of sample that you wanted to train on. Let me know if I'm not understanding your problem.
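
For reference, the suggested invocation might look like this (a sketch; with return_tensors='np' the returned BatchEncoding behaves like a dict of NumPy arrays):

import pandas as pd
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
encoded = tokenizer(["this is a cat", "pigs go oink"],
                    padding=True, truncation=True, return_tensors="np")

# Each field is a 2-D numpy.ndarray; wrap them as one record so every
# cell of the DataFrame holds an array, as in the example above.
tokens_df = pd.DataFrame.from_records([dict(encoded)])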

selitvin commented 4 years ago

Also, @KamWithK, can you please clarify: are you using make_batch_reader with BatchDataLoader? If so, what would you expect the datatype to be in the returned batch for the input_ids, token_type_ids, and attention_mask fields?

KamWithK commented 4 years ago

> There is a longer-term solution that might be better (but requires significant effort), which is to start using torch.utils.data.DataLoader for parallelization instead of petastorm's custom thread/process pool. That way we would fit more natively into the PyTorch ecosystem, and these kinds of operations would be more natural.
>
> As for your solution, is it different from the one you originally considered: "Creating iterable PyTorch data loaders which just loop through the Reader object" as all the processing and tokenization is happening on the main process? If yes, then I think the original proposal was preferable, as it does not modify the petastorm library but lives in user code.

Hmm, I'll give this a try now. It shouldn't be too tough to use: normally you'd just pass in a PyTorch Dataset/IterableDataset with a few options. However, here we're using Petastorm's own Reader object, so I'm not sure whether it'll allow that (I'll check now though). The other problem is that I can't seem to tokenize within the Reader object because of the need to use Pandas DataFrames.

KamWithK commented 4 years ago

> Also, @KamWithK, can you please clarify: are you using make_batch_reader with BatchDataLoader? If so, what would you expect the datatype to be in the returned batch for the input_ids, token_type_ids, and attention_mask fields?

I'm using Petastorm's DataLoader, not BatchDataLoader. It shouldn't make too much of a difference though. Either way, I expect the data loader to yield Torch Tensors. Ideally this would come from preprocessing done through the loader, but doing it in the DataLoader as I currently do does technically work.

KamWithK commented 4 years ago

> @KamWithK, if I'm understanding correctly, you shouldn't need to tokenize "under the hood" in a way that requires modifications to petastorm. A pandas.DataFrame allows tensor-valued fields in the form of numpy.ndarray instances, so you just need the above output to be in terms of those, instead of whatever the default is (PyTorch tensors?). It looks like transformers.BatchEncoding takes a tensor_type argument, and PreTrainedTokenizer.__call__ takes a return_tensors argument to pass along to that implementation. Can you try adding return_tensors='np' to your tokenizer invocation and build a pandas.DataFrame this way?

I've run your code, @dmcguire81, but Pandas interprets those fields as objects, which the built-in data loaders will raise an error for. Or am I wrong about that?

How would you go about specifying the edit_fields parameter for the TransformSpec anyway? What dimensions and data type do you specify? I've created a TransformSpec but get the following error: pyarrow.lib.ArrowInvalid: ('Can only convert 1-dimensional array values', 'Conversion failed for column input_ids with type object')

Here is my code:

# Tokenised columns replace text based ones
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased", fast=True)
tokenize = lambda rows: pd.concat((rows[[target_column]], pd.DataFrame([tokenizer(rows[text_column].tolist(), padding=True, truncation=True, return_tensors="np").data])))

edit_fields = [("token_type_ids", np.array, (), False), ("attention_mask", np.array, (), False), ("input_ids", np.array, (), False)]
transform_spec = TransformSpec(tokenize, selected_fields=["token_type_ids", "attention_mask", "input_ids", target_column], edit_fields=edit_fields)
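
For what it's worth, petastorm's edit_fields entries are (name, numpy dtype, shape, is_nullable) tuples, where None marks a variable-length dimension. A sketch of what per-token integer fields might look like (this alone does not resolve the multi-dimensional conversion error above):

import numpy as np

# Each entry: (field name, numpy dtype, shape, is_nullable). A 1-D,
# variable-length integer array per row is expressed as shape (None,).
edit_fields = [
    ("input_ids", np.int64, (None,), False),
    ("token_type_ids", np.int64, (None,), False),
    ("attention_mask", np.int64, (None,), False),
]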
KamWithK commented 4 years ago

> There is a longer-term solution that might be better (but requires significant effort), which is to start using torch.utils.data.DataLoader for parallelization instead of petastorm's custom thread/process pool. That way we would fit more natively into the PyTorch ecosystem, and these kinds of operations would be more natural.
>
> As for your solution, is it different from the one you originally considered: "Creating iterable PyTorch data loaders which just loop through the Reader object" as all the processing and tokenization is happening on the main process? If yes, then I think the original proposal was preferable, as it does not modify the petastorm library but lives in user code.

@selitvin, one quick way to achieve this, without having to modify or recreate the current loader, would be to simply create a PyTorch iterable dataset which loops through and yields items from Petastorm's loader. However, according to PyTorch's docs, when multiple processes/workers are used, each object needs to be manually set up to read different chunks of the data (not sure how to do that here). Another downside is that the Petastorm worker/thread pool would still be in use (meaning it'd be quite a surface-level change, so not all that useful).

What really confuses me here, though, is where in the Reader class the actual file reading happens. There seems to be a large amount of code there for handling the cache, the worker/thread/process pool (although didn't you say earlier that these were part of the Petastorm PyTorch DataLoader?), and row filters/shuffles. Is there a minimum viable example of how you read and iterate through the Parquet files (with multiple workers)?

By the way, I am aware that PyArrow can cause some trouble when using multiple threads or processes (I've read this online). This would be what stops us from simply iterating through the ParquetDataset which PyArrow provides, right? So what does Petastorm do to get around this (I believe this is what would need to be added into a PyTorch IterableDataset)? By the way, here are the IterableDataset docs.

KamWithK commented 4 years ago

Also, I'm trying to read through ArrowReaderWorker right now, and it seems like there is quite a bit of back and forth between Pandas, NumPy, and PyArrow data (example here and then again here). Can you not just yield the data straight away after reading/processing it? What's the use of converting back to PyArrow format? (I may be sorely mistaken here, as I'm relatively new to Petastorm and PyArrow.)

selitvin commented 4 years ago
  1. Created #605 showing how this read pattern can be implemented when using petastorm's DataLoader (BatchDataLoader would be more performant, but it would require starting to support nested PyTorch tensors, which seems to be an experimental feature).

  2. Using a PyTorch IterableDataset seems like a potentially good and scalable approach. You can use the cur_shard/shard_count arguments of make_reader/make_batch_reader to get disjoint sets of examples (note that you need granular enough Parquet row groups, since sharding is done per row group). To disable multithreading/multiprocessing, you can use reader_pool_type="dummy", which disables petastorm's built-in parallelization (that being said, leaving it as reader_pool_type="thread" with fewer workers might still be more performant than "dummy"; this would require some experimentation). See the sketch after this list.
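
A minimal sketch of those knobs together (the dataset URL and shard numbers are placeholders):

from petastorm import make_batch_reader

# Shard 0 of 4: each shard reads a disjoint subset of row groups.
with make_batch_reader("file:///tmp/my_dataset",  # hypothetical URL
                       cur_shard=0, shard_count=4,
                       reader_pool_type="dummy") as reader:
    for batch in reader:
        ...  # each batch is a namedtuple of column arrays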

KamWithK commented 4 years ago

Okay thanks @selitvin, #605 looks like a decent solution for now.

I've been reading through the docs to try to understand how you handle multiprocessing. Do you just use Parquet row groups? If so, this provides a very easy way to handle multiprocessing using a PyTorch IterableDataset and PyArrow: I could call parquet_file.read_row_group(row_group_int), where row_group_int is defined in __init__ based on the number of row groups in the file (parquet_file.num_row_groups, from the metadata) and the process number (PyTorch provides this).
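
A sketch of that idea, assuming a single Parquet file and using torch.utils.data.get_worker_info() for the worker number (the class name is a placeholder):

import pyarrow.parquet as pq
from torch.utils.data import IterableDataset, get_worker_info

class RowGroupShardedDataset(IterableDataset):
    def __init__(self, path):
        self.path = path

    def __iter__(self):
        info = get_worker_info()
        worker_id = info.id if info else 0
        num_workers = info.num_workers if info else 1
        parquet_file = pq.ParquetFile(self.path)
        # Stride the row groups across workers so each reads a disjoint set.
        for rg in range(worker_id, parquet_file.num_row_groups, num_workers):
            yield parquet_file.read_row_group(rg).to_pydict()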

KamWithK commented 4 years ago

Okay, I've created a PyTorch IterableDataset class which can handle multiple workers and just uses PyArrow. Right now I haven't specified any column/row selections, but from what I'm seeing it shouldn't be too hard to modify the code to handle those, especially because the new PyArrow Tabular Dataset API can "push" these requirements down. I unfortunately haven't yet been able to comprehend how exactly you split up/divide the work between processes in Petastorm right now, so I've just gone ahead and done it based on batches (however, I'm not entirely sure whether these lazy-load or not). If it requires them to be loaded beforehand (hopefully not), manually using row groups would be better. Please take a look and let me know what you think:

import pyarrow.dataset as ds
from multiprocessing import Queue  # multiprocessing.Queue: supports close()
from torch.utils.data import IterableDataset

class IterableParquetDataset(IterableDataset):
    def __init__(self, path, process_func):
        super().__init__()
        dataset = ds.dataset(path)
        self.process_func = process_func

        # Pre-fill a multiprocessing queue with record batches so that
        # multiple workers can each pull disjoint batches from it.
        self.batches = Queue()
        for batch in dataset.to_batches():
            self.batches.put(batch)

    def __iter__(self):
        while True:
            # Stop once every batch has been consumed.
            if self.batches.empty():
                self.batches.close()
                break

            batch = self.batches.get().to_pydict()
            batch.update(self.process_func(batch))
            yield batch

This is simple and works perfectly with Hugging Face Transformers (plus only the minimum conversions necessary). My only worry is how I'm splitting up the work across the workers.
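
For what it's worth, a hypothetical usage sketch of the class above (process_func and the dataset path are placeholders); batch_size=None keeps the DataLoader from re-batching the already-batched dicts:

from torch.utils.data import DataLoader

def process_func(batch):
    # Placeholder: e.g. tokenize batch["text"] and return the new columns.
    return {}

dataset = IterableParquetDataset("/tmp/my_dataset", process_func)
loader = DataLoader(dataset, batch_size=None, num_workers=2)

for batch in loader:
    ...  # each item is a dict of columns for one record batch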

KamWithK commented 4 years ago

Could something like what I did above be useful for Petastorm?