uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

removed_fields not acknowledged in arrow_reader_worker #416

Closed: praateekmahajan closed this issue 5 years ago

praateekmahajan commented 5 years ago

Sadly, I won't be able to reproduce the error quickly:

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec

transform = TransformSpec(lambda x: x, removed_fields=["columnA"])
reader = make_batch_reader(dataset_url, transform_spec=transform)
# next(reader) gives an error

I was trying to trace the error and realised that

  1. reader.schema no longer has columnA
  2. arrow_reader_worker.py calls schema.make_namedtuple(**result_dict), where result_dict comes from workers_pool.get_results(). The result_dict still seems to have columnA in it.

The error is the following:

~/Desktop/github/petastorm/petastorm/reader.py in __next__(self)
    598     def __next__(self):
    599         try:
--> 600             return self._results_queue_reader.read_next(self._workers_pool, self.schema, self.ngram)
    601         except StopIteration:
    602             self.last_row_consumed = True

~/Desktop/github/petastorm/petastorm/arrow_reader_worker.py in read_next(self, workers_pool, schema, ngram)
     71             print(f"The schema has the following keys : \n {schema._fields.keys()}")
     72             print(f"The resulting dict has the folowing keys {result_dict.keys()}")
---> 73             return schema.make_namedtuple(**result_dict)
     74 
     75         except EmptyResultError:

~/Desktop/github/petastorm/petastorm/unischema.py in make_namedtuple(self, **kargs)
    233                 typed_dict[key] = None
    234         print(typed_dict.keys())
--> 235         return self._get_namedtuple()(**typed_dict)
    236 
    237     def make_namedtuple_tf(self, *args, **kargs):

TypeError: __new__() got an unexpected keyword argument 'columnA'
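
This is just the standard namedtuple behavior when an unexpected keyword is passed. A minimal, self-contained illustration (with made-up column names) of why the mismatch between the pruned schema and the worker's result_dict blows up:

from collections import namedtuple

# The namedtuple type is built from the pruned schema (columnA removed)...
Row = namedtuple('Row', ['columnB', 'columnC'])

# ...but the worker's result_dict still carries columnA.
result_dict = {'columnA': 1, 'columnB': 2, 'columnC': 3}

# Raises the same error as in the traceback above:
# TypeError: __new__() got an unexpected keyword argument 'columnA'
Row(**result_dict)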
praateekmahajan commented 5 years ago

A suggested (quick) solution is to add an if condition inside read_next to ensure that column.name is in schema._fields.keys().

You can have a look at my fork here.
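
For illustration only (the actual change is in the fork linked above), the guard could look roughly like this inside read_next, reusing the result_dict and schema names from the traceback:

# Drop any columns that are no longer part of the pruned schema, so fields
# listed in removed_fields but still present in the worker output are ignored.
result_dict = {name: value for name, value in result_dict.items()
               if name in schema._fields}
return schema.make_namedtuple(**result_dict)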

selitvin commented 5 years ago

It seems to me that the problem here is a confusing interface. I see that you assumed that

transform = TransformSpec(lambda x : x, removed_fields=["columnA"])

removes "columnA" from the data-frame for you, while my intent was to indicate that your lambda actually removes the column. I can see how this is confusing. I would expect the code to be like this:

def delColumnA(x):
    del x['columnA']
    return x

transform = TransformSpec(delColumnA, removed_fields=["columnA"])
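
Wiring that into the reader from the original report would then look roughly like this (dataset_url as in the report above; a sketch, not a tested snippet):

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec

def delColumnA(x):
    # x is the pandas DataFrame handed to the transform by the reader worker;
    # the function itself performs the deletion.
    del x['columnA']
    return x

# removed_fields only tells Petastorm that the function drops columnA, so the
# resulting schema can be pruned; it does not remove the column for you.
transform = TransformSpec(delColumnA, removed_fields=["columnA"])

with make_batch_reader(dataset_url, transform_spec=transform) as reader:
    batch = next(reader)  # batch should no longer have a columnA field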

I'll prepare a diff to support syntax like this: TransformSpec(removed_fields=["columnA"]). Does it make sense to you?

selitvin commented 5 years ago

Sent you an invite to collaborate on Petastorm. Would appreciate your help reviewing #417 with the fix. Thanks!

praateekmahajan commented 5 years ago

Hey @selitvin, I'd love to review the PR, but it looks like I don't have the right permissions. Also, it looks like a few CircleCI tests are failing!