uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.

global context not imported in transform_spec function with reader_pool_type="process" #524

Open kaiwenw opened 4 years ago

kaiwenw commented 4 years ago

Hello, I'm blocked on performing some preprocessing (i.e. a transform) on the data before it is returned from the (PyTorch) dataloader. The complication is that my preprocessing function returns a dataclass rather than a dictionary. My workaround is to "flatten" the typed dataclass into a dictionary in the transform, then "unflatten" it back into a dataclass in collate_fn. This is a bit messy: it involves the flattening and unflattening itself, plus specifying a new schema for the flattened dictionary. BTW: I know I could simply preprocess inside collate_fn, but that runs only on the master thread and cannot be parallelized; I would prefer the preprocessing to run in parallel.
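For clarity, here is a minimal sketch of that pattern; TrainingBatch and the two helpers are hypothetical illustration names, not Petastorm API:

import dataclasses

@dataclasses.dataclass
class TrainingBatch:  # hypothetical dataclass standing in for my real batch type
    state: list
    reward: float

def flatten(batch):
    # Runs on the worker, inside the TransformSpec function:
    # dataclass -> plain dict so Petastorm can serialize it.
    return dataclasses.asdict(batch)

def unflatten(row):
    # Runs on the main process, inside collate_fn: dict -> dataclass.
    return TrainingBatch(state=row["state"], reward=row["reward"])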

So first, I was wondering if there's a better way to do this with Petastorm?

In the code example below, I drafted a simple example of this workaround. When I ran it with reader_pool_type="thread", I saw no parallelization across cores, which I suspect is due to Python's GIL. So I tried reader_pool_type="process", which gives the error NameError: name 'PreprocessedTrainingBatch' is not defined. Essentially, the global context is not visible inside the transform function. For reference, I also tried running examples/mnist/tests/pytorch_example.py with reader_pool_type="process" and it failed with a similar error, saying that the package "transforms" isn't defined.

My second question is whether reader_pool_type="process" is the right way to get this running on multiple cores, and whether there's a fix for this issue with transform functions?

Thanks a lot!
Kaiwen

@selitvin transform.txt (attached)

selitvin commented 4 years ago

Hi.

Second question: the solution is easy: don't keep the classes that are supposed to be used on the workers in the main module, as that breaks serialization. I was able to fix it by moving the if __name__ == "__main__": clause into a separate module.
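For concreteness, a sketch of that layout (the file names are illustrative; PreprocessedTrainingBatch and transform_row are from your attachment):

# worker_defs.py -- importable by the worker processes:
#     class PreprocessedTrainingBatch: ...
#     def transform_row(preprocessor): ...
#
# train.py -- the entry point only:
#     from worker_defs import PreprocessedTrainingBatch, transform_row
#
#     if __name__ == "__main__":
#         main()
#
# Pickle serializes the transform by module path, so the workers can
# re-import it from worker_defs instead of failing to find it in __main__.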

Regarding your data structure: you are using only vectors, and it appears you do not need multidimensional arrays. In that case, it may be better to use a vanilla Parquet store, without Petastorm metadata:

from pyspark.sql import SparkSession

# Illustrative values; DATASET_SIZE and PETASTORM_URI were constants in the
# original script.
DATASET_SIZE = 1000
PETASTORM_URI = "file:///tmp/vanilla_parquet"

def make_dataset():
    def row_generator(x):
        # Plain Python types only, so Spark infers the Parquet schema itself.
        return {
            "state": [x] * 64,
            "action": x % 4,
            "reward": 1.,
            "mdp_id": x,
            "action_probability": [x] * 4,
        }

    spark = SparkSession.builder.config("spark.driver.memory", "2g").master("local[2]").getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    rows_rdd = (
        sc.parallelize(range(DATASET_SIZE))
        .map(row_generator)
    )

    df = spark.createDataFrame(rows_rdd)
    df = df.repartition(10)
    # A plain Parquet write -- no materialize_dataset, no Petastorm metadata.
    df.write.mode("overwrite").parquet(PETASTORM_URI)

To read a vanilla Parquet store with Petastorm, you can use make_batch_reader:

import time

from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader
from petastorm.transform import TransformSpec

def time_with_transform():
    # MockDiscreteDQNBatchPreprocessor and transform_row come from the
    # attached transform.txt; READER_POOL_TYPE and batch_size are constants
    # defined there as well.
    preprocessor = MockDiscreteDQNBatchPreprocessor()
    transform = TransformSpec(transform_row(preprocessor))
    data_reader = make_batch_reader(PETASTORM_URI, num_epochs=None, reader_pool_type=READER_POOL_TYPE, transform_spec=transform)

    # collate_fn = collate_and_unflatten(PreprocessedTrainingBatch)
    with DataLoader(data_reader, batch_size=batch_size) as dataloader:
        # Warm up the reader before starting the clock.
        for _ in range(10):
            next(data_reader)
        start_time = time.time()
        for i, data in enumerate(dataloader):
            if i == 0:
                print(f"{i} data {type(data)}")
            if i > 50:
                break
        print(f"{i} data {type(data)}")

    end_time = time.time()
    print(f"with transform: {end_time - start_time} secs")

Here is how I modified your code.

kaiwenw commented 4 years ago

Hi @selitvin, thanks for the detailed response!

For my use case, each row will probably be 1,000 to 10,000 floats. Do you think Petastorm would make sense in that case?

My consumer is always PyTorch, but I found that when using Petastorm there wasn't a way around it when:

Yes, I agree that it's fastest to leave the data as a pd.DataFrame. However, my existing codebase has a number of preprocessors that return a dataclass rather than a dictionary, which is why I wanted the flattening conversion. So to use the original preprocessors unchanged, I have to convert to a dictionary, which creates two challenges: 1) I need to specify the output schema via edited and removed fields, which is messy right now (see the sketch below); 2) I need to flatten to a dictionary in the transform and then unflatten in collate_fn.
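For reference, challenge (1) might look like this sketch in Petastorm terms; the field names and the flatten_transform function are illustrative, while edit_fields and removed_fields are real TransformSpec parameters:

import numpy as np
from petastorm.transform import TransformSpec

# Hypothetical flattened output: the transform drops the raw "state" column
# and adds a "preprocessed_state" vector, so the schema change has to be
# declared by hand as (name, numpy dtype, shape, is_nullable) tuples.
transform = TransformSpec(
    flatten_transform,  # illustrative: flattens the dataclass into a dict
    edit_fields=[("preprocessed_state", np.float32, (64,), False)],
    removed_fields=["state"],
)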

Do you see a better way to make this work with Petastorm?

Thanks a lot!

selitvin commented 4 years ago

Regarding "My consumer is always PyTorch, but I found when using Petastorm, there wasn't a way around it, when:" In my example I do create a TransformSpec without defining the output schema. Is it different from your case?

If you have no choice and you need to go from a dataframe to a list of dataclasses and back to a dataframe, then indeed, working with make_reader might end up being faster. It might also be possible to iron out the code path from just after the transform to self.publish_func(all_cols) in process in petastorm/py_dict_reader_worker.py; then you might be able to get your data structures forwarded to your main process as-is. Hacking this should not be too hard an endeavor. If you find it useful, we can think of a way to land this kind of change.

kaiwenw commented 4 years ago

For defining the output schema, your example worked because the fields of the transformed dictionary are the same as before. In my case, since I "flatten" the dataclass, I end up with differently named fields than before (as you can see from as_flatten_dict and from_flatten_dict).

Also, why would make_reader end up being faster than make_batch_reader for this dataclass example, if the row size stays the same?

selitvin commented 4 years ago

Pretty soon (#512) we will have an automatic mechanism in place to derive the schema of the data produced by the transform. I hope it will simplify your code (feel free to try infer_schema_from_a_row=True on that development branch; it worked for me).

The reason the code might run faster with make_reader (I am guessing here, so I might be misleading you) is that make_batch_reader results in the following chain of transformations in your case: pyarrow table -> pandas -> user transform (pandas to datastruct in your custom func, then back to a pandas frame) -> pyarrow table -> to the main process -> convert to datastruct.

With some massaging, make_reader would instead do: pyarrow table -> list of dicts -> user transform (dict to datastruct) -> to the main process -> no additional conversion needed.

So, bottom line: going from a pandas frame to a list of objects and back, and then to a list of objects again, seems expensive to me.
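For reference, a sketch of the two entry points being compared; the dataset paths are placeholders:

from petastorm import make_reader, make_batch_reader

# Vanilla Parquet store (no Petastorm metadata): data arrives in batches as
# pandas frames, so the transform function receives a pd.DataFrame.
batch_reader = make_batch_reader("file:///tmp/vanilla_parquet",
                                 reader_pool_type="process")

# Petastorm dataset (written with materialize_dataset and a Unischema):
# rows arrive one at a time as dicts, so the transform sees a single row.
row_reader = make_reader("file:///tmp/petastorm_dataset",
                         reader_pool_type="process")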