uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0
1.8k stars 284 forks source link

Help translating mnist/pytorch_example to tabular data! #624

Closed afogarty85 closed 3 years ago

afogarty85 commented 3 years ago

Good Afternoon!

I figured out much of my issues, which chiefly seemed to be the required use of VectorAssembler for my tabular data.

However, in following this guide: https://databricks.com/notebooks/simple-aws/petastorm-spark-converter-pytorch.html

I am unable to run section 5, the distributed training as I get the following error:

[1,4]<stderr>:Failed loading Unischema from metadata in %s. Assuming the dataset was not created with Petastorm. Will try to construct from native Parquet schema. [1,4]<stderr>:Recovering rowgroup information for the entire dataset. This can take a long time for datasets with large number of files. If this dataset was generated by Petastorm (i.e. by using "with materialize_dataset(...)") and you still see this message, this indicates that the materialization did not finish successfully. NoDataAvailableError: Number of row-groups in the dataset must be greater or equal to the number of requested shards. Otherwise, some of the shards will end up being empty.

I am not sure why there is this error, because in section 2 of the guide, we use:

make_spark_converter to transform spark dataframes to Petastorm.

A potential issue is associated with this result:

print(hvd.rank(), hvd.size()) [0, 1]

selitvin commented 3 years ago

Are you running the exact same code as in the tutorial (converter_train.make_torch_dataloader)? I'd expect this kind of message to show if you are creating a reader using make_reader instead of make_batch_reader. Do you have the full call stack of the failure?

afogarty85 commented 3 years ago

Are you running the exact same code as in the tutorial (converter_train.make_torch_dataloader)? I'd expect this kind of message to show if you are creating a reader using make_reader instead of make_batch_reader. Do you have the full call stack of the failure?

I've actually gotten petastorm to work now despite the errors coming up still.

However, I am stuck on figuring out how to use transform_spec (which seems to be a DataBricks invention?) to generate a torch nn.Embedding() matrix. I think the issue has to deal with the way batches are loaded in this argument:

with converter_train.make_torch_dataloader(transform_spec=get_transform_spec_train()) as dataloader_train:
  #something in the dataloader here causing issues
  sample = next(iter(dataloader_train))
  print(sample)

as I get an index error out of range, but the shapes look perfect:

out of loop shape torch.Size([28930, 4])
in loop shape torch.Size([28930, 4])
Worker 1 terminated: unexpected exception:
Traceback (most recent call last):
  File "/databricks/python/lib/python3.7/site-packages/petastorm/workers_pool/thread_pool.py", line 62, in run
    self._worker_impl.process(*args, **kargs)
  File "/databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py", line 167, in process
    lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))
  File "/databricks/python/lib/python3.7/site-packages/petastorm/cache.py", line 39, in get
    return fill_cache_func()
  File "/databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py", line 167, in <lambda>
    lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))
  File "/databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py", line 195, in _load_rows
    transformed_result = self._transform_spec.func(result_as_pandas)
  File "<command-2158640876504853>", line 31, in transform_row_train
    embeddings.append(emb(cats[:, i]))
  File "/databricks/python/lib/python3.7/site-packages/torch/nn/modules/module.py", line 722, in _call_impl
    result = self.forward(*input, **kwargs)
  File "/databricks/python/lib/python3.7/site-packages/torch/nn/modules/sparse.py", line 126, in forward
    self.norm_type, self.scale_grad_by_freq, self.sparse)
  File "/databricks/python/lib/python3.7/site-packages/torch/nn/functional.py", line 1814, in embedding
    return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse)
**IndexError: index out of range in self**

code to turn categorical variables into nn.Embedding() look up table

