uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Make data transform logic a bit easier for non-TensorFlow users. #465

Closed. gregw18 closed this 3 years ago

gregw18 commented 4 years ago

Previously, when using the transform_spec argument of make_reader and make_batch_reader, both TensorFlow and non-TensorFlow users had to specify ahead of time what changes they were going to make to the fields returned by the reader, even though only TensorFlow needs that information up front. Now, non-TensorFlow users can just provide a transform function. Hopefully resolves issues #367, #297, #355 and #402.
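
For illustration, a minimal sketch contrasting the two call styles. The `transform` parameter shown is the one proposed in this PR and may differ from what is finally merged; `TransformSpec` and its `edit_fields` argument are the existing `petastorm.transform` API, while the dataset path and `image` field are made up for the example.

```python
import numpy as np
from petastorm import make_reader
from petastorm.transform import TransformSpec

def decode_and_scale(row):
    # For make_reader, the transform function receives a single row as a
    # dict of field name -> numpy value and returns the modified dict.
    row['image'] = row['image'].astype(np.float32) / 255.0
    return row

# Existing API: even non-TensorFlow users must declare the post-transform
# schema via edit_fields so the output field types are known up front.
spec = TransformSpec(decode_and_scale,
                     edit_fields=[('image', np.float32, (128, 128, 3), False)])
with make_reader('file:///tmp/my_dataset', transform_spec=spec) as reader:
    for row in reader:
        pass  # row.image is already scaled

# Proposed in this PR: non-TensorFlow users pass just the callable and
# skip the schema declaration entirely.
with make_reader('file:///tmp/my_dataset', transform=decode_and_scale) as reader:
    for row in reader:
        pass
```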

gregw18 commented 4 years ago

Thanks for the clear and thoughtful review! I apologize for making you feel that you had to spend your Sunday night doing this ... With one exception, I think that I understand your suggestions and agree with them.

I had an unstated assumption when I was working on this: that I only needed to check the first batch of data returned using a given Unischema, because all data passing through a given Reader would be in the same format. My reasoning, based on my limited experience with machine learning systems, was that you have to feed them the same format of data in each batch. Thus, the data coming from disk would always be in the same format, and, while it would certainly be possible to return different fields from a transform function based on arbitrary conditions, it would be self-defeating to do so, and so checking the format of every batch would be unnecessary overhead. Can you confirm that it would be best to check every set of data returned from a given reader?

If you're happy with the transform parameter change to the make_reader calls, I'll also update the code examples to use transform rather than transform_spec, so that the documentation stays in sync.

selitvin commented 4 years ago

Hey - definitely no need to apologize - it's me who should apologize for not reviewing this earlier. :) I agree with your assumption that the data format is not expected to change between consecutive examples. My reasoning for making the check anyway (I assume we are talking only about the TransformSpec case, since with a bare callable we don't have a schema declaration anyway):

  1. I did not check this edge case, so I might be wrong about it: with a parquet store with multiple files, the schema of each file might be different: e.g. by-date partitions where the schema mutates at some date. The first batch might have the correct type, but others (read from different parquet files) might end up having different fields.
  2. I would think that checking the set of field names/types per batch (or per record, assuming rows are big, as in the make_reader case) should be pretty cheap and go unnoticed; a sketch of such a check follows this list.
  3. A mutating schema, as in (1), can be handled by pytorch models and may even be desirable (since python code in the data reading layer can adapt to the schema changes).
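
To make (2) concrete, here is a minimal sketch (illustrative, not petastorm code) of a cheap per-record compliance check against a declared post-transform schema:

```python
def check_transformed_fields(expected_fields, row):
    # expected_fields: dict of field name -> numpy dtype declared in the
    # TransformSpec; row: dict of field name -> value returned by the
    # user transform. A set comparison per record is O(#fields).
    actual = set(row.keys())
    expected = set(expected_fields)
    if actual != expected:
        raise ValueError(
            'Transformed row does not match the declared schema. '
            'Missing: {}, unexpected: {}'.format(
                sorted(expected - actual), sorted(actual - expected)))
    for name, dtype in expected_fields.items():
        value = row[name]
        if hasattr(value, 'dtype') and value.dtype != dtype:
            raise TypeError('Field {} has dtype {}, expected {}'.format(
                name, value.dtype, dtype))
```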

I think your treatment of the transform_spec / transform arguments is good. Although we'll need to deprecate transform_spec in the future, it's better to do what you propose, since it will lead us to a cleaner API.

gregw18 commented 4 years ago

Stepping back a bit, I would like to verify that what I’m trying to accomplish matches what you would like to see done. Again, I don’t have a personal direction that I want to push this project in – I just want to make sure that my limited experience in this area isn’t causing me to head in the wrong direction. My original goals were:

  1. To make doing transformations a bit easier for non-TF users, by not requiring them to create a full TransformSpec.
  2. To make schema discrepancy errors more obvious for TF users, by giving a warning about the schema discrepancy before crashing.

From a usage perspective, non-TF users can always just use a transform function, regardless of whether they are adding or removing fields, and it should just work. For TF users, if they are not adding or removing fields they can also use a transform function, but if they are making schema changes, they need to use a full TransformSpec so that TF knows what fields will be returned.

In terms of checking returned data against the expected schema: if the user provided a full TransformSpec, I think we should check it regardless of whether we’re running under TF, as a discrepancy may be an inadvertent error. If they provided just a callable and are not running under TF, we probably shouldn’t check – the user is telling us to trust them. If running under TF, and the callable does add or remove fields, I would still like to tell the user why we’re about to crash (even though the user should know better than to do this).

If a given reader is returning different fields over time, for non-TF we should just work. However, for TF, even a TransformSpec can only specify one set of differences from the stored schema, which, with TF’s fixed graph, means that we’re going to crash. It would still be good to explain why beforehand.

All of which seems to indicate that I should check all data returned by a reader, not just the first batch, but I also need to be able to tell whether we're running under TF or not – which I haven’t been able to figure out how to do (one possible heuristic is sketched below). Finally, if we do agree on checking a TransformSpec for the non-TF case, I would lean towards just checking the first batch of data. If there is a discrepancy, and it is intentional, or the fields returned vary over time, I wouldn’t want to emit a huge number of identical warnings. Thoughts?
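
As an aside, one possible heuristic for detecting whether TensorFlow is in play (just an idea, not something petastorm currently does) is to check whether the tensorflow module has already been imported, without importing it ourselves:

```python
import sys

def tensorflow_loaded():
    # If the caller is building a TF graph, tensorflow must already have
    # been imported; checking sys.modules avoids importing TF as a side
    # effect. It is only a heuristic: TF could be imported but unused
    # by this particular reader.
    return 'tensorflow' in sys.modules
```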

selitvin commented 4 years ago

Thank you for the detailed brain-dump!

I agree with all your points. Here is my interpretation of what you said:

  1. TransformSpec provided --> always check compliance with the output schema. If a user declared a post-transform schema, they should comply (otherwise, it's likely an inadvertent error).
  2. Callable, non-TF: no schema was declared, so the user can do whatever they want. I'm not sure we need to make an extra effort to check against the schema used for graph building, although we could (in the tf_utils.py module, perhaps we could capture the schema and check as part of the py_func body).

> I would lean towards just checking the first batch of data ... I wouldn’t want to emit a huge number of identical warnings.

If we raise an error in (1), that wouldn't be a problem, right? Once the user's transform does something non-compliant, we abort the execution.

gregw18 commented 4 years ago

I've implemented most of your recommendations, but am struggling with hiding reader.schema when not using a TransformSpec and not using TF. I've considered two implementations:

  1. Always saving the schema but hiding it in the non-TransformSpec case until tf_tensors is called, which would set a flag in the reader to make it visible (a sketch follows this list). This assumes that tf_tensors is always called when using TensorFlow.
  2. Passing an "is_tensorflow" parameter to make_reader and make_batch_reader.

However, I'm not clear what the benefit of hiding the schema is, and thus whether it outweighs the extra complexity of sometimes hiding it.
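
For concreteness, a hypothetical sketch of option 1 (all names illustrative; this is not petastorm's actual Reader implementation):

```python
class Reader(object):
    def __init__(self, schema, transform=None):
        self._schema = schema
        # Hide the schema when only a bare callable was provided, since the
        # post-transform fields are then unknown until data flows through.
        self._schema_visible = transform is None

    @property
    def schema(self):
        if not self._schema_visible:
            raise RuntimeError('schema is not available when only a '
                               'transform callable was provided; it becomes '
                               'visible once tf_tensors() is called')
        return self._schema

    def _mark_tensorflow(self):
        # tf_tensors() would call this, flipping the flag so that TF graph
        # construction code can read the schema.
        self._schema_visible = True
```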

gregw18 commented 4 years ago

I'm struggling with implementing my last goal for this project: giving TF users an understandable warning/error if the data being returned doesn't match what we expect. Some options are given below, along with my opinion of each. Any feedback would be appreciated.

Also, reader.schema is still always available because it is used by tf_utils.py in various places. Was the goal of hiding it in some cases to avoid providing invalid data in cases where it doesn't match the returned data?

Options:

  1. Submit what I currently have – return an error if the user provides a TransformSpec but the returned data doesn’t match what is expected. Otherwise, non-TF no longer crashes if the format doesn’t match what is expected, but TF still crashes, with a rather cryptic error.
  2. Add a warning if using TF but not using a TransformSpec, by doing the checking on the main thread, using a flag set in dequeue_sample_impl.
  3. Add a warning if using TF but not using a TransformSpec, using a flag set in dequeue_sample_impl, by creating a separate thread to do the checking, or by letting the existing threads know that I want checking.
  4. Add a new parameter to make_reader/make_batch_reader, say check_returned_format, which the reader's init passes to the threads/processes (which do the checking) when they are created. If the parameter is not provided, default to true if a TransformSpec or transform was provided, false otherwise (see the sketch after this list). Suggest in the documentation that it always be set to true when using TF.
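
A tiny sketch of the defaulting logic proposed in option 4 (check_returned_format is the hypothetical parameter named above):

```python
def resolve_check_returned_format(check_returned_format, transform, transform_spec):
    # Default: check whenever the user supplied any transform, since a
    # mismatch is then likely an inadvertent error; otherwise trust the
    # on-disk schema and skip the per-batch check.
    if check_returned_format is None:
        return transform is not None or transform_spec is not None
    return check_returned_format
```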

Thoughts:

  1. Easy to do, but doesn’t meet all of my goals – if not using a TransformSpec, it doesn’t explain data mismatch crashes to TF users.
  2. Relatively easy to implement, albeit a bit ugly (it has to be done in tf_utils.py's dequeue_sample_impl, in both _tf_tensors_nonngram and _tf_tensors_ngram, and the validation logic has to handle the different data structures on the main thread vs. in the worker_pool), and there are possible performance problems.
  3. Not sure how or where to create the new thread, or how it would ensure that data is checked before it is passed on to reader.next. I definitely don’t want to create yet another processing stage. It is probably too late to modify parameters for the existing threads/processes, as they will have already passed some data into the next stage.
  4. Easy to implement. Consistent and obvious for callers, but it is a change from the current behavior.