uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

No need to specify `removed` argument in `TransformSpec` #367

Open selitvin opened 5 years ago

selitvin commented 5 years ago

We can deduce the list of removed fields by examining the return value of the user's transform function:

```python
def user_transform(row):
    del row['deleted_field']
    return row  # the removal of 'deleted_field' can be inferred from here
```
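For contrast, a minimal sketch of the explicit declaration that is required today; the dataset URL and field name are placeholders:

```python
from petastorm import make_reader
from petastorm.transform import TransformSpec

def user_transform(row):
    del row['deleted_field']  # drop the field from every sample
    return row

# Today the removal must also be declared up front via removed_fields;
# the proposal is to infer it from user_transform's return value instead.
spec = TransformSpec(user_transform, removed_fields=['deleted_field'])
with make_reader('file:///tmp/dataset', transform_spec=spec) as reader:
    for sample in reader:
        pass  # samples no longer contain 'deleted_field'
```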
gregw18 commented 4 years ago

I've been looking at some Unischema checking issues - #297, #355, #367 and #402 - and have started work on supporting adding and removing fields. I would like to confirm that I'm heading in the right direction. My current goals are:

  1. When running under TensorFlow, emit helpful warnings before crashing - since we can't change the outputs after the graph is created.

  2. When not running under TensorFlow, work with whatever fields we're given.

  3. This is just my opinion, but I'm not sure how much value there is in allowing the type of an existing field in a Unischema to change. Adding a new field (with a new name) and removing the old one would give equivalent functionality without requiring any additional code. (Changing the contents of an existing field without changing its type doesn't require any schema changes, and so should continue to be fine.)

Does this approach sound acceptable?

selitvin commented 4 years ago

Thank you for helping with this!

I'm not sure I fully understand the context: the schema exposed by the Reader object already includes the adjustments declared by a TransformSpec. Given that:

> When running under TensorFlow, emit helpful warnings before crashing - since we can't change the outputs after the graph is created.

Is this achieved by validating the sample at runtime against the schema?

> started work on supporting adding and removing fields

To clarify: which classes are affected? When are fields added/removed? Does this mean the schema could change per returned sample?

Can you please elaborate which classes are expected to change (and how)?

gregw18 commented 4 years ago

Sorry - I've been staring at these issues for too long and lost their context. I believe all four of the issues I mentioned concern the case where the user defines a TransformSpec.func that makes changes (adds or removes fields) that they didn't list in edit_fields or removed_fields. The general idea, as I inferred it, was: if running under TensorFlow we can't handle this - we will crash - but we could be a bit friendlier and emit some helpful excuses before we die. If not running under TensorFlow, then as long as we update the appropriate Unischema instance based on the data returned from TransformSpec.func, it should be possible to keep working, and the user doesn't even need to populate the edit_fields and removed_fields elements of TransformSpec.

At the moment I've just hacked together a proof of concept (including tests) to prove to myself that I can do this - it doesn't handle all cases yet. If we agree on the goals, I would start with a clean branch, move over the required code (you wouldn't believe how many print statements I've scattered around to help understand how this works) and then finish implementing.

I've moved the _apply_transform_spec method from py_dict_reader_worker.py to TransformSpec, and modified ArrowReaderWorker._load_rows and _load_rows_with_predicate to also call a new shared function in TransformSpec. (Currently they have slightly different versions of the transform code - _load_rows_with_predicate doesn't manually remove fields listed in removed_fields that TransformSpec.func didn't remove.) I'm no longer sure that I need to update the workers' Unischema (ArrowReaderWorker/PyDictReaderWorker._schema) - I haven't tested that yet. I moved the logic from the original source files to TransformSpec to clarify the symmetry, but if I don't need to update these schemas I might leave the code where it is, while still ensuring that ArrowReaderWorker's two methods do the same thing.

The main changes are in Unischema: make_namedtuple and make_namedtuple_tf both call a new method that verifies that the data they receive matches the Unischema._fields dict, and I've implemented code to add/remove fields as necessary when it doesn't. I haven't done the warnings for TF yet. (There's also a new property that tracks whether we've already done this verification for this instance, so that it doesn't run every time we get a new chunk of data.) In both modes, when Unischema._fields didn't match the fields returned by TransformSpec.func, the crash was happening in the "return self._get_namedtuple()..." lines, as the fields expected by the namedtuple didn't match the fields provided in the args and kwargs.

To answer your questions: yes, the TensorFlow warnings come from validating the sample at runtime against Unischema._fields. For non-TF, fields are added to or removed from Unischema._fields the first time Unischema.make_namedtuple is called with real data. If desired, the check could be done each time a sample is returned, but I didn't consider that a likely scenario.
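A hypothetical sketch of the kind of check described above; the name reconcile_fields and the structure are illustrative only, not the actual patch:

```python
import warnings

def reconcile_fields(schema_fields, sample):
    """Compare the fields a transform actually returned against the schema,
    warn on any mismatch, and report what would need to be added/removed."""
    returned, expected = set(sample.keys()), set(schema_fields)
    added, removed = returned - expected, expected - returned
    if added or removed:
        warnings.warn('TransformSpec.func changed fields: added=%s, removed=%s'
                      % (sorted(added), sorted(removed)))
    return added, removed
```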

selitvin commented 4 years ago

Awesome, thank you very much for the detailed explanation! Now I think I understand - I agree. There are pain points imposed on non-TF users by the current design. Please see if I've reflected your ideas properly:

Translating this into the API/behavior spec:

  1. Add a transform= argument to make_reader and make_batch_reader. Issue a deprecation warning if transform_spec is used.
  2. transform= can take either a TransformSpec instance or any Python callable.
  3. If transform= is a TransformSpec, the user will: a. Get a proper, transformed Unischema from reader.schema, as they do today. b. Get a nice crash if the transform function did not produce a dictionary matching the transformed schema.
  4. If transform= is a callable: a. We raise a runtime error if reader.schema is accessed.
  5. We take _get_namedtuple out of the Unischema class and make it simply do dict->namedtuple conversion (being careful not to create a new class every time, but to reuse the same one).
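To make the proposal concrete, here is an illustrative sketch of the two call patterns; the transform= argument is the proposal itself (it doesn't exist yet), and the dataset URL, field names, and 4-tuple edit_fields format are assumptions:

```python
import numpy as np

from petastorm import make_reader
from petastorm.transform import TransformSpec

def row_transform(row):
    row['doubled'] = row['value'] * 2  # 'value' and 'doubled' are placeholders
    return row

# Point 4: a bare callable - convenient, but accessing reader.schema raises.
reader = make_reader('file:///tmp/dataset', transform=row_transform)

# Point 3: a full TransformSpec - reader.schema reflects the declared edit,
# and returned samples are validated against it.
spec = TransformSpec(row_transform,
                     edit_fields=[('doubled', np.int64, (), False)])
reader = make_reader('file:///tmp/dataset', transform=spec)
```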

Did I get it right, or did I miss something?

gregw18 commented 4 years ago

At first glance, I have two concerns:

  1. There appears to be a current use case that would no longer work: it is currently possible to remove fields from the returned data by listing them in TransformSpec.removed_fields without providing a TransformSpec.func (per my reading of the comments in PyDictReaderWorker._apply_transform_spec and ArrowReaderWorker._load_rows, though ArrowReaderWorker._load_rows_with_predicate doesn't seem to implement this; see the sketch after this list). It should be possible to keep this working by making a special case in 3.b: update the Unischema if not running under TensorFlow.
  2. I believe that we would also have to update PyDictReaderWorker and ArrowReaderWorker to accept either a TransformSpec or a callable.
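A small sketch of the existing use case from point 1, assuming a placeholder dataset URL and field name:

```python
from petastorm import make_reader
from petastorm.transform import TransformSpec

# Remove a field without supplying any transform function at all.
spec = TransformSpec(removed_fields=['unused_col'])
with make_reader('file:///tmp/dataset', transform_spec=spec) as reader:
    sample = next(reader)  # 'unused_col' is absent from the returned sample
```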
selitvin commented 4 years ago

> update the Unischema if not running under TensorFlow

(3) is a 'stricter' case that is a must for TF; however, non-TF users can also use it - they simply care less about the reader.schema attribute being available. I forgot about the use case you rightfully mentioned: specifying TransformSpec's removed fields without a func. It should be possible to keep that working.

  1. Yep. These should be updated.

Sounds like a plan!

gregw18 commented 4 years ago

I'm afraid that I've veered off course a bit from what we'd agreed to. As I'm working on this to get some real-world experience with Python and in this application area, I'm completely open to any changes and feedback. Here's what I've implemented in the pull request that I just submitted.

Currently, when a user wants to transform the retrieved data, TF users crash with a not particularly helpful message if the actual fields returned don't match what we were told to expect. Non-TF users also crash, even though we should be able to return the data that their transform provides us. Thus, I want to make changes so that if running under TF and a transform modified the returned fields such that they don't match the expected schema, we give the user an explanation before crashing (since we can't do anything to prevent TF from crashing). For non-TF, we don't need to be told beforehand what the return format will be - don't crash and don't require that we be told. However, still emit a warning, in case the discrepancy is inadvertent.

  1. Add a transform argument to make_reader and make_batch_reader. Issue a deprecation warning if transform_spec is used.
  2. transform can take either a TransformSpec instance or any Python callable.
  3. If transform is a TransformSpec, or transform_spec is provided: a. If the transform function did not produce a dictionary matching the transformed schema, emit a warning describing the differences. If running under TensorFlow, crash; if not running under TensorFlow, don't crash.
  4. If transform is a callable: a. Raise a runtime error if reader.schema is accessed.
  5. Move _get_namedtuple and make_namedtuple out of the Unischema class. _get_namedtuple simply does dict->namedtuple conversion, reusing the same class every time.

There are also some choices that I made that I'm not sure about:

  1. I ended up keeping reader.schema for all cases, so that I could provide a warning for the non-TF case. However, I'm not providing access to it, as per 4a above, in case it doesn't match the actual structure.
  2. I moved _get_namedtuple and make_namedtuple out of the Unischema class, but left make_namedtuple_tf inside it, as it requires access to _fields - though I don't really like the asymmetry.
  3. I hardcoded the namedtuple name to "fixed" in make_namedtuple, as it is no longer associated with a Unischema; the lookup will still be correct as long as there aren't two objects with the same field names but different types for a given field name. Which feels a bit weak...
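A minimal sketch of what that standalone conversion could look like, mirroring the choices above (a cache keyed by field names and the hardcoded "fixed" type name); this is illustrative, not the submitted patch:

```python
from collections import namedtuple

_namedtuple_cache = {}  # one namedtuple class per unique set of field names

def _get_namedtuple(field_names):
    """Return a cached namedtuple class instead of rebuilding it per sample."""
    key = tuple(sorted(field_names))
    if key not in _namedtuple_cache:
        # The type name is hardcoded because the class is no longer tied to
        # any particular Unischema.
        _namedtuple_cache[key] = namedtuple('fixed', key)
    return _namedtuple_cache[key]

def make_namedtuple(row_dict):
    """Plain dict -> namedtuple conversion, independent of Unischema."""
    return _get_namedtuple(row_dict.keys())(**row_dict)
```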

All feedback is welcomed!