uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can also be used from pure Python code.
Apache License 2.0

InvalidArgumentError: parquet array column not being transformed to dataset #458

Open DASpringate opened 4 years ago

DASpringate commented 4 years ago

I'm trying to read in data from parquet for a language model.

The parquet file contains two columns: feature_vec (an array of ints) and target (an integer label).

When I run the code below I get an InvalidArgumentError. This seems to be because the int array column in the parquet file is not being properly transformed when the reader is converted to a tf.data.Dataset, but is just passed through as-is.

import tensorflow as tf
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

def transform_reader(reader, batch_size, buffer_size, vocab_size):
  def transform_input(x):
    features = x.feature_vec
    target = tf.one_hot(x.target, depth = vocab_size)
    return features, target
  return make_petastorm_dataset(reader).map(transform_input)                                              \
                                       .apply(tf.data.experimental.unbatch()).shuffle(buffer_size)        \
                                       .batch(batch_size, drop_remainder = True)

with make_batch_reader('file:/dbfs/' + train_path, num_epochs = None) as train_reader:
  train_dataset = transform_reader(train_reader, batch_size, buffer_size, vocab_size)

  model = Sequential()
  model.add(Embedding(vocab_size, 50, mask_zero = True, input_length = None))
  model.add(LSTM(64, return_sequences = False))
  model.add(Dense(vocab_size, activation = 'softmax'))
  model.compile(loss = 'categorical_crossentropy', optimizer =  'adam', metrics =  ['categorical_accuracy'])  

  model.fit(train_dataset, epochs = num_epochs, steps_per_epoch = (train_size // batch_size),
              verbose = 1)

warning:

WARNING:tensorflow:Entity <function make_petastorm_dataset.<locals>.<lambda> at 0x7f5819d2b9d8> could not be transformed and will be executed as-is. Please report this to the AutoGraph team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output. Cause: expected exactly one node node, found []
WARNING: Entity <function make_petastorm_dataset.<locals>.<lambda> at 0x7f5819d2b9d8> could not be transformed and will be executed as-is. Please report this to the AutoGraph team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output. Cause: expected exactly one node node, found []
WARNING:tensorflow:Entity <function make_petastorm_dataset.<locals>.<lambda> at 0x7f5818cac378> could not be transformed and will be executed as-is. Please report this to the AutoGraph team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output. Cause: expected exactly one node node, found []
WARNING: Entity <function make_petastorm_dataset.<locals>.<lambda> at 0x7f5818cac378> could not be transformed and will be executed as-is. Please report this to the AutoGraph team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output. Cause: expected exactly one node node, found []

error:

InvalidArgumentError                      Traceback (most recent call last)
<command-2202319388736985> in <module>
      9 
     10   model.fit(train_dataset, epochs = num_epochs, steps_per_epoch = (train_size // batch_size),
---> 11               verbose = 1)

/databricks/python/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    778           validation_steps=validation_steps,
    779           validation_freq=validation_freq,
--> 780           steps_name='steps_per_epoch')
    781 
    782   def evaluate(self,

/databricks/python/lib/python3.7/site-packages/tensorflow/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
    272           # `ins` can be callable in tf.distribute.Strategy + eager case.
    273           actual_inputs = ins() if callable(ins) else ins
--> 274           batch_outs = f(actual_inputs)
    275         except errors.OutOfRangeError:
    276           if is_dataset:

/databricks/python/lib/python3.7/site-packages/tensorflow/python/keras/backend.py in __call__(self, inputs)
   3290 
   3291     fetched = self._callable_fn(*array_vals,
-> 3292                                 run_metadata=self.run_metadata)
   3293     self._call_fetch_callbacks(fetched[-len(self._fetches):])
   3294     output_structure = nest.pack_sequence_as(

/databricks/python/lib/python3.7/site-packages/tensorflow/python/client/session.py in __call__(self, *args, **kwargs)
   1456         ret = tf_session.TF_SessionRunCallable(self._session._session,
   1457                                                self._handle, args,
-> 1458                                                run_metadata_ptr)
   1459         if run_metadata:
   1460           proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

InvalidArgumentError: 2 root error(s) found.
  (0) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm_107/transpose}}]]
     [[lstm_107/ExpandDims/_6817]]
  (1) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
     [[{{node lstm_107/transpose}}]]
0 successful operations.
0 derived errors ignored.

But if I convert the dataset to an iterator, fetch one batch, and feed the resulting X and y arrays to model.fit directly, it trains as expected for that batch:

with make_batch_reader('file:/dbfs/' + train_path, num_epochs = None) as train_reader:
  train_dataset = transform_reader(train_reader, batch_size, buffer_size, vocab_size)
  iterator = train_dataset.make_one_shot_iterator()
  tensor = iterator.get_next()

  with tf.Session() as sess:
    features, target = sess.run(tensor)

    model = Sequential()
    model.add(Embedding(vocab_size, 50, mask_zero = True, input_length = None))
    model.add(LSTM(64, return_sequences = False))
    model.add(Dense(vocab_size, activation = 'softmax'))
    model.compile(loss = 'categorical_crossentropy', optimizer =  'adam', metrics =  ['categorical_accuracy'])  

    model.fit(x = features, y = target,  verbose = 1)
32/600 [>.............................] - ETA: 48s - loss: 6.8960 - categorical_accuracy: 0.0000e+00
 64/600 [==>...........................] - ETA: 28s - loss: 6.8956 - categorical_accuracy: 0.0000e+00
 96/600 [===>..........................] - ETA: 21s - loss: 6.8949 - categorical_accuracy: 0.0208    
128/600 [=====>........................] - ETA: 18s - loss: 6.8938 - categorical_accuracy: 0.0703
160/600 [=======>......................] - ETA: 15s - loss: 6.8930 - categorical_accuracy: 0.0812
192/600 [========>.....................] - ETA: 13s - loss: 6.8922 - categorical_accuracy: 0.0938
224/600 [==========>...................] - ETA: 11s - loss: 6.8917 - categorical_accuracy: 0.1027
256/600 [===========>..................] - ETA: 10s - loss: 6.8909 - categorical_accuracy: 0.1016
288/600 [=============>................] - ETA: 9s - loss: 6.8899 - categorical_accuracy: 0.1146 
320/600 [===============>..............] - ETA: 8s - loss: 6.8892 - categorical_accuracy: 0.1125
352/600 [================>.............] - ETA: 7s - loss: 6.8880 - categorical_accuracy: 0.1250
384/600 [==================>...........] - ETA: 6s - loss: 6.8869 - categorical_accuracy: 0.1328
416/600 [===================>..........] - ETA: 5s - loss: 6.8858 - categorical_accuracy: 0.1346
448/600 [=====================>........] - ETA: 4s - loss: 6.8844 - categorical_accuracy: 0.1429
480/600 [=======================>......] - ETA: 3s - loss: 6.8836 - categorical_accuracy: 0.1417
512/600 [========================>.....] - ETA: 2s - loss: 6.8821 - categorical_accuracy: 0.1465
544/600 [==========================>...] - ETA: 1s - loss: 6.8807 - categorical_accuracy: 0.1489
576/600 [===========================>..] - ETA: 0s - loss: 6.8784 - categorical_accuracy: 0.1545
600/600 [==============================] - 16s 27ms/sample - loss: 6.8767 - categorical_accuracy: 0.1567

Why does this work when fitting on the full dataset does not?

DASpringate commented 4 years ago

The InvalidArgumentError is happening in the second (LSTM) layer, which doesn't make sense to me, since that layer should just consume the output of the first Embedding layer.
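
One check that might narrow this down (a sketch using the TF 1.x dataset API that appears in the traceback; it would need to run inside the make_batch_reader block where train_dataset is defined) is to compare the static shapes Keras builds the graph with against the shapes the reader actually yields:

import tensorflow as tf

# Static shapes/types Keras sees when building the graph
print(train_dataset.output_shapes)
print(train_dataset.output_types)

# Dynamic shapes actually produced by one batch
iterator = train_dataset.make_one_shot_iterator()
features, target = iterator.get_next()
with tf.Session() as sess:
  f, t = sess.run([features, target])
  print(f.shape, t.shape)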

selitvin commented 4 years ago

Not sure what's going on here; I don't have much experience working with Keras/LSTM. I tried reproducing your example locally and observed that the batches were constructed properly. However, you already knew that, since in your example fetching the data to Python and then feeding it back to Keras worked fine...

DASpringate commented 4 years ago

Thanks. There's a reproducible toy example of the issue here. Is support for Keras/LSTM on the roadmap?

selitvin commented 4 years ago

I was under the impression that as long as we have a tf.data.Dataset object, we are covered with Keras. Was my assumption incorrect? Let's try figuring out what's going on here. Maybe it can be fixed?

selitvin commented 4 years ago

Dug around a little bit. I am not a Keras/LSTM expert, so maybe you can help here. What I saw is that the issue is caused by a transpose operator in tensorflow/python/keras/layers/recurrent.py, in the swap_batch_timestep function.

During graph construction the static shape is assumed to be [?, ?, 20], but during graph evaluation the tensor arrives with shape [10 1 100 1], hence the transpose fails.
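
For context, this is the failure mode you get whenever the permutation vector built from the static rank disagrees with the tensor's runtime rank. A minimal, hypothetical reproduction of just the mechanism (not the actual Keras code path):

import numpy as np
import tensorflow as tf

# A placeholder with unknown rank, standing in for a dataset tensor whose
# static shape disagrees with its runtime shape.
x = tf.placeholder(tf.float32, shape=None)
y = tf.transpose(x, perm=[1, 0, 2])  # perm built assuming rank 3

with tf.Session() as sess:
  # Feeding a rank-4 value raises:
  #   transpose expects a vector of size 4. But input(1) is a vector of size 3
  sess.run(y, feed_dict={x: np.zeros([10, 1, 100, 1], np.float32)})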

I double checked the static and dynamic shapes of the tensors returned by make_petastorm_dataset and they appeared to be as expected.

  def swap_batch_timestep(input_t):
    # Swap the batch and timestep dim for the incoming tensor.
    axes = list(range(len(input_t.shape)))
    axes[0], axes[1] = 1, 0
    import tensorflow as tf
    input_t = tf.Print(input_t, ['Static shape:', str(input_t.shape), 'Dynamic shape:', tf.shape(input_t)],  summarize=1000)
    return array_ops.transpose(input_t, axes)

and got:

[Static shape:][(?, ?, 1)][Dynamic shape:][10 1 100 1]

Does this hint at anything for you?
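
If it helps to experiment, one thing that might be worth trying (an untested sketch, assuming the extra singleton dimensions seen in the dynamic shape are the culprit) is to pin the feature tensor to rank 2 inside transform_input, before it reaches the Embedding layer:

def transform_reader(reader, batch_size, buffer_size, vocab_size):
  def transform_input(x):
    # Flatten away any stray singleton dims so the features become
    # (rows_in_batch, sequence_length); after unbatch/re-batch the
    # Embedding layer then sees a plain (batch, sequence_length) input.
    features = tf.reshape(x.feature_vec, [tf.shape(x.feature_vec)[0], -1])
    target = tf.one_hot(x.target, depth = vocab_size)
    return features, target
  return make_petastorm_dataset(reader).map(transform_input)                                              \
                                       .apply(tf.data.experimental.unbatch()).shuffle(buffer_size)        \
                                       .batch(batch_size, drop_remainder = True)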