def transform_row_train(pd_batch):
  """
  The input and output of this function must be pandas dataframes.
  Do data augmentation for the training dataset only.
  """
  # normalize features
  pd_batch['features'] = pd_batch['features'].map(lambda x: x - train_means.flatten())
  pd_batch['features'] = pd_batch['features'].map(lambda x: np.true_divide(x, train_stds.flatten() + 0.00001))

  # turn to int64
  pd_batch['cats'] = pd_batch['cats'].map(lambda x: np.array(x, dtype = np.int64))
  # create columns
  cats_df = pd.DataFrame(pd_batch['cats'].tolist(), columns=[f'col_{num}' for num in range(4)])

  # get number of unique cats
  cat_dims = [int(cat_df[col].nunique()) for col in cat_df]
  # generate embedding dims
  emb_dims = [(x, min(1, (x + 2) // 2)) for x in cat_dims]
  # generate embedding lookups
  all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in emb_dims])
  cats = torch.tensor(cats_df.values).long()
  print('out of loop shape', cats.shape)

  # create embeddings -- all is right except for here
  embeddings = []
  for i, emb in enumerate(all_embeddings):
    print('in loop shape', cats.shape)
    embeddings.append(emb(cats[:, i]))
  embeddings = torch.cat(embeddings, 0)
  print(embeddings.shape)

  return pd_batch

def get_transform_spec_train():
  # Note that the output shape of the `TransformSpec` is not automatically known by petastorm, 
  # so we need to specify the shape for new columns in `edit_fields` and specify the order of 
  # the output columns in `selected_fields`.
  # 17 feature columns
  return TransformSpec(partial(transform_row_train),
                       #edit_fields=[('features', np.float32, (), False)],
                       #selected_fields=['DEP_DEL15', 'features', 'cats', 'cats1', 'cats2', 'cats3', 'cats4', 'cats5'])
                      removed_fields=['TAIL_NUM', 'OP_CARRIER_AIRLINE_ID', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID'])
selitvin commented 3 years ago

Petastorm architecture is biased towards working with Tensorflow and hence feels a bit unnatural with pytorch. transform_spec is one example of this, unfortunately.

I am not sure if I see a petastorm issue here since appears that you can operate correctly on pd.DataFrame that you get from pd_batch.

Apologies if saying trivial thing, but len(all_embeddings) is 4? Is it expected to be the batch size? Where do you specify the batchsize in your code (don't see it in your comment)? Is it possible that the last batch has smaller size if the total number of samples is not divisible by 4?

afogarty85 commented 3 years ago

I much appreciate your time and help! I do not have the batch_size specified there, but I run into weird batching issues with transform_spec regardless.

Even with the command set as follows:

with converter_train.make_torch_dataloader(transform_spec=get_transform_spec_train(), batch_size=4) as dataloader_train:
  #something in the dataloader here causing issues
  sample = next(iter(dataloader_train))
  print(sample)

and I run:

print(pd_batch['features'].shape) (28930,)

I get a huge amount of features which is just a fraction of the total training set.

Yet the output from the sample is right, a batch of 4.

selitvin commented 3 years ago

I think I understand what's going on. The batching operation happens after the call to tranform_spec. Some context:

Does it make sense in your application?

afogarty85 commented 3 years ago

Ah, that makes a lot of sense! I appreciate all your time and help understanding these operations. I see how that would be more apparent if I did not use the make_spark_converter.

afogarty85 commented 3 years ago

I hate to keep using your time, but I have one other (hopefully) quick question that I am wondering whether you can address, which I think it has to do with DataBricks perhaps:

I cannot load data while using make_batch_reader as petastorm keeps telling me that I am not passing a file:

# build some features
features = np.random.uniform(0, 1, size=(1000,4))
rdd1 = sc.parallelize(features)
# turn to df
features_df = rdd1.map(lambda x: (DenseVector(x[0:]),)).toDF()

# write parquet
features_df.repartition(2) \
  .write.mode("overwrite") \
  .option("parquet.block.size", 1024 * 1024) \
  .parquet('dbfs:/ml/train')

# view files
display(dbutils.fs.ls('dbfs:/ml/train'))

#yielding
path    name    size
dbfs:/ml/train/_SUCCESS _SUCCESS    0
dbfs:/ml/train/_committed_6677110360593958902   _committed_6677110360593958902  232
dbfs:/ml/train/_committed_8784851706941089556   _committed_8784851706941089556  450
dbfs:/ml/train/_started_6677110360593958902 _started_6677110360593958902    0
dbfs:/ml/train/_started_8784851706941089556 _started_8784851706941089556    0
dbfs:/ml/train/part-00000-tid-8784851706941089556-33b12105-3cbd-460a-af3c-18181e97c939-4778024-1-c000.snappy.parquet    part-00000-tid-8784851706941089556-33b12105-3cbd-460a-af3c-18181e97c939-4778024-1-c000.snappy.parquet   17501
dbfs:/ml/train/part-00001-tid-8784851706941089556-33b12105-3cbd-460a-af3c-18181e97c939-4778025-1-c000.snappy.parquet    part-00001-tid-8784851706941089556-33b12105-3cbd-460a-af3c-18181e97c939-4778025-1-c000.snappy.parquet   17501

Using either or yields an error: Passed non-file path

train_dl = DataLoader(make_batch_reader(dataset_url_or_urls='file:/ml/train'), batch_size=32)
train_dl = DataLoader(make_batch_reader(dataset_url_or_urls='file:ml/train/part-00000-tid-8784851706941089556-33b12105-3cbd-460a-af3c-18181e97c939-4778024-1-c000.snappy.parquet'), batch_size=32)

Any idea how I might be able to load from parquet or what I am doing wrong?

selitvin commented 3 years ago

I am not a DataBricks expert, but based on this warning, if you are storing data ondbfs:/, you should be using fuse mount. Perhaps like this: file:/dbfs/ml/train?

I also ran your sample locally and found out an issue that you probably would run into after resolving the dbfs problem. I tweaked the code, and the following snippet runs successfully end-to-end on pyspark 3.0.1:

import numpy as np
from pyspark.ml.linalg import DenseVector
from pyspark.sql import SparkSession
from petastorm import make_batch_reader

from petastorm.spark.spark_dataset_converter import _convert_vector

spark = SparkSession.builder \
    .master('local[2]') \
    .getOrCreate()
sc = spark.sparkContext

features = np.random.uniform(0, 1, size=(1000,4))
rdd1 = sc.parallelize(features)

features_df = rdd1.map(lambda x: {"some_name": DenseVector(x[0:])}).toDF()

features_df = _convert_vector(features_df,'float32')

# write parquet
features_df.repartition(2) \
  .write.mode("overwrite") \
  .option("parquet.block.size", 1024 * 1024) \
  .parquet('file:///tmp/train1')

for b in make_batch_reader(dataset_url_or_urls='file:///tmp/train1'):
    print(b)
  1. Note the features_df = _convert_vector(features_df,'float32') call. It converts DenseVector into an array type which is compatible with petastorm. Would advise against using Petastorm's implementation of _convert_vector since it is not part of public API. The implementation of it is trivial to replicate.
  2. Explicitly named dataframe column some_name. The default name given to it by spark _1 is incompatible with petastorm.

Hope this helps.

afogarty85 commented 3 years ago

That works so well! I cannot thank you enough for all your time and help! For DataBricks, I just had to tweak:

# write parquet
features_df.repartition(2) \
  .write.mode("overwrite") \
  .option("parquet.block.size", 1024 * 1024) \
  .parquet('file:///dbfs/tmp/train1')

for b in make_batch_reader(dataset_url_or_urls='file:///dbfs/tmp/train1'):
    print(b)
selitvin commented 3 years ago

I am glad it was resolved for you. I am always happy to hear about the ways petastorm users use the library, so it was a pleasure for me to help. Hope petastorm will bring value to your project